Skip to content

Commit

Permalink
use more precise error type and remove sender_peer from hc message
Browse files Browse the repository at this point in the history
Signed-off-by: onur-ozkan <[email protected]>
  • Loading branch information
onur-ozkan committed Oct 2, 2024
1 parent b98832c commit 3169081
Showing 1 changed file with 83 additions and 62 deletions.
145 changes: 83 additions & 62 deletions mm2src/mm2_main/src/lp_healthcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,37 @@ impl<'de> Deserialize<'de> for PeerAddress {
}
}

#[derive(Debug, Display)]
enum SignValidationError {
#[display(
fmt = "Healthcheck message is expired. Current time in UTC: {now_secs}, healthcheck `expires_at` in UTC: {expires_at_secs}"
)]
Expired { now_secs: u64, expires_at_secs: u64 },
#[display(
fmt = "Healthcheck message have too high expiration time. Max allowed expiration seconds: {max_allowed_expiration_secs}, received message expiration seconds: {remaining_expiration_secs}"
)]
LifetimeOverflow {
max_allowed_expiration_secs: u64,
remaining_expiration_secs: u64,
},
#[display(fmt = "Public key is not valid.")]
InvalidPublicKey,
#[display(fmt = "Signature integrity doesn't match with the public key.")]
FakeSignature,
#[display(fmt = "Process failed unexpectedly due to this reason: {reason}")]
Internal { reason: String },
}

impl HealthcheckMessage {
pub(crate) fn generate_message(ctx: &MmArc, is_a_reply: bool) -> Result<Self, String> {
let p2p_ctx = P2PContext::fetch_from_mm_arc(ctx);
let sender_peer = p2p_ctx.peer_id().into();
let keypair = p2p_ctx.keypair();
let sender_public_key = keypair.public().encode_protobuf();

let data = HealthcheckData {
sender_peer,
sender_public_key,
expires_at: Utc::now().timestamp() + healthcheck_message_exp_secs() as i64,
expires_at_secs: u64::try_from(Utc::now().timestamp()).map_err(|e| e.to_string())?
+ healthcheck_message_exp_secs(),
is_a_reply,
};

Expand All @@ -131,9 +151,8 @@ impl HealthcheckMessage {
HealthcheckMessage {
signature: vec![],
data: HealthcheckData {
sender_peer: create_test_peer_address(),
sender_public_key: vec![],
expires_at: 0,
expires_at_secs: 0,
is_a_reply: false,
},
},
Expand All @@ -157,49 +176,40 @@ impl HealthcheckMessage {
}
}

pub(crate) fn is_received_message_valid(&self) -> bool {
let now = Utc::now().timestamp();
let remaining_expiration_seconds = u64::try_from(self.data.expires_at - now).unwrap_or(0);

if remaining_expiration_seconds == 0 {
log::debug!(
"Healthcheck message is expired. Current time in UTC: {now}, healthcheck `expires_at` in UTC: {}",
self.data.expires_at
);
return false;
} else if remaining_expiration_seconds > healthcheck_message_exp_secs() {
log::debug!(
"Healthcheck message have too high expiration time.\nMax allowed expiration seconds: {}\nReceived message expiration seconds: {}",
self.data.expires_at,
remaining_expiration_seconds,
);
return false;
fn is_received_message_valid(&self) -> Result<PeerAddress, SignValidationError> {
let now_secs = u64::try_from(Utc::now().timestamp())
.map_err(|e| SignValidationError::Internal { reason: e.to_string() })?;

let remaining_expiration_secs = self.data.expires_at_secs - now_secs;

if remaining_expiration_secs == 0 {
return Err(SignValidationError::Expired {
now_secs,
expires_at_secs: self.data.expires_at_secs,
});
} else if remaining_expiration_secs > healthcheck_message_exp_secs() {
return Err(SignValidationError::LifetimeOverflow {
max_allowed_expiration_secs: healthcheck_message_exp_secs(),
remaining_expiration_secs,
});
}

let Ok(public_key) = Libp2pPublic::try_decode_protobuf(&self.data.sender_public_key) else {
log::debug!("Couldn't decode public key from the healthcheck message.");

return false
return Err(SignValidationError::InvalidPublicKey);
};

if self.data.sender_peer != public_key.to_peer_id().into() {
log::debug!("`sender_peer` and `sender_public_key` doesn't belong each other.");

return false;
}

let Ok(encoded_message) = self.data.encode() else {
log::debug!("Couldn't encode healthcheck data.");
return false
};
let encoded_message = self
.data
.encode()
.map_err(|e| SignValidationError::Internal { reason: e.to_string() })?;

let res = public_key.verify(&encoded_message, &self.signature);

if !res {
log::debug!("Healthcheck isn't signed correctly.");
if public_key.verify(&encoded_message, &self.signature) {
Ok(public_key.to_peer_id().into())
} else {
Err(SignValidationError::FakeSignature)
}

res
}

#[inline]
Expand All @@ -210,18 +220,14 @@ impl HealthcheckMessage {

#[inline]
pub(crate) fn should_reply(&self) -> bool { !self.data.is_a_reply }

#[inline]
pub(crate) fn sender_peer(&self) -> PeerAddress { self.data.sender_peer }
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[cfg_attr(any(test, target_arch = "wasm32"), derive(PartialEq))]
struct HealthcheckData {
sender_peer: PeerAddress,
#[serde(deserialize_with = "deserialize_bytes")]
sender_public_key: Vec<u8>,
expires_at: i64,
expires_at_secs: u64,
is_a_reply: bool,
}

Expand Down Expand Up @@ -358,17 +364,17 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li
"Couldn't decode healthcheck message"
);

let sender_peer = data.sender_peer();

let ctx = ctx.clone();

// Pass the remaining work to another thread to free up this one as soon as possible,
// so KDF can handle a high amount of healthcheck messages more efficiently.
ctx.spawner().spawn(async move {
if !data.is_received_message_valid() {
log::error!("Received an invalid healthcheck message.");
log::debug!("Message context: {:?}", data);
return;
let sender_peer = match data.is_received_message_valid() {
Ok(t) => t,
Err(e) => {
log::error!("Received an invalid healthcheck message. Error: {e}");
return;
},
};

if data.should_reply() {
Expand Down Expand Up @@ -400,13 +406,10 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li
});
}

fn create_test_peer_address() -> PeerAddress {
let keypair = mm2_libp2p::Keypair::generate_ed25519();
mm2_libp2p::PeerId::from(keypair.public()).into()
}

#[cfg(any(test, target_arch = "wasm32"))]
mod tests {
use std::mem::discriminant;

use super::*;
use common::cross_test;
use crypto::CryptoCtx;
Expand Down Expand Up @@ -453,30 +456,48 @@ mod tests {
cross_test!(test_valid_message, {
let ctx = ctx();
let message = HealthcheckMessage::generate_message(&ctx, false).unwrap();
assert!(message.is_received_message_valid());
message.is_received_message_valid().unwrap();
});

cross_test!(test_corrupted_messages, {
let ctx = ctx();

let mut message = HealthcheckMessage::generate_message(&ctx, false).unwrap();
message.data.expires_at += 1;
assert!(!message.is_received_message_valid());
message.data.expires_at_secs += healthcheck_message_exp_secs() * 3;
assert_eq!(
discriminant(&message.is_received_message_valid().err().unwrap()),
discriminant(&SignValidationError::LifetimeOverflow {
max_allowed_expiration_secs: 0,
remaining_expiration_secs: 0
})
);

let mut message = HealthcheckMessage::generate_message(&ctx, false).unwrap();
message.data.is_a_reply = !message.data.is_a_reply;
assert!(!message.is_received_message_valid());
assert_eq!(
discriminant(&message.is_received_message_valid().err().unwrap()),
discriminant(&SignValidationError::FakeSignature)
);

let mut message = HealthcheckMessage::generate_message(&ctx, false).unwrap();
message.data.sender_peer = create_test_peer_address();
assert!(!message.is_received_message_valid());
message.data.sender_public_key.push(0);
assert_eq!(
discriminant(&message.is_received_message_valid().err().unwrap()),
discriminant(&SignValidationError::InvalidPublicKey)
);
});

cross_test!(test_expired_message, {
let ctx = ctx();
let message = HealthcheckMessage::generate_message(&ctx, false).unwrap();
common::executor::Timer::sleep(3.).await;
assert!(!message.is_received_message_valid());
assert_eq!(
discriminant(&message.is_received_message_valid().err().unwrap()),
discriminant(&SignValidationError::Expired {
now_secs: 0,
expires_at_secs: 0
})
);
});

cross_test!(test_encode_decode, {
Expand Down

0 comments on commit 3169081

Please sign in to comment.