Skip to content

Commit

Permalink
Merge pull request jl777#844 from KomodoPlatform/mm2.1-orderbook-depth
Browse files Browse the repository at this point in the history
Add orderbook_depth RPC jl777#700.
  • Loading branch information
artemii235 authored Mar 4, 2021
2 parents 7079405 + 63f8379 commit 9f6398b
Show file tree
Hide file tree
Showing 5 changed files with 271 additions and 4 deletions.
13 changes: 12 additions & 1 deletion mm2src/lp_ordermatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@ use crate::mm2::{database::my_swaps::insert_new_swap,
run_taker_swap, AtomicLocktimeVersion, CheckBalanceError, MakerSwap, RunMakerSwapInput,
RunTakerSwapInput, SwapConfirmationsSettings, TakerSwap}};
pub use best_orders::best_orders_rpc;
pub use orderbook_depth::orderbook_depth_rpc;

#[path = "lp_ordermatch/best_orders.rs"] mod best_orders;
#[path = "lp_ordermatch/new_protocol.rs"] mod new_protocol;
#[path = "lp_ordermatch/order_requests_tracker.rs"]
mod order_requests_tracker;
#[path = "lp_ordermatch/orderbook_depth.rs"] mod orderbook_depth;
#[cfg(test)]
#[cfg(feature = "native")]
#[path = "ordermatch_tests.rs"]
Expand Down Expand Up @@ -383,7 +385,10 @@ pub async fn process_msg(ctx: MmArc, _topics: Vec<String>, from_peer: String, ms
#[derive(Debug, Deserialize, Eq, PartialEq, Serialize)]
pub enum OrdermatchRequest {
/// Get an orderbook for the given pair.
GetOrderbook { base: String, rel: String },
GetOrderbook {
base: String,
rel: String,
},
/// Sync specific pubkey orderbook state if our known Patricia trie state doesn't match the latest keep alive message
SyncPubkeyOrderbookState {
pubkey: String,
Expand All @@ -395,6 +400,9 @@ pub enum OrdermatchRequest {
action: BestOrdersAction,
volume: BigRational,
},
OrderbookDepth {
pairs: Vec<(String, String)>,
},
}

#[derive(Debug)]
Expand Down Expand Up @@ -445,6 +453,9 @@ pub async fn process_peer_request(ctx: MmArc, request: OrdermatchRequest) -> Res
OrdermatchRequest::BestOrders { coin, action, volume } => {
best_orders::process_best_orders_p2p_request(ctx, coin, action, volume).await
},
OrdermatchRequest::OrderbookDepth { pairs } => {
orderbook_depth::process_orderbook_depth_p2p_request(ctx, pairs).await
},
}
}

Expand Down
101 changes: 101 additions & 0 deletions mm2src/lp_ordermatch/orderbook_depth.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use super::{orderbook_topic_from_base_rel, OrdermatchContext, OrdermatchRequest};
use crate::mm2::lp_network::{request_any_relay, P2PRequest};
use common::{log, mm_ctx::MmArc};
use http::Response;
use serde_json::{self as json, Value as Json};
use std::collections::HashMap;

#[derive(Debug, Deserialize)]
struct OrderbookDepthReq {
pairs: Vec<(String, String)>,
}

#[derive(Debug, Deserialize, Serialize)]
struct PairDepth {
asks: usize,
bids: usize,
}

#[derive(Debug, Deserialize, Serialize)]
struct OrderbookDepthP2PResponse {
depth: HashMap<(String, String), PairDepth>,
}

#[derive(Debug, Deserialize, Serialize)]
struct PairWithDepth {
pair: (String, String),
depth: PairDepth,
}

pub async fn orderbook_depth_rpc(ctx: MmArc, req: Json) -> Result<Response<Vec<u8>>, String> {
let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).unwrap();
let req: OrderbookDepthReq = try_s!(json::from_value(req));
let mut result = Vec::with_capacity(req.pairs.len());

let orderbook = ordermatch_ctx.orderbook.lock().await;

// the Iter::filter uses &Self::Item, which is undesirable, we need owned pair
#[allow(clippy::unnecessary_filter_map)]
let to_request_from_relay: Vec<_> = req
.pairs
.into_iter()
.filter_map(|pair| {
let topic = orderbook_topic_from_base_rel(&pair.0, &pair.1);
if orderbook.is_subscribed_to(&topic) {
let asks = orderbook.unordered.get(&pair).map_or(0, |orders| orders.len());
let reversed = (pair.1.clone(), pair.0.clone());
let bids = orderbook.unordered.get(&reversed).map_or(0, |orders| orders.len());
result.push(PairWithDepth {
pair,
depth: PairDepth { asks, bids },
});
None
} else {
Some(pair)
}
})
.collect();

// avoid locking orderbook for long time during P2P request
drop(orderbook);
if !to_request_from_relay.is_empty() {
let p2p_request = OrdermatchRequest::OrderbookDepth {
pairs: to_request_from_relay,
};
log::debug!("Sending request_any_relay({:?})", p2p_request);
let p2p_response = try_s!(
request_any_relay::<OrderbookDepthP2PResponse>(ctx.clone(), P2PRequest::Ordermatch(p2p_request)).await
);
log::debug!("Received response {:?}", p2p_response);
if let Some((response, _)) = p2p_response {
for (pair, depth) in response.depth {
result.push(PairWithDepth { pair, depth });
}
}
}

let res = json!({ "result": result });
Response::builder()
.body(json::to_vec(&res).expect("Serialization failed"))
.map_err(|e| ERRL!("{}", e))
}

pub async fn process_orderbook_depth_p2p_request(
ctx: MmArc,
pairs: Vec<(String, String)>,
) -> Result<Option<Vec<u8>>, String> {
let ordermatch_ctx = OrdermatchContext::from_ctx(&ctx).expect("ordermatch_ctx must exist at this point");
let orderbook = ordermatch_ctx.orderbook.lock().await;
let depth = pairs
.into_iter()
.map(|pair| {
let asks = orderbook.unordered.get(&pair).map_or(0, |orders| orders.len());
let reversed = (pair.1.clone(), pair.0.clone());
let bids = orderbook.unordered.get(&reversed).map_or(0, |orders| orders.len());
(pair, PairDepth { asks, bids })
})
.collect();
let response = OrderbookDepthP2PResponse { depth };
let encoded = rmp_serde::to_vec(&response).expect("rmp_serde::to_vec should not fail here");
Ok(Some(encoded))
}
139 changes: 139 additions & 0 deletions mm2src/mm2_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5408,6 +5408,145 @@ fn test_best_orders() {
block_on(mm_alice.stop()).unwrap();
}

fn request_and_check_orderbook_depth(mm_alice: &MarketMakerIt) {
let rc = block_on(mm_alice.rpc(json! ({
"userpass": mm_alice.userpass,
"method": "orderbook_depth",
"pairs": [("RICK", "MORTY"), ("RICK", "ETH"), ("MORTY", "ETH")],
})))
.unwrap();
assert!(rc.0.is_success(), "!orderbook_depth: {}", rc.1);
let response: OrderbookDepthResponse = json::from_str(&rc.1).unwrap();
let rick_morty = response
.result
.iter()
.find(|pair_depth| pair_depth.pair.0 == "RICK" && pair_depth.pair.1 == "MORTY")
.unwrap();
assert_eq!(3, rick_morty.depth.asks);
assert_eq!(2, rick_morty.depth.bids);

let rick_eth = response
.result
.iter()
.find(|pair_depth| pair_depth.pair.0 == "RICK" && pair_depth.pair.1 == "ETH")
.unwrap();
assert_eq!(1, rick_eth.depth.asks);
assert_eq!(1, rick_eth.depth.bids);

let morty_eth = response
.result
.iter()
.find(|pair_depth| pair_depth.pair.0 == "MORTY" && pair_depth.pair.1 == "ETH")
.unwrap();
assert_eq!(0, morty_eth.depth.asks);
assert_eq!(0, morty_eth.depth.bids);
}

#[test]
fn test_orderbook_depth() {
let bob_passphrase = get_passphrase(&".env.seed", "BOB_PASSPHRASE").unwrap();

let coins = json!([
{"coin":"RICK","asset":"RICK","rpcport":8923,"txversion":4,"overwintered":1,"protocol":{"type":"UTXO"}},
{"coin":"MORTY","asset":"MORTY","rpcport":11608,"txversion":4,"overwintered":1,"protocol":{"type":"UTXO"}},
{"coin":"ETH","name":"ethereum","protocol":{"type":"ETH"},"rpcport":80},
{"coin":"JST","name":"jst","protocol":{"type":"ERC20", "protocol_data":{"platform":"ETH","contract_address":"0x2b294F029Fde858b2c62184e8390591755521d8E"}}}
]);

// start bob and immediately place the orders
let mut mm_bob = MarketMakerIt::start(
json! ({
"gui": "nogui",
"netid": 9998,
"myipaddr": env::var ("BOB_TRADE_IP") .ok(),
"rpcip": env::var ("BOB_TRADE_IP") .ok(),
"canbind": env::var ("BOB_TRADE_PORT") .ok().map (|s| s.parse::<i64>().unwrap()),
"passphrase": bob_passphrase,
"coins": coins,
"rpc_password": "pass",
"i_am_seed": true,
}),
"pass".into(),
local_start!("bob"),
)
.unwrap();
let (_bob_dump_log, _bob_dump_dashboard) = mm_dump(&mm_bob.log_path);
log!({"Bob log path: {}", mm_bob.log_path.display()});
block_on(mm_bob.wait_for_log(22., |log| log.contains("INFO Listening on"))).unwrap();
block_on(mm_bob.wait_for_log(22., |log| log.contains(">>>>>>>>> DEX stats "))).unwrap();
// Enable coins on Bob side. Print the replies in case we need the "address".
let bob_coins = block_on(enable_coins_eth_electrum(&mm_bob, &["http://195.201.0.6:8565"]));
log!({ "enable_coins (bob): {:?}", bob_coins });
// issue sell request on Bob side by setting base/rel price
log!("Issue bob sell requests");

let bob_orders = [
// (base, rel, price, volume, min_volume)
("RICK", "MORTY", "0.9", "0.9", None),
("RICK", "MORTY", "0.8", "0.9", None),
("RICK", "MORTY", "0.7", "0.9", Some("0.9")),
("RICK", "ETH", "0.8", "0.9", None),
("MORTY", "RICK", "0.8", "0.9", None),
("MORTY", "RICK", "0.9", "0.9", None),
("ETH", "RICK", "0.8", "0.9", None),
];
for (base, rel, price, volume, min_volume) in bob_orders.iter() {
let rc = block_on(mm_bob.rpc(json! ({
"userpass": mm_bob.userpass,
"method": "setprice",
"base": base,
"rel": rel,
"price": price,
"volume": volume,
"min_volume": min_volume.unwrap_or("0.00777"),
"cancel_previous": false,
})))
.unwrap();
assert!(rc.0.is_success(), "!setprice: {}", rc.1);
}

let mut mm_alice = MarketMakerIt::start(
json! ({
"gui": "nogui",
"netid": 9998,
"myipaddr": env::var ("ALICE_TRADE_IP") .ok(),
"rpcip": env::var ("ALICE_TRADE_IP") .ok(),
"passphrase": "alice passphrase",
"coins": coins,
"seednodes": [fomat!((mm_bob.ip))],
"rpc_password": "pass",
}),
"pass".into(),
local_start!("alice"),
)
.unwrap();

let (_alice_dump_log, _alice_dump_dashboard) = mm_dump(&mm_alice.log_path);
log!({ "Alice log path: {}", mm_alice.log_path.display() });

block_on(mm_bob.wait_for_log(22., |log| {
log.contains("DEBUG Handling IncludedTorelaysMesh message for peer")
}))
.unwrap();
block_on(mm_alice.wait_for_log(22., |log| log.contains(">>>>>>>>> DEX stats "))).unwrap();

request_and_check_orderbook_depth(&mm_alice);
// request RICK/MORTY orderbook to subscribe Alice
let rc = block_on(mm_alice.rpc(json! ({
"userpass": mm_alice.userpass,
"method": "orderbook",
"base": "RICK",
"rel": "MORTY",
})))
.unwrap();
assert!(rc.0.is_success(), "!orderbook: {}", rc.1);

request_and_check_orderbook_depth(&mm_alice);

block_on(mm_bob.stop()).unwrap();
block_on(mm_alice.stop()).unwrap();
}

// HOWTO
// 1. Install Firefox.
// 2. Install forked version of wasm-bindgen-cli: cargo install wasm-bindgen-cli --git https://github.com/artemii235/wasm-bindgen.git
Expand Down
17 changes: 17 additions & 0 deletions mm2src/mm2_tests/structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,23 @@ pub struct BestOrdersResponse {
pub result: HashMap<String, Vec<OrderbookEntry>>,
}

#[derive(Deserialize)]
pub struct PairDepth {
pub asks: usize,
pub bids: usize,
}

#[derive(Deserialize)]
pub struct PairWithDepth {
pub pair: (String, String),
pub depth: PairDepth,
}

#[derive(Deserialize)]
pub struct OrderbookDepthResponse {
pub result: Vec<PairWithDepth>,
}

#[derive(Debug, Deserialize)]
pub struct EnableElectrumResponse {
pub coin: String,
Expand Down
5 changes: 2 additions & 3 deletions mm2src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use std::future::Future as Future03;
use std::net::SocketAddr;

use crate::mm2::lp_ordermatch::{best_orders_rpc, buy, cancel_all_orders, cancel_order, my_orders, order_status,
orderbook, sell, set_price};
orderbook, orderbook_depth_rpc, sell, set_price};
use crate::mm2::lp_swap::{active_swaps_rpc, all_swaps_uuids_by_filter, coins_needed_for_kick_start, import_swaps,
list_banned_pubkeys, max_taker_vol, my_recent_swaps, my_swap_status, recover_funds_of_swap,
stats_swap_status, trade_preimage, unban_pubkeys};
Expand Down Expand Up @@ -111,7 +111,6 @@ fn hyres(handler: impl Future03<Output = Result<Response<Vec<u8>>, String>> + Se
///
/// Returns `None` if the requested "method" wasn't found among the ported RPC methods and has to be handled elsewhere.
pub fn dispatcher(req: Json, ctx: MmArc) -> DispatcherRes {
//log! ("dispatcher] " (json::to_string (&req) .unwrap()));
let method = match req["method"].clone() {
Json::String(method) => method,
_ => return DispatcherRes::NoMatch(req),
Expand Down Expand Up @@ -163,6 +162,7 @@ pub fn dispatcher(req: Json, ctx: MmArc) -> DispatcherRes {
"my_tx_history" => my_tx_history(ctx, req),
"order_status" => hyres(order_status(ctx, req)),
"orderbook" => hyres(orderbook(ctx, req)),
"orderbook_depth" => hyres(orderbook_depth_rpc(ctx, req)),
"sim_panic" => hyres(sim_panic(req)),
"recover_funds_of_swap" => {
#[cfg(feature = "native")]
Expand All @@ -174,7 +174,6 @@ pub fn dispatcher(req: Json, ctx: MmArc) -> DispatcherRes {
return DispatcherRes::NoMatch(req);
}
},
// "passphrase" => passphrase (ctx, req),
"sell" => hyres(sell(ctx, req)),
"show_priv_key" => hyres(show_priv_key(ctx, req)),
"send_raw_transaction" => hyres(send_raw_transaction(ctx, req)),
Expand Down

0 comments on commit 9f6398b

Please sign in to comment.