Skip to content

Commit

Permalink
Handling accept_only_from in progress.
Browse files Browse the repository at this point in the history
  • Loading branch information
Artem Vitae committed Dec 13, 2023
1 parent 56a45b0 commit 01994cd
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 29 deletions.
39 changes: 26 additions & 13 deletions mm2src/mm2_main/src/lp_ordermatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use http::Response;
use keys::{AddressFormat, KeyPair};
use mm2_core::mm_ctx::{from_ctx, MmArc, MmWeak};
use mm2_err_handle::prelude::*;
use mm2_libp2p::{decode_signed, encode_and_sign, encode_message, pub_sub_topic, TopicHash, TopicPrefix,
use mm2_libp2p::{decode_signed, encode_and_sign, encode_message, pub_sub_topic, PublicKey, TopicHash, TopicPrefix,
TOPIC_SEPARATOR};
use mm2_metrics::mm_gauge;
use mm2_number::{BigDecimal, BigRational, MmNumber, MmNumberMultiRepr};
Expand Down Expand Up @@ -572,11 +572,11 @@ pub async fn process_msg(ctx: MmArc, from_peer: String, msg: &[u8], i_am_relay:
Ok(())
},
new_protocol::OrdermatchMessage::TakerConnect(taker_connect) => {
process_taker_connect(ctx, pubkey.unprefixed().into(), taker_connect.into()).await;
process_taker_connect(ctx, pubkey, taker_connect.into()).await;
Ok(())
},
new_protocol::OrdermatchMessage::MakerConnected(maker_connected) => {
process_maker_connected(ctx, pubkey.unprefixed().into(), maker_connected.into()).await;
process_maker_connected(ctx, pubkey, maker_connected.into()).await;
Ok(())
},
new_protocol::OrdermatchMessage::MakerOrderCancelled(cancelled_msg) => {
Expand Down Expand Up @@ -2872,7 +2872,7 @@ impl MakerOrdersContext {
}

#[cfg_attr(test, mockable)]
fn lp_connect_start_bob(ctx: MmArc, maker_match: MakerMatch, maker_order: MakerOrder) {
fn lp_connect_start_bob(ctx: MmArc, maker_match: MakerMatch, maker_order: MakerOrder, taker_p2p_pubkey: PublicKey) {
let spawner = ctx.spawner();
let uuid = maker_match.request.uuid;

Expand Down Expand Up @@ -2982,6 +2982,9 @@ fn lp_connect_start_bob(ctx: MmArc, maker_match: MakerMatch, maker_order: MakerO
p2p_keypair: maker_order.p2p_privkey.map(SerializableSecp256k1Keypair::into_inner),
secret_hash_algo,
lock_duration: lock_time,
taker_p2p_pubkey: match taker_p2p_pubkey {
PublicKey::Secp256k1(pubkey) => pubkey.into_inner(),
},
};
#[allow(clippy::box_default)]
maker_swap_state_machine
Expand Down Expand Up @@ -3027,7 +3030,7 @@ fn lp_connect_start_bob(ctx: MmArc, maker_match: MakerMatch, maker_order: MakerO
spawner.spawn_with_settings(fut, settings);
}

fn lp_connected_alice(ctx: MmArc, taker_order: TakerOrder, taker_match: TakerMatch) {
fn lp_connected_alice(ctx: MmArc, taker_order: TakerOrder, taker_match: TakerMatch, maker_p2p_pubkey: PublicKey) {
let spawner = ctx.spawner();
let uuid = taker_match.reserved.taker_order_uuid;

Expand Down Expand Up @@ -3138,6 +3141,9 @@ fn lp_connected_alice(ctx: MmArc, taker_order: TakerOrder, taker_match: TakerMat
uuid,
p2p_keypair: taker_order.p2p_privkey.map(SerializableSecp256k1Keypair::into_inner),
taker_secret,
maker_p2p_pubkey: match maker_p2p_pubkey {
PublicKey::Secp256k1(pubkey) => pubkey.into_inner(),
},
};
#[allow(clippy::box_default)]
taker_swap_state_machine
Expand Down Expand Up @@ -3555,7 +3561,7 @@ async fn process_maker_reserved(ctx: MmArc, from_pubkey: H256Json, reserved_msg:
}
}

async fn process_maker_connected(ctx: MmArc, from_pubkey: H256Json, connected: MakerConnected) {
async fn process_maker_connected(ctx: MmArc, from_pubkey: PublicKey, connected: MakerConnected) {
log::debug!("Processing MakerConnected {:?}", connected);
let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap();

Expand All @@ -3564,7 +3570,8 @@ async fn process_maker_connected(ctx: MmArc, from_pubkey: H256Json, connected: M
Err(_) => return,
};

if our_public_id.bytes == from_pubkey.0 {
let unprefixed_from = from_pubkey.unprefixed();
if our_public_id.bytes == unprefixed_from {
log::warn!("Skip maker connected from our pubkey");
return;
}
Expand All @@ -3585,12 +3592,17 @@ async fn process_maker_connected(ctx: MmArc, from_pubkey: H256Json, connected: M
},
};

if order_match.reserved.sender_pubkey != from_pubkey {
if order_match.reserved.sender_pubkey != unprefixed_from.into() {
error!("Connected message sender pubkey != reserved message sender pubkey");
return;
}
// alice
lp_connected_alice(ctx.clone(), my_order_entry.get().clone(), order_match.clone());
lp_connected_alice(
ctx.clone(),
my_order_entry.get().clone(),
order_match.clone(),
from_pubkey,
);
// remove the matched order immediately
let order = my_order_entry.remove();
delete_my_taker_order(ctx, order, TakerOrderCancellationReason::Fulfilled)
Expand Down Expand Up @@ -3701,7 +3713,7 @@ async fn process_taker_request(ctx: MmArc, from_pubkey: H256Json, taker_request:
}
}

async fn process_taker_connect(ctx: MmArc, sender_pubkey: H256Json, connect_msg: TakerConnect) {
async fn process_taker_connect(ctx: MmArc, sender_pubkey: PublicKey, connect_msg: TakerConnect) {
log::debug!("Processing TakerConnect {:?}", connect_msg);
let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap();

Expand All @@ -3710,7 +3722,8 @@ async fn process_taker_connect(ctx: MmArc, sender_pubkey: H256Json, connect_msg:
Err(_) => return,
};

if our_public_id.bytes == sender_pubkey.0 {
let sender_unprefixed = sender_pubkey.unprefixed();
if our_public_id.bytes == sender_unprefixed {
log::warn!("Skip taker connect from our pubkey");
return;
}
Expand All @@ -3737,7 +3750,7 @@ async fn process_taker_connect(ctx: MmArc, sender_pubkey: H256Json, connect_msg:
return;
},
};
if order_match.request.sender_pubkey != sender_pubkey {
if order_match.request.sender_pubkey != sender_unprefixed.into() {
log::warn!("Connect message sender pubkey != request message sender pubkey");
return;
}
Expand All @@ -3754,7 +3767,7 @@ async fn process_taker_connect(ctx: MmArc, sender_pubkey: H256Json, connect_msg:
order_match.connected = Some(connected.clone());
let order_match = order_match.clone();
my_order.started_swaps.push(order_match.request.uuid);
lp_connect_start_bob(ctx.clone(), order_match, my_order.clone());
lp_connect_start_bob(ctx.clone(), order_match, my_order.clone(), sender_pubkey);
let topic = my_order.orderbook_topic();
broadcast_ordermatch_message(&ctx, topic.clone(), connected.into(), my_order.p2p_keypair());

Expand Down
13 changes: 11 additions & 2 deletions mm2src/mm2_main/src/lp_swap/maker_swap_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use mm2_state_machine::prelude::*;
use mm2_state_machine::storable_state_machine::*;
use primitives::hash::H256;
use rpc::v1::types::{Bytes as BytesJson, H256 as H256Json};
use secp256k1::PublicKey;
use std::convert::TryInto;
use std::marker::PhantomData;
use uuid::Uuid;
Expand Down Expand Up @@ -365,6 +366,8 @@ pub struct MakerSwapStateMachine<MakerCoin: MmCoin + CoinAssocTypes, TakerCoin:
pub p2p_keypair: Option<KeyPair>,
/// Abortable queue used to spawn related activities
pub abortable_system: AbortableQueue,
/// Taker's P2P pubkey
pub taker_p2p_pubkey: PublicKey,
}

impl<MakerCoin: MmCoin + CoinAssocTypes, TakerCoin: MmCoin + SwapOpsV2> MakerSwapStateMachine<MakerCoin, TakerCoin> {
Expand Down Expand Up @@ -593,6 +596,11 @@ impl<MakerCoin: MmCoin + CoinAssocTypes, TakerCoin: MmCoin + SwapOpsV2> Storable
p2p_topic: swap_v2_topic(&uuid),
uuid,
p2p_keypair: repr.p2p_keypair.map(|k| k.into_inner()),
taker_p2p_pubkey: PublicKey::from_slice(&[
3, 23, 183, 225, 206, 31, 159, 148, 195, 42, 67, 115, 146, 41, 248, 140, 11, 3, 51, 41, 111, 180, 110,
143, 114, 134, 88, 73, 198, 174, 52, 184, 78,
])
.unwrap(),
};

Ok((RestoredMachine::new(machine), current_state))
Expand All @@ -607,12 +615,13 @@ impl<MakerCoin: MmCoin + CoinAssocTypes, TakerCoin: MmCoin + SwapOpsV2> Storable
}

fn init_additional_context(&mut self) {
init_additional_context_impl(&self.ctx, ActiveSwapV2Info {
let swap_info = ActiveSwapV2Info {
uuid: self.uuid,
maker_coin: self.maker_coin.ticker().into(),
taker_coin: self.taker_coin.ticker().into(),
swap_type: MAKER_SWAP_V2_TYPE,
})
};
init_additional_context_impl(&self.ctx, swap_info, self.taker_p2p_pubkey);
}

fn clean_up_context(&mut self) {
Expand Down
12 changes: 2 additions & 10 deletions mm2src/mm2_main/src/lp_swap/swap_v2_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,18 +228,10 @@ pub(super) async fn mark_swap_as_finished(ctx: MmArc, id: Uuid) -> MmResult<(),
Ok(())
}

pub(super) fn init_additional_context_impl(ctx: &MmArc, swap_info: ActiveSwapV2Info) {
pub(super) fn init_additional_context_impl(ctx: &MmArc, swap_info: ActiveSwapV2Info, other_p2p_pubkey: PublicKey) {
subscribe_to_topic(ctx, swap_v2_topic(&swap_info.uuid));
let swap_ctx = SwapsContext::from_ctx(ctx).expect("SwapsContext::from_ctx should not fail");
swap_ctx.init_msg_v2_store(
swap_info.uuid,
// just a "random" pubkey for now
PublicKey::from_slice(&[
3, 23, 183, 225, 206, 31, 159, 148, 195, 42, 67, 115, 146, 41, 248, 140, 11, 3, 51, 41, 111, 180, 110, 143,
114, 134, 88, 73, 198, 174, 52, 184, 78,
])
.unwrap(),
);
swap_ctx.init_msg_v2_store(swap_info.uuid, other_p2p_pubkey);
swap_ctx
.active_swaps_v2_infos
.lock()
Expand Down
13 changes: 11 additions & 2 deletions mm2src/mm2_main/src/lp_swap/taker_swap_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use mm2_state_machine::prelude::*;
use mm2_state_machine::storable_state_machine::*;
use primitives::hash::H256;
use rpc::v1::types::{Bytes as BytesJson, H256 as H256Json};
use secp256k1::PublicKey;
use std::convert::TryInto;
use std::marker::PhantomData;
use uuid::Uuid;
Expand Down Expand Up @@ -396,6 +397,8 @@ pub struct TakerSwapStateMachine<MakerCoin: MmCoin + CoinAssocTypes, TakerCoin:
pub taker_secret: H256,
/// Abortable queue used to spawn related activities
pub abortable_system: AbortableQueue,
/// Maker's P2P pubkey
pub maker_p2p_pubkey: PublicKey,
}

impl<MakerCoin: MmCoin + CoinAssocTypes, TakerCoin: MmCoin + SwapOpsV2> TakerSwapStateMachine<MakerCoin, TakerCoin> {
Expand Down Expand Up @@ -691,6 +694,11 @@ impl<MakerCoin: MmCoin + CoinAssocTypes, TakerCoin: MmCoin + SwapOpsV2> Storable
uuid,
p2p_keypair: repr.p2p_keypair.map(|k| k.into_inner()),
taker_secret: repr.taker_secret.into(),
maker_p2p_pubkey: PublicKey::from_slice(&[
3, 23, 183, 225, 206, 31, 159, 148, 195, 42, 67, 115, 146, 41, 248, 140, 11, 3, 51, 41, 111, 180, 110,
143, 114, 134, 88, 73, 198, 174, 52, 184, 78,
])
.unwrap(),
};
Ok((RestoredMachine::new(machine), current_state))
}
Expand All @@ -704,12 +712,13 @@ impl<MakerCoin: MmCoin + CoinAssocTypes, TakerCoin: MmCoin + SwapOpsV2> Storable
}

fn init_additional_context(&mut self) {
init_additional_context_impl(&self.ctx, ActiveSwapV2Info {
let swap_info = ActiveSwapV2Info {
uuid: self.uuid,
maker_coin: self.maker_coin.ticker().into(),
taker_coin: self.taker_coin.ticker().into(),
swap_type: TAKER_SWAP_V2_TYPE,
})
};
init_additional_context_impl(&self.ctx, swap_info, self.maker_p2p_pubkey);
}

fn clean_up_context(&mut self) {
Expand Down
11 changes: 9 additions & 2 deletions mm2src/mm2_main/src/ordermatch_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use mm2_net::p2p::P2PContext;
use mm2_test_helpers::for_tests::mm_ctx_with_iguana;
use mocktopus::mocking::*;
use rand::{seq::SliceRandom, thread_rng, Rng};
use secp256k1::PublicKey;
use std::collections::HashSet;
use std::iter::{self, FromIterator};
use std::sync::Mutex;
Expand Down Expand Up @@ -1212,7 +1213,7 @@ fn lp_connect_start_bob_should_not_be_invoked_if_order_match_already_connected()
.add_order(ctx.weak(), maker_order, None);

static mut CONNECT_START_CALLED: bool = false;
lp_connect_start_bob.mock_safe(|_, _, _| {
lp_connect_start_bob.mock_safe(|_, _, _, _| {
unsafe {
CONNECT_START_CALLED = true;
}
Expand All @@ -1221,7 +1222,13 @@ fn lp_connect_start_bob_should_not_be_invoked_if_order_match_already_connected()
});

let connect: TakerConnect = json::from_str(r#"{"taker_order_uuid":"2f9afe84-7a89-4194-8947-45fba563118f","maker_order_uuid":"5f6516ea-ccaa-453a-9e37-e1c2c0d527e3","method":"connect","sender_pubkey":"031d4256c4bc9f99ac88bf3dba21773132281f65f9bf23a59928bce08961e2f3","dest_pub_key":"c6a78589e18b482aea046975e6d0acbdea7bf7dbf04d9d5bd67fda917815e3ed"}"#).unwrap();
block_on(process_taker_connect(ctx, connect.sender_pubkey, connect));
let mut prefixed_pub = connect.sender_pubkey.0.to_vec();
prefixed_pub.insert(0, 2);
block_on(process_taker_connect(
ctx,
PublicKey::from_slice(&prefixed_pub).unwrap().into(),
connect,
));
assert!(unsafe { !CONNECT_START_CALLED });
}

Expand Down
4 changes: 4 additions & 0 deletions mm2src/mm2_p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ fn sha256(input: impl AsRef<[u8]>) -> [u8; 32] { Sha256::new().chain(input).fina
#[derive(Debug, Eq, PartialEq)]
pub struct Secp256k1PubkeySerialize(Secp256k1Pubkey);

impl Secp256k1PubkeySerialize {
pub fn into_inner(self) -> Secp256k1Pubkey { self.0 }
}

impl Serialize for Secp256k1PubkeySerialize {
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
serializer.serialize_bytes(&self.0.serialize())
Expand Down

0 comments on commit 01994cd

Please sign in to comment.