From 01994cdea0592d314fd7e4fcc5e6efa8d19eb1d0 Mon Sep 17 00:00:00 2001 From: Artem Vitae Date: Wed, 13 Dec 2023 20:10:22 +0700 Subject: [PATCH] Handling accept_only_from in progress. --- mm2src/mm2_main/src/lp_ordermatch.rs | 39 ++++++++++++------- mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs | 13 ++++++- mm2src/mm2_main/src/lp_swap/swap_v2_common.rs | 12 +----- mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs | 13 ++++++- mm2src/mm2_main/src/ordermatch_tests.rs | 11 +++++- mm2src/mm2_p2p/src/lib.rs | 4 ++ 6 files changed, 63 insertions(+), 29 deletions(-) diff --git a/mm2src/mm2_main/src/lp_ordermatch.rs b/mm2src/mm2_main/src/lp_ordermatch.rs index e9d494ea57..1f6fe3c377 100644 --- a/mm2src/mm2_main/src/lp_ordermatch.rs +++ b/mm2src/mm2_main/src/lp_ordermatch.rs @@ -42,7 +42,7 @@ use http::Response; use keys::{AddressFormat, KeyPair}; use mm2_core::mm_ctx::{from_ctx, MmArc, MmWeak}; use mm2_err_handle::prelude::*; -use mm2_libp2p::{decode_signed, encode_and_sign, encode_message, pub_sub_topic, TopicHash, TopicPrefix, +use mm2_libp2p::{decode_signed, encode_and_sign, encode_message, pub_sub_topic, PublicKey, TopicHash, TopicPrefix, TOPIC_SEPARATOR}; use mm2_metrics::mm_gauge; use mm2_number::{BigDecimal, BigRational, MmNumber, MmNumberMultiRepr}; @@ -572,11 +572,11 @@ pub async fn process_msg(ctx: MmArc, from_peer: String, msg: &[u8], i_am_relay: Ok(()) }, new_protocol::OrdermatchMessage::TakerConnect(taker_connect) => { - process_taker_connect(ctx, pubkey.unprefixed().into(), taker_connect.into()).await; + process_taker_connect(ctx, pubkey, taker_connect.into()).await; Ok(()) }, new_protocol::OrdermatchMessage::MakerConnected(maker_connected) => { - process_maker_connected(ctx, pubkey.unprefixed().into(), maker_connected.into()).await; + process_maker_connected(ctx, pubkey, maker_connected.into()).await; Ok(()) }, new_protocol::OrdermatchMessage::MakerOrderCancelled(cancelled_msg) => { @@ -2872,7 +2872,7 @@ impl MakerOrdersContext { } #[cfg_attr(test, mockable)] -fn lp_connect_start_bob(ctx: MmArc, maker_match: MakerMatch, maker_order: MakerOrder) { +fn lp_connect_start_bob(ctx: MmArc, maker_match: MakerMatch, maker_order: MakerOrder, taker_p2p_pubkey: PublicKey) { let spawner = ctx.spawner(); let uuid = maker_match.request.uuid; @@ -2982,6 +2982,9 @@ fn lp_connect_start_bob(ctx: MmArc, maker_match: MakerMatch, maker_order: MakerO p2p_keypair: maker_order.p2p_privkey.map(SerializableSecp256k1Keypair::into_inner), secret_hash_algo, lock_duration: lock_time, + taker_p2p_pubkey: match taker_p2p_pubkey { + PublicKey::Secp256k1(pubkey) => pubkey.into_inner(), + }, }; #[allow(clippy::box_default)] maker_swap_state_machine @@ -3027,7 +3030,7 @@ fn lp_connect_start_bob(ctx: MmArc, maker_match: MakerMatch, maker_order: MakerO spawner.spawn_with_settings(fut, settings); } -fn lp_connected_alice(ctx: MmArc, taker_order: TakerOrder, taker_match: TakerMatch) { +fn lp_connected_alice(ctx: MmArc, taker_order: TakerOrder, taker_match: TakerMatch, maker_p2p_pubkey: PublicKey) { let spawner = ctx.spawner(); let uuid = taker_match.reserved.taker_order_uuid; @@ -3138,6 +3141,9 @@ fn lp_connected_alice(ctx: MmArc, taker_order: TakerOrder, taker_match: TakerMat uuid, p2p_keypair: taker_order.p2p_privkey.map(SerializableSecp256k1Keypair::into_inner), taker_secret, + maker_p2p_pubkey: match maker_p2p_pubkey { + PublicKey::Secp256k1(pubkey) => pubkey.into_inner(), + }, }; #[allow(clippy::box_default)] taker_swap_state_machine @@ -3555,7 +3561,7 @@ async fn process_maker_reserved(ctx: MmArc, from_pubkey: H256Json, reserved_msg: } } -async fn process_maker_connected(ctx: MmArc, from_pubkey: H256Json, connected: MakerConnected) { +async fn process_maker_connected(ctx: MmArc, from_pubkey: PublicKey, connected: MakerConnected) { log::debug!("Processing MakerConnected {:?}", connected); let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); @@ -3564,7 +3570,8 @@ async fn process_maker_connected(ctx: MmArc, from_pubkey: H256Json, connected: M Err(_) => return, }; - if our_public_id.bytes == from_pubkey.0 { + let unprefixed_from = from_pubkey.unprefixed(); + if our_public_id.bytes == unprefixed_from { log::warn!("Skip maker connected from our pubkey"); return; } @@ -3585,12 +3592,17 @@ async fn process_maker_connected(ctx: MmArc, from_pubkey: H256Json, connected: M }, }; - if order_match.reserved.sender_pubkey != from_pubkey { + if order_match.reserved.sender_pubkey != unprefixed_from.into() { error!("Connected message sender pubkey != reserved message sender pubkey"); return; } // alice - lp_connected_alice(ctx.clone(), my_order_entry.get().clone(), order_match.clone()); + lp_connected_alice( + ctx.clone(), + my_order_entry.get().clone(), + order_match.clone(), + from_pubkey, + ); // remove the matched order immediately let order = my_order_entry.remove(); delete_my_taker_order(ctx, order, TakerOrderCancellationReason::Fulfilled) @@ -3701,7 +3713,7 @@ async fn process_taker_request(ctx: MmArc, from_pubkey: H256Json, taker_request: } } -async fn process_taker_connect(ctx: MmArc, sender_pubkey: H256Json, connect_msg: TakerConnect) { +async fn process_taker_connect(ctx: MmArc, sender_pubkey: PublicKey, connect_msg: TakerConnect) { log::debug!("Processing TakerConnect {:?}", connect_msg); let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); @@ -3710,7 +3722,8 @@ async fn process_taker_connect(ctx: MmArc, sender_pubkey: H256Json, connect_msg: Err(_) => return, }; - if our_public_id.bytes == sender_pubkey.0 { + let sender_unprefixed = sender_pubkey.unprefixed(); + if our_public_id.bytes == sender_unprefixed { log::warn!("Skip taker connect from our pubkey"); return; } @@ -3737,7 +3750,7 @@ async fn process_taker_connect(ctx: MmArc, sender_pubkey: H256Json, connect_msg: return; }, }; - if order_match.request.sender_pubkey != sender_pubkey { + if order_match.request.sender_pubkey != sender_unprefixed.into() { log::warn!("Connect message sender pubkey != request message sender pubkey"); return; } @@ -3754,7 +3767,7 @@ async fn process_taker_connect(ctx: MmArc, sender_pubkey: H256Json, connect_msg: order_match.connected = Some(connected.clone()); let order_match = order_match.clone(); my_order.started_swaps.push(order_match.request.uuid); - lp_connect_start_bob(ctx.clone(), order_match, my_order.clone()); + lp_connect_start_bob(ctx.clone(), order_match, my_order.clone(), sender_pubkey); let topic = my_order.orderbook_topic(); broadcast_ordermatch_message(&ctx, topic.clone(), connected.into(), my_order.p2p_keypair()); diff --git a/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs b/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs index 3596e05c99..db38c4ddbc 100644 --- a/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs +++ b/mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs @@ -24,6 +24,7 @@ use mm2_state_machine::prelude::*; use mm2_state_machine::storable_state_machine::*; use primitives::hash::H256; use rpc::v1::types::{Bytes as BytesJson, H256 as H256Json}; +use secp256k1::PublicKey; use std::convert::TryInto; use std::marker::PhantomData; use uuid::Uuid; @@ -365,6 +366,8 @@ pub struct MakerSwapStateMachine, /// Abortable queue used to spawn related activities pub abortable_system: AbortableQueue, + /// Taker's P2P pubkey + pub taker_p2p_pubkey: PublicKey, } impl MakerSwapStateMachine { @@ -593,6 +596,11 @@ impl Storable p2p_topic: swap_v2_topic(&uuid), uuid, p2p_keypair: repr.p2p_keypair.map(|k| k.into_inner()), + taker_p2p_pubkey: PublicKey::from_slice(&[ + 3, 23, 183, 225, 206, 31, 159, 148, 195, 42, 67, 115, 146, 41, 248, 140, 11, 3, 51, 41, 111, 180, 110, + 143, 114, 134, 88, 73, 198, 174, 52, 184, 78, + ]) + .unwrap(), }; Ok((RestoredMachine::new(machine), current_state)) @@ -607,12 +615,13 @@ impl Storable } fn init_additional_context(&mut self) { - init_additional_context_impl(&self.ctx, ActiveSwapV2Info { + let swap_info = ActiveSwapV2Info { uuid: self.uuid, maker_coin: self.maker_coin.ticker().into(), taker_coin: self.taker_coin.ticker().into(), swap_type: MAKER_SWAP_V2_TYPE, - }) + }; + init_additional_context_impl(&self.ctx, swap_info, self.taker_p2p_pubkey); } fn clean_up_context(&mut self) { diff --git a/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs b/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs index 29cb16445b..ec87e9b79b 100644 --- a/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs +++ b/mm2src/mm2_main/src/lp_swap/swap_v2_common.rs @@ -228,18 +228,10 @@ pub(super) async fn mark_swap_as_finished(ctx: MmArc, id: Uuid) -> MmResult<(), Ok(()) } -pub(super) fn init_additional_context_impl(ctx: &MmArc, swap_info: ActiveSwapV2Info) { +pub(super) fn init_additional_context_impl(ctx: &MmArc, swap_info: ActiveSwapV2Info, other_p2p_pubkey: PublicKey) { subscribe_to_topic(ctx, swap_v2_topic(&swap_info.uuid)); let swap_ctx = SwapsContext::from_ctx(ctx).expect("SwapsContext::from_ctx should not fail"); - swap_ctx.init_msg_v2_store( - swap_info.uuid, - // just a "random" pubkey for now - PublicKey::from_slice(&[ - 3, 23, 183, 225, 206, 31, 159, 148, 195, 42, 67, 115, 146, 41, 248, 140, 11, 3, 51, 41, 111, 180, 110, 143, - 114, 134, 88, 73, 198, 174, 52, 184, 78, - ]) - .unwrap(), - ); + swap_ctx.init_msg_v2_store(swap_info.uuid, other_p2p_pubkey); swap_ctx .active_swaps_v2_infos .lock() diff --git a/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs b/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs index bd908146b4..2c883feef7 100644 --- a/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs +++ b/mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs @@ -25,6 +25,7 @@ use mm2_state_machine::prelude::*; use mm2_state_machine::storable_state_machine::*; use primitives::hash::H256; use rpc::v1::types::{Bytes as BytesJson, H256 as H256Json}; +use secp256k1::PublicKey; use std::convert::TryInto; use std::marker::PhantomData; use uuid::Uuid; @@ -396,6 +397,8 @@ pub struct TakerSwapStateMachine TakerSwapStateMachine { @@ -691,6 +694,11 @@ impl Storable uuid, p2p_keypair: repr.p2p_keypair.map(|k| k.into_inner()), taker_secret: repr.taker_secret.into(), + maker_p2p_pubkey: PublicKey::from_slice(&[ + 3, 23, 183, 225, 206, 31, 159, 148, 195, 42, 67, 115, 146, 41, 248, 140, 11, 3, 51, 41, 111, 180, 110, + 143, 114, 134, 88, 73, 198, 174, 52, 184, 78, + ]) + .unwrap(), }; Ok((RestoredMachine::new(machine), current_state)) } @@ -704,12 +712,13 @@ impl Storable } fn init_additional_context(&mut self) { - init_additional_context_impl(&self.ctx, ActiveSwapV2Info { + let swap_info = ActiveSwapV2Info { uuid: self.uuid, maker_coin: self.maker_coin.ticker().into(), taker_coin: self.taker_coin.ticker().into(), swap_type: TAKER_SWAP_V2_TYPE, - }) + }; + init_additional_context_impl(&self.ctx, swap_info, self.maker_p2p_pubkey); } fn clean_up_context(&mut self) { diff --git a/mm2src/mm2_main/src/ordermatch_tests.rs b/mm2src/mm2_main/src/ordermatch_tests.rs index e9a1c1cb36..b0657b0d00 100644 --- a/mm2src/mm2_main/src/ordermatch_tests.rs +++ b/mm2src/mm2_main/src/ordermatch_tests.rs @@ -12,6 +12,7 @@ use mm2_net::p2p::P2PContext; use mm2_test_helpers::for_tests::mm_ctx_with_iguana; use mocktopus::mocking::*; use rand::{seq::SliceRandom, thread_rng, Rng}; +use secp256k1::PublicKey; use std::collections::HashSet; use std::iter::{self, FromIterator}; use std::sync::Mutex; @@ -1212,7 +1213,7 @@ fn lp_connect_start_bob_should_not_be_invoked_if_order_match_already_connected() .add_order(ctx.weak(), maker_order, None); static mut CONNECT_START_CALLED: bool = false; - lp_connect_start_bob.mock_safe(|_, _, _| { + lp_connect_start_bob.mock_safe(|_, _, _, _| { unsafe { CONNECT_START_CALLED = true; } @@ -1221,7 +1222,13 @@ fn lp_connect_start_bob_should_not_be_invoked_if_order_match_already_connected() }); let connect: TakerConnect = json::from_str(r#"{"taker_order_uuid":"2f9afe84-7a89-4194-8947-45fba563118f","maker_order_uuid":"5f6516ea-ccaa-453a-9e37-e1c2c0d527e3","method":"connect","sender_pubkey":"031d4256c4bc9f99ac88bf3dba21773132281f65f9bf23a59928bce08961e2f3","dest_pub_key":"c6a78589e18b482aea046975e6d0acbdea7bf7dbf04d9d5bd67fda917815e3ed"}"#).unwrap(); - block_on(process_taker_connect(ctx, connect.sender_pubkey, connect)); + let mut prefixed_pub = connect.sender_pubkey.0.to_vec(); + prefixed_pub.insert(0, 2); + block_on(process_taker_connect( + ctx, + PublicKey::from_slice(&prefixed_pub).unwrap().into(), + connect, + )); assert!(unsafe { !CONNECT_START_CALLED }); } diff --git a/mm2src/mm2_p2p/src/lib.rs b/mm2src/mm2_p2p/src/lib.rs index fd13446f6e..9216214d66 100644 --- a/mm2src/mm2_p2p/src/lib.rs +++ b/mm2src/mm2_p2p/src/lib.rs @@ -115,6 +115,10 @@ fn sha256(input: impl AsRef<[u8]>) -> [u8; 32] { Sha256::new().chain(input).fina #[derive(Debug, Eq, PartialEq)] pub struct Secp256k1PubkeySerialize(Secp256k1Pubkey); +impl Secp256k1PubkeySerialize { + pub fn into_inner(self) -> Secp256k1Pubkey { self.0 } +} + impl Serialize for Secp256k1PubkeySerialize { fn serialize(&self, serializer: S) -> Result { serializer.serialize_bytes(&self.0.serialize())