Skip to content

Commit

Permalink
implement reusable messages
Browse files Browse the repository at this point in the history
Signed-off-by: onur-ozkan <[email protected]>
  • Loading branch information
onur-ozkan committed Sep 30, 2024
1 parent 8727e52 commit d2d9fff
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 28 deletions.
11 changes: 11 additions & 0 deletions mm2src/common/expirable_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ impl<K: Eq + Hash, V> ExpirableMap<K, V> {
#[inline]
pub fn get(&mut self, k: &K) -> Option<&V> { self.0.get(k).map(|v| &v.value) }

/// Returns the associated value if present and has longer ttl than the given one.
pub fn get_if_has_longer_life_than(&mut self, k: &K, min_ttl: Duration) -> Option<&V> {
self.0.get(k).and_then(|entry| {
if entry.expires_at > Instant::now() + min_ttl {
Some(&entry.value)
} else {
None
}
})
}

/// Inserts a key-value pair with an expiration duration.
///
/// If a value already exists for the given key, it will be updated and then
Expand Down
6 changes: 0 additions & 6 deletions mm2src/mm2_core/src/mm_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,12 @@ const EXPORT_METRICS_INTERVAL: f64 = 5. * 60.;
pub struct HealthChecker {
/// Links the RPC context to the P2P context to handle health check responses.
pub response_handler: AsyncMutex<ExpirableMap<String, oneshot::Sender<()>>>,
/// This is used to record healthcheck sender peers in an expirable manner to prevent DDoS attacks.
pub ddos_shield: AsyncMutex<ExpirableMap<String, ()>>,
pub config: HealthcheckConfig,
}

#[derive(Debug, Deserialize)]
#[serde(default)]
pub struct HealthcheckConfig {
/// Required time (millisecond) to wait before processing another healthcheck request from the same peer.
pub blocking_ms_for_per_address: u64,
/// Lifetime of the message.
/// Do not change this unless you know what you are doing.
pub message_expiration_secs: u64,
Expand All @@ -68,7 +64,6 @@ pub struct HealthcheckConfig {
impl Default for HealthcheckConfig {
fn default() -> Self {
Self {
blocking_ms_for_per_address: 750,
message_expiration_secs: 10,
timeout_secs: 10,
}
Expand Down Expand Up @@ -227,7 +222,6 @@ impl MmCtx {
async_sqlite_connection: Constructible::default(),
health_checker: HealthChecker {
response_handler: AsyncMutex::new(ExpirableMap::default()),
ddos_shield: AsyncMutex::new(ExpirableMap::default()),
config: HealthcheckConfig::default(),
},
}
Expand Down
68 changes: 46 additions & 22 deletions mm2src/mm2_main/src/lp_healthcheck.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use async_std::prelude::FutureExt;
use chrono::Utc;
use common::executor::SpawnFuture;
use common::expirable_map::ExpirableMap;
use common::{log, HttpStatusCode, StatusCode};
use derive_more::Display;
use futures::channel::oneshot::{self, Receiver, Sender};
use futures::lock::Mutex as AsyncMutex;
use instant::Duration;
use lazy_static::lazy_static;
use mm2_core::mm_ctx::{HealthcheckConfig, MmArc};
use mm2_err_handle::prelude::MmError;
use mm2_libp2p::{decode_message, encode_message, pub_sub_topic, Libp2pPublic, TopicPrefix};
Expand All @@ -20,7 +23,7 @@ use crate::lp_network::broadcast_p2p_msg;

pub(crate) const PEER_HEALTHCHECK_PREFIX: TopicPrefix = "hcheck";

#[derive(Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize)]
#[cfg_attr(any(test, target_arch = "wasm32"), derive(PartialEq))]
pub(crate) struct HealthcheckMessage {
#[serde(deserialize_with = "deserialize_bytes")]
Expand Down Expand Up @@ -189,7 +192,7 @@ impl HealthcheckMessage {
pub(crate) fn sender_peer(&self) -> PeerAddress { self.data.sender_peer }
}

#[derive(Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize)]
#[cfg_attr(any(test, target_arch = "wasm32"), derive(PartialEq))]
struct HealthcheckData {
sender_peer: PeerAddress,
Expand Down Expand Up @@ -325,6 +328,13 @@ pub async fn peer_connection_healthcheck_rpc(
}

pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_libp2p::GossipsubMessage) {
lazy_static! {
static ref RECENTLY_GENERATED_MESSAGES: AsyncMutex<ExpirableMap<String, HealthcheckMessage>> =
AsyncMutex::new(ExpirableMap::new());
}

const MIN_DURATION_FOR_REUSABLE_MSG: Duration = Duration::from_secs(6);

macro_rules! try_or_return {
($exp:expr, $msg: expr) => {
match $exp {
Expand Down Expand Up @@ -357,36 +367,50 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li
return;
};

let mut ddos_shield = ctx.health_checker.ddos_shield.lock().await;
ddos_shield.clear_expired_entries();
if ddos_shield
.insert(
sender_peer.to_string(),
(),
Duration::from_millis(ctx.health_checker.config.blocking_ms_for_per_address),
)
.is_some()
{
log::warn!("Peer '{sender_peer}' exceeded the healthcheck blocking time, skipping their message.");
return;
}
drop(ddos_shield);

if data.should_reply() {
// Reply the message so they know we are healthy.

let topic = peer_healthcheck_topic(&sender_peer);
// If message has longer life than `MIN_DURATION_FOR_REUSABLE_MSG`, we are reusing them to
// reduce the message generation overhead under high pressure.
let mut messages = RECENTLY_GENERATED_MESSAGES.lock().await;
messages.clear_expired_entries();

let msg = try_or_return!(
HealthcheckMessage::generate_message(&ctx, sender_peer, true, 10),
"Couldn't generate the healthcheck message, this is very unusual!"
);
let message_map_key = sender_peer.to_string();

let expiration_secs = ctx
.health_checker
.config
.message_expiration_secs
.try_into()
.unwrap_or(HealthcheckConfig::default().message_expiration_secs as i64);

let msg = match messages
.get_if_has_longer_life_than(&message_map_key, MIN_DURATION_FOR_REUSABLE_MSG)
.cloned()
{
Some(t) => t,
None => {
let msg = try_or_return!(
HealthcheckMessage::generate_message(&ctx, sender_peer, true, expiration_secs),
"Couldn't generate the healthcheck message, this is very unusual!"
);

messages.insert(
message_map_key,
msg.clone(),
Duration::from_secs(expiration_secs as u64),
);

msg
},
};

let encoded_msg = try_or_return!(
msg.encode(),
"Couldn't encode healthcheck message, this is very unusual!"
);

let topic = peer_healthcheck_topic(&sender_peer);
broadcast_p2p_msg(&ctx, topic, encoded_msg, None);
} else {
// The requested peer is healthy; signal the response channel.
Expand Down

0 comments on commit d2d9fff

Please sign in to comment.