diff --git a/mm2src/lp_ordermatch.rs b/mm2src/lp_ordermatch.rs index 0617d22ff5..c71469dc48 100644 --- a/mm2src/lp_ordermatch.rs +++ b/mm2src/lp_ordermatch.rs @@ -64,11 +64,13 @@ use crate::mm2::{database::my_swaps::insert_new_swap, run_taker_swap, AtomicLocktimeVersion, CheckBalanceError, MakerSwap, RunMakerSwapInput, RunTakerSwapInput, SwapConfirmationsSettings, TakerSwap}}; pub use best_orders::best_orders_rpc; +pub use orderbook_depth::orderbook_depth_rpc; #[path = "lp_ordermatch/best_orders.rs"] mod best_orders; #[path = "lp_ordermatch/new_protocol.rs"] mod new_protocol; #[path = "lp_ordermatch/order_requests_tracker.rs"] mod order_requests_tracker; +#[path = "lp_ordermatch/orderbook_depth.rs"] mod orderbook_depth; #[cfg(test)] #[cfg(feature = "native")] #[path = "ordermatch_tests.rs"] @@ -383,7 +385,10 @@ pub async fn process_msg(ctx: MmArc, _topics: Vec, from_peer: String, ms #[derive(Debug, Deserialize, Eq, PartialEq, Serialize)] pub enum OrdermatchRequest { /// Get an orderbook for the given pair. - GetOrderbook { base: String, rel: String }, + GetOrderbook { + base: String, + rel: String, + }, /// Sync specific pubkey orderbook state if our known Patricia trie state doesn't match the latest keep alive message SyncPubkeyOrderbookState { pubkey: String, @@ -395,6 +400,9 @@ pub enum OrdermatchRequest { action: BestOrdersAction, volume: BigRational, }, + OrderbookDepth { + pairs: Vec<(String, String)>, + }, } #[derive(Debug)] @@ -445,6 +453,9 @@ pub async fn process_peer_request(ctx: MmArc, request: OrdermatchRequest) -> Res OrdermatchRequest::BestOrders { coin, action, volume } => { best_orders::process_best_orders_p2p_request(ctx, coin, action, volume).await }, + OrdermatchRequest::OrderbookDepth { pairs } => { + orderbook_depth::process_orderbook_depth_p2p_request(ctx, pairs).await + }, } } diff --git a/mm2src/lp_ordermatch/orderbook_depth.rs b/mm2src/lp_ordermatch/orderbook_depth.rs new file mode 100644 index 0000000000..f85b548d03 --- /dev/null +++ b/mm2src/lp_ordermatch/orderbook_depth.rs @@ -0,0 +1,101 @@ +use super::{orderbook_topic_from_base_rel, OrdermatchContext, OrdermatchRequest}; +use crate::mm2::lp_network::{request_any_relay, P2PRequest}; +use common::{log, mm_ctx::MmArc}; +use http::Response; +use serde_json::{self as json, Value as Json}; +use std::collections::HashMap; + +#[derive(Debug, Deserialize)] +struct OrderbookDepthReq { + pairs: Vec<(String, String)>, +} + +#[derive(Debug, Deserialize, Serialize)] +struct PairDepth { + asks: usize, + bids: usize, +} + +#[derive(Debug, Deserialize, Serialize)] +struct OrderbookDepthP2PResponse { + depth: HashMap<(String, String), PairDepth>, +} + +#[derive(Debug, Deserialize, Serialize)] +struct PairWithDepth { + pair: (String, String), + depth: PairDepth, +} + +pub async fn orderbook_depth_rpc(ctx: MmArc, req: Json) -> Result>, String> { + let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap(); + let req: OrderbookDepthReq = try_s!(json::from_value(req)); + let mut result = Vec::with_capacity(req.pairs.len()); + + let orderbook = ordermatch_ctx.orderbook.lock().await; + + // the Iter::filter uses &Self::Item, which is undesirable, we need owned pair + #[allow(clippy::unnecessary_filter_map)] + let to_request_from_relay: Vec<_> = req + .pairs + .into_iter() + .filter_map(|pair| { + let topic = orderbook_topic_from_base_rel(&pair.0, &pair.1); + if orderbook.is_subscribed_to(&topic) { + let asks = orderbook.unordered.get(&pair).map_or(0, |orders| orders.len()); + let reversed = (pair.1.clone(), pair.0.clone()); + let bids = orderbook.unordered.get(&reversed).map_or(0, |orders| orders.len()); + result.push(PairWithDepth { + pair, + depth: PairDepth { asks, bids }, + }); + None + } else { + Some(pair) + } + }) + .collect(); + + // avoid locking orderbook for long time during P2P request + drop(orderbook); + if !to_request_from_relay.is_empty() { + let p2p_request = OrdermatchRequest::OrderbookDepth { + pairs: to_request_from_relay, + }; + log::debug!("Sending request_any_relay({:?})", p2p_request); + let p2p_response = try_s!( + request_any_relay::(ctx.clone(), P2PRequest::Ordermatch(p2p_request)).await + ); + log::debug!("Received response {:?}", p2p_response); + if let Some((response, _)) = p2p_response { + for (pair, depth) in response.depth { + result.push(PairWithDepth { pair, depth }); + } + } + } + + let res = json!({ "result": result }); + Response::builder() + .body(json::to_vec(&res).expect("Serialization failed")) + .map_err(|e| ERRL!("{}", e)) +} + +pub async fn process_orderbook_depth_p2p_request( + ctx: MmArc, + pairs: Vec<(String, String)>, +) -> Result>, String> { + let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).expect("ordermatch_ctx must exist at this point"); + let orderbook = ordermatch_ctx.orderbook.lock().await; + let depth = pairs + .into_iter() + .map(|pair| { + let asks = orderbook.unordered.get(&pair).map_or(0, |orders| orders.len()); + let reversed = (pair.1.clone(), pair.0.clone()); + let bids = orderbook.unordered.get(&reversed).map_or(0, |orders| orders.len()); + (pair, PairDepth { asks, bids }) + }) + .collect(); + let response = OrderbookDepthP2PResponse { depth }; + let encoded = rmp_serde::to_vec(&response).expect("rmp_serde::to_vec should not fail here"); + Ok(Some(encoded)) +} diff --git a/mm2src/mm2_tests.rs b/mm2src/mm2_tests.rs index 7284d13565..48e365f504 100644 --- a/mm2src/mm2_tests.rs +++ b/mm2src/mm2_tests.rs @@ -5408,6 +5408,145 @@ fn test_best_orders() { block_on(mm_alice.stop()).unwrap(); } +fn request_and_check_orderbook_depth(mm_alice: &MarketMakerIt) { + let rc = block_on(mm_alice.rpc(json! ({ + "userpass": mm_alice.userpass, + "method": "orderbook_depth", + "pairs": [("RICK", "MORTY"), ("RICK", "ETH"), ("MORTY", "ETH")], + }))) + .unwrap(); + assert!(rc.0.is_success(), "!orderbook_depth: {}", rc.1); + let response: OrderbookDepthResponse = json::from_str(&rc.1).unwrap(); + let rick_morty = response + .result + .iter() + .find(|pair_depth| pair_depth.pair.0 == "RICK" && pair_depth.pair.1 == "MORTY") + .unwrap(); + assert_eq!(3, rick_morty.depth.asks); + assert_eq!(2, rick_morty.depth.bids); + + let rick_eth = response + .result + .iter() + .find(|pair_depth| pair_depth.pair.0 == "RICK" && pair_depth.pair.1 == "ETH") + .unwrap(); + assert_eq!(1, rick_eth.depth.asks); + assert_eq!(1, rick_eth.depth.bids); + + let morty_eth = response + .result + .iter() + .find(|pair_depth| pair_depth.pair.0 == "MORTY" && pair_depth.pair.1 == "ETH") + .unwrap(); + assert_eq!(0, morty_eth.depth.asks); + assert_eq!(0, morty_eth.depth.bids); +} + +#[test] +fn test_orderbook_depth() { + let bob_passphrase = get_passphrase(&".env.seed", "BOB_PASSPHRASE").unwrap(); + + let coins = json!([ + {"coin":"RICK","asset":"RICK","rpcport":8923,"txversion":4,"overwintered":1,"protocol":{"type":"UTXO"}}, + {"coin":"MORTY","asset":"MORTY","rpcport":11608,"txversion":4,"overwintered":1,"protocol":{"type":"UTXO"}}, + {"coin":"ETH","name":"ethereum","protocol":{"type":"ETH"},"rpcport":80}, + {"coin":"JST","name":"jst","protocol":{"type":"ERC20", "protocol_data":{"platform":"ETH","contract_address":"0x2b294F029Fde858b2c62184e8390591755521d8E"}}} + ]); + + // start bob and immediately place the orders + let mut mm_bob = MarketMakerIt::start( + json! ({ + "gui": "nogui", + "netid": 9998, + "myipaddr": env::var ("BOB_TRADE_IP") .ok(), + "rpcip": env::var ("BOB_TRADE_IP") .ok(), + "canbind": env::var ("BOB_TRADE_PORT") .ok().map (|s| s.parse::().unwrap()), + "passphrase": bob_passphrase, + "coins": coins, + "rpc_password": "pass", + "i_am_seed": true, + }), + "pass".into(), + local_start!("bob"), + ) + .unwrap(); + let (_bob_dump_log, _bob_dump_dashboard) = mm_dump(&mm_bob.log_path); + log!({"Bob log path: {}", mm_bob.log_path.display()}); + block_on(mm_bob.wait_for_log(22., |log| log.contains("INFO Listening on"))).unwrap(); + block_on(mm_bob.wait_for_log(22., |log| log.contains(">>>>>>>>> DEX stats "))).unwrap(); + // Enable coins on Bob side. Print the replies in case we need the "address". + let bob_coins = block_on(enable_coins_eth_electrum(&mm_bob, &["http://195.201.0.6:8565"])); + log!({ "enable_coins (bob): {:?}", bob_coins }); + // issue sell request on Bob side by setting base/rel price + log!("Issue bob sell requests"); + + let bob_orders = [ + // (base, rel, price, volume, min_volume) + ("RICK", "MORTY", "0.9", "0.9", None), + ("RICK", "MORTY", "0.8", "0.9", None), + ("RICK", "MORTY", "0.7", "0.9", Some("0.9")), + ("RICK", "ETH", "0.8", "0.9", None), + ("MORTY", "RICK", "0.8", "0.9", None), + ("MORTY", "RICK", "0.9", "0.9", None), + ("ETH", "RICK", "0.8", "0.9", None), + ]; + for (base, rel, price, volume, min_volume) in bob_orders.iter() { + let rc = block_on(mm_bob.rpc(json! ({ + "userpass": mm_bob.userpass, + "method": "setprice", + "base": base, + "rel": rel, + "price": price, + "volume": volume, + "min_volume": min_volume.unwrap_or("0.00777"), + "cancel_previous": false, + }))) + .unwrap(); + assert!(rc.0.is_success(), "!setprice: {}", rc.1); + } + + let mut mm_alice = MarketMakerIt::start( + json! ({ + "gui": "nogui", + "netid": 9998, + "myipaddr": env::var ("ALICE_TRADE_IP") .ok(), + "rpcip": env::var ("ALICE_TRADE_IP") .ok(), + "passphrase": "alice passphrase", + "coins": coins, + "seednodes": [fomat!((mm_bob.ip))], + "rpc_password": "pass", + }), + "pass".into(), + local_start!("alice"), + ) + .unwrap(); + + let (_alice_dump_log, _alice_dump_dashboard) = mm_dump(&mm_alice.log_path); + log!({ "Alice log path: {}", mm_alice.log_path.display() }); + + block_on(mm_bob.wait_for_log(22., |log| { + log.contains("DEBUG Handling IncludedTorelaysMesh message for peer") + })) + .unwrap(); + block_on(mm_alice.wait_for_log(22., |log| log.contains(">>>>>>>>> DEX stats "))).unwrap(); + + request_and_check_orderbook_depth(&mm_alice); + // request RICK/MORTY orderbook to subscribe Alice + let rc = block_on(mm_alice.rpc(json! ({ + "userpass": mm_alice.userpass, + "method": "orderbook", + "base": "RICK", + "rel": "MORTY", + }))) + .unwrap(); + assert!(rc.0.is_success(), "!orderbook: {}", rc.1); + + request_and_check_orderbook_depth(&mm_alice); + + block_on(mm_bob.stop()).unwrap(); + block_on(mm_alice.stop()).unwrap(); +} + // HOWTO // 1. Install Firefox. // 2. Install forked version of wasm-bindgen-cli: cargo install wasm-bindgen-cli --git https://github.com/artemii235/wasm-bindgen.git diff --git a/mm2src/mm2_tests/structs.rs b/mm2src/mm2_tests/structs.rs index e5b2f75475..57d044255a 100644 --- a/mm2src/mm2_tests/structs.rs +++ b/mm2src/mm2_tests/structs.rs @@ -186,6 +186,23 @@ pub struct BestOrdersResponse { pub result: HashMap>, } +#[derive(Deserialize)] +pub struct PairDepth { + pub asks: usize, + pub bids: usize, +} + +#[derive(Deserialize)] +pub struct PairWithDepth { + pub pair: (String, String), + pub depth: PairDepth, +} + +#[derive(Deserialize)] +pub struct OrderbookDepthResponse { + pub result: Vec, +} + #[derive(Debug, Deserialize)] pub struct EnableElectrumResponse { pub coin: String, diff --git a/mm2src/rpc.rs b/mm2src/rpc.rs index dd270b128f..c73c1f4b69 100644 --- a/mm2src/rpc.rs +++ b/mm2src/rpc.rs @@ -37,7 +37,7 @@ use std::future::Future as Future03; use std::net::SocketAddr; use crate::mm2::lp_ordermatch::{best_orders_rpc, buy, cancel_all_orders, cancel_order, my_orders, order_status, - orderbook, sell, set_price}; + orderbook, orderbook_depth_rpc, sell, set_price}; use crate::mm2::lp_swap::{active_swaps_rpc, all_swaps_uuids_by_filter, coins_needed_for_kick_start, import_swaps, list_banned_pubkeys, max_taker_vol, my_recent_swaps, my_swap_status, recover_funds_of_swap, stats_swap_status, trade_preimage, unban_pubkeys}; @@ -111,7 +111,6 @@ fn hyres(handler: impl Future03>, String>> + Se /// /// Returns `None` if the requested "method" wasn't found among the ported RPC methods and has to be handled elsewhere. pub fn dispatcher(req: Json, ctx: MmArc) -> DispatcherRes { - //log! ("dispatcher] " (json::to_string (&req) .unwrap())); let method = match req["method"].clone() { Json::String(method) => method, _ => return DispatcherRes::NoMatch(req), @@ -163,6 +162,7 @@ pub fn dispatcher(req: Json, ctx: MmArc) -> DispatcherRes { "my_tx_history" => my_tx_history(ctx, req), "order_status" => hyres(order_status(ctx, req)), "orderbook" => hyres(orderbook(ctx, req)), + "orderbook_depth" => hyres(orderbook_depth_rpc(ctx, req)), "sim_panic" => hyres(sim_panic(req)), "recover_funds_of_swap" => { #[cfg(feature = "native")] @@ -174,7 +174,6 @@ pub fn dispatcher(req: Json, ctx: MmArc) -> DispatcherRes { return DispatcherRes::NoMatch(req); } }, - // "passphrase" => passphrase (ctx, req), "sell" => hyres(sell(ctx, req)), "show_priv_key" => hyres(show_priv_key(ctx, req)), "send_raw_transaction" => hyres(send_raw_transaction(ctx, req)),