From 167979cd9cacf0689eb41219197e3e1aa5a52a4c Mon Sep 17 00:00:00 2001 From: Richard Holzeis Date: Mon, 27 May 2024 16:21:50 +0200 Subject: [PATCH] feat: Rewrite order matching strategy Refactors the current trading component into a clearly separated orderbook component and a trade execution component. The linking part is the `ExecutableMatch` which can be derived from the matches stored into the database. At the moment we assume optimistically that the trade execution will succeed. However, we should consider that a pending match may never get filled or it fails at execution in such a scenario we would need to rollback the matched orders. --- coordinator/src/bin/coordinator.rs | 43 +- coordinator/src/node/expired_positions.rs | 29 +- coordinator/src/node/liquidated_positions.rs | 37 +- coordinator/src/orderbook/async_match.rs | 102 +- coordinator/src/orderbook/db/matches.rs | 63 +- coordinator/src/orderbook/db/orders.rs | 39 + coordinator/src/orderbook/mod.rs | 607 ++++++++++++ coordinator/src/orderbook/trading.rs | 929 +++---------------- coordinator/src/orderbook/websocket.rs | 42 +- coordinator/src/routes.rs | 8 +- coordinator/src/routes/orderbook.rs | 246 +++-- coordinator/src/trade/mod.rs | 413 ++++++--- crates/dev-maker/src/main.rs | 2 +- crates/xxi-node/src/commons/message.rs | 4 + crates/xxi-node/src/commons/order.rs | 17 +- crates/xxi-node/src/commons/trade.rs | 13 +- crates/xxi-node/src/message_handler.rs | 2 +- crates/xxi-node/src/tests/dlc_channel.rs | 2 +- mobile/native/src/dlc/node.rs | 2 +- mobile/native/src/trade/order/handler.rs | 2 +- 20 files changed, 1287 insertions(+), 1315 deletions(-) diff --git a/coordinator/src/bin/coordinator.rs b/coordinator/src/bin/coordinator.rs index d9ab1344c..2247c1dd6 100644 --- a/coordinator/src/bin/coordinator.rs +++ b/coordinator/src/bin/coordinator.rs @@ -25,6 +25,7 @@ use coordinator::run_migration; use coordinator::scheduler::NotificationScheduler; use coordinator::settings::Settings; use coordinator::storage::CoordinatorTenTenOneStorage; +use coordinator::trade; use coordinator::trade::websocket::InternalPositionUpdateMessage; use diesel::r2d2; use diesel::r2d2::ConnectionManager; @@ -32,6 +33,8 @@ use diesel::PgConnection; use lnd_bridge::LndBridge; use rand::thread_rng; use rand::RngCore; +use rust_decimal::prelude::FromPrimitive; +use rust_decimal::Decimal; use std::backtrace::Backtrace; use std::net::IpAddr; use std::net::Ipv4Addr; @@ -261,21 +264,22 @@ async fn main() -> Result<()> { let (tx_orderbook_feed, _rx) = broadcast::channel(100); - let (_handle, trading_sender) = trading::start( - node.clone(), - tx_orderbook_feed.clone(), - auth_users_notifier.clone(), + let trade_executor = trade::spawn_trade_executor(node.clone(), auth_users_notifier.clone())?; + + let order_matching_fee_rate = + Decimal::from_f32(node.settings.read().await.order_matching_fee_rate).expect("to fit"); + + let orderbook_sender = trading::spawn_orderbook( + node.pool.clone(), notification_service.get_sender(), - network, - node.inner.oracle_pubkey, - ); - let _handle = async_match::monitor( - node.clone(), - node_event_handler.subscribe(), - auth_users_notifier.clone(), - network, - node.inner.oracle_pubkey, - ); + trade_executor.clone(), + tx_orderbook_feed.clone(), + order_matching_fee_rate, + )?; + + let _handle = + async_match::monitor(node.clone(), node_event_handler.subscribe(), trade_executor); + let _handle = rollover::monitor( pool.clone(), node_event_handler.subscribe(), @@ -294,11 +298,12 @@ async fn main() -> Result<()> { tokio::spawn({ let node = node.clone(); - let trading_sender = trading_sender.clone(); + let orderbook_sender = orderbook_sender.clone(); async move { loop { tokio::time::sleep(EXPIRED_POSITION_SYNC_INTERVAL).await; - if let Err(e) = expired_positions::close(node.clone(), trading_sender.clone()).await + if let Err(e) = + expired_positions::close(node.clone(), orderbook_sender.clone()).await { tracing::error!("Failed to close expired positions! Error: {e:#}"); } @@ -308,11 +313,11 @@ async fn main() -> Result<()> { tokio::spawn({ let node = node.clone(); - let trading_sender = trading_sender.clone(); + let orderbook_sender = orderbook_sender.clone(); async move { loop { tokio::time::sleep(LIQUIDATED_POSITION_SYNC_INTERVAL).await; - liquidated_positions::monitor(node.clone(), trading_sender.clone()).await + liquidated_positions::monitor(node.clone(), orderbook_sender.clone()).await } } }); @@ -325,7 +330,7 @@ async fn main() -> Result<()> { settings.clone(), exporter, NODE_ALIAS, - trading_sender, + orderbook_sender, tx_orderbook_feed, tx_position_feed, tx_user_feed, diff --git a/coordinator/src/node/expired_positions.rs b/coordinator/src/node/expired_positions.rs index 56fb094cb..46f6a1a97 100644 --- a/coordinator/src/node/expired_positions.rs +++ b/coordinator/src/node/expired_positions.rs @@ -1,11 +1,9 @@ use crate::db; use crate::node::Node; use crate::orderbook; -use crate::orderbook::db::orders; -use crate::orderbook::trading::NewOrderMessage; +use crate::orderbook::trading::OrderbookMessage; use crate::position::models::Position; use crate::position::models::PositionState; -use anyhow::anyhow; use anyhow::Context; use anyhow::Result; use rust_decimal::prelude::FromPrimitive; @@ -18,6 +16,7 @@ use xxi_node::commons::average_execution_price; use xxi_node::commons::Match; use xxi_node::commons::MatchState; use xxi_node::commons::NewMarketOrder; +use xxi_node::commons::NewOrder; use xxi_node::commons::OrderReason; use xxi_node::commons::OrderState; @@ -25,7 +24,7 @@ use xxi_node::commons::OrderState; /// not be larger than our refund transaction time lock. pub const EXPIRED_POSITION_TIMEOUT: Duration = Duration::days(7); -pub async fn close(node: Node, trading_sender: mpsc::Sender) -> Result<()> { +pub async fn close(node: Node, orderbook_sender: mpsc::Sender) -> Result<()> { let mut conn = node.pool.get()?; let positions = db::positions::Position::get_all_open_positions(&mut conn) @@ -50,8 +49,9 @@ pub async fn close(node: Node, trading_sender: mpsc::Sender) -> if order.expiry < OffsetDateTime::now_utc() { tracing::warn!(trader_id, order_id, "Matched order expired! Giving up on that position, looks like the corresponding dlc channel has to get force closed."); + // TODO(holzeis): It's not ideal that the order and match are updated by the trade + // executor. This should rather get updated by the orderbook. orderbook::db::orders::set_order_state(&mut conn, order.id, OrderState::Expired)?; - orderbook::db::matches::set_match_state_by_order_id( &mut conn, order.id, @@ -75,11 +75,13 @@ pub async fn close(node: Node, trading_sender: mpsc::Sender) -> tracing::debug!(trader_pk=%position.trader, %position.expiry_timestamp, "Attempting to close expired position"); + let order_id = uuid::Uuid::new_v4(); + let trader_pubkey = position.trader; let new_order = NewMarketOrder { - id: uuid::Uuid::new_v4(), + id: order_id, contract_symbol: position.contract_symbol, quantity: Decimal::try_from(position.quantity).expect("to fit into decimal"), - trader_id: position.trader, + trader_id: trader_pubkey, direction: position.trader_direction.opposite(), leverage: Decimal::from_f32(position.trader_leverage).expect("to fit into decimal"), // This order can basically not expire, but if the user does not come back online within @@ -89,18 +91,13 @@ pub async fn close(node: Node, trading_sender: mpsc::Sender) -> stable: position.stable, }; - let order = orders::insert_market_order(&mut conn, new_order.clone(), OrderReason::Expired) - .map_err(|e| anyhow!(e)) - .context("Failed to insert expired order into DB")?; - - let message = NewOrderMessage { - order, - channel_opening_params: None, + let message = OrderbookMessage::NewOrder { + new_order: NewOrder::Market(new_order), order_reason: OrderReason::Expired, }; - if let Err(e) = trading_sender.send(message).await { - tracing::error!(order_id=%new_order.id, trader_id=%new_order.trader_id, "Failed to submit new order for closing expired position. Error: {e:#}"); + if let Err(e) = orderbook_sender.send(message).await { + tracing::error!(%trader_pubkey, %order_id, "Failed to submit new order for closing expired position. Error: {e:#}"); continue; } } diff --git a/coordinator/src/node/liquidated_positions.rs b/coordinator/src/node/liquidated_positions.rs index 6a5b20490..acb01f78e 100644 --- a/coordinator/src/node/liquidated_positions.rs +++ b/coordinator/src/node/liquidated_positions.rs @@ -1,8 +1,7 @@ use crate::db; use crate::node::Node; use crate::orderbook; -use crate::orderbook::db::orders; -use crate::orderbook::trading::NewOrderMessage; +use crate::orderbook::trading::OrderbookMessage; use anyhow::Result; use rust_decimal::prelude::FromPrimitive; use rust_decimal::Decimal; @@ -17,6 +16,7 @@ use xxi_node::commons::Direction; use xxi_node::commons::Match; use xxi_node::commons::MatchState; use xxi_node::commons::NewMarketOrder; +use xxi_node::commons::NewOrder; use xxi_node::commons::OrderReason; use xxi_node::commons::OrderState; @@ -24,9 +24,9 @@ use xxi_node::commons::OrderState; /// should not be larger than our refund transaction time lock. pub const LIQUIDATION_POSITION_TIMEOUT: Duration = Duration::days(7); -pub async fn monitor(node: Node, trading_sender: mpsc::Sender) { +pub async fn monitor(node: Node, orderbook_sender: mpsc::Sender) { if let Err(e) = - check_if_positions_need_to_get_liquidated(trading_sender.clone(), node.clone()).await + check_if_positions_need_to_get_liquidated(orderbook_sender.clone(), node.clone()).await { tracing::error!("Failed to check if positions need to get liquidated. Error: {e:#}"); } @@ -35,7 +35,7 @@ pub async fn monitor(node: Node, trading_sender: mpsc::Sender) /// For all open positions, check if the maintenance margin has been reached. Send a liquidation /// async match to the traders whose positions have been liquidated. async fn check_if_positions_need_to_get_liquidated( - trading_sender: mpsc::Sender, + orderbook_sender: mpsc::Sender, node: Node, ) -> Result<()> { let mut conn = node.pool.get()?; @@ -121,11 +121,13 @@ async fn check_if_positions_need_to_get_liquidated( } } + let trader_pubkey = position.trader; + let order_id = uuid::Uuid::new_v4(); let new_order = NewMarketOrder { - id: uuid::Uuid::new_v4(), + id: order_id, contract_symbol: position.contract_symbol, quantity: Decimal::try_from(position.quantity).expect("to fit into decimal"), - trader_id: position.trader, + trader_id: trader_pubkey, direction: position.trader_direction.opposite(), leverage: Decimal::from_f32(position.trader_leverage).expect("to fit into decimal"), // This order can basically not expire, but if the user does not come back online @@ -140,26 +142,13 @@ async fn check_if_positions_need_to_get_liquidated( false => OrderReason::CoordinatorLiquidated, }; - let order = match orders::insert_market_order( - &mut conn, - new_order.clone(), - order_reason.clone(), - ) { - Ok(order) => order, - Err(e) => { - tracing::error!("Failed to insert liquidation order into DB. Error: {e:#}"); - continue; - } - }; - - let message = NewOrderMessage { - order, - channel_opening_params: None, + let message = OrderbookMessage::NewOrder { + new_order: NewOrder::Market(new_order), order_reason, }; - if let Err(e) = trading_sender.send(message).await { - tracing::error!(order_id=%new_order.id, trader_id=%new_order.trader_id, "Failed to submit new order for closing liquidated position. Error: {e:#}"); + if let Err(e) = orderbook_sender.send(message).await { + tracing::error!(%trader_pubkey, %order_id, "Failed to submit new order for closing liquidated position. Error: {e:#}"); continue; } } diff --git a/coordinator/src/orderbook/async_match.rs b/coordinator/src/orderbook/async_match.rs index 6dcd5e07c..ab9200e2e 100644 --- a/coordinator/src/orderbook/async_match.rs +++ b/coordinator/src/orderbook/async_match.rs @@ -1,55 +1,38 @@ use crate::check_version::check_version; -use crate::db; -use crate::message::TraderMessage; use crate::node::Node; use crate::orderbook::db::matches; use crate::orderbook::db::orders; -use crate::trade::TradeExecutor; -use anyhow::ensure; +use crate::trade::ExecutableMatch; use anyhow::Result; use bitcoin::secp256k1::PublicKey; -use bitcoin::secp256k1::XOnlyPublicKey; -use bitcoin::Network; use futures::future::RemoteHandle; use futures::FutureExt; -use rust_decimal::prelude::ToPrimitive; -use time::OffsetDateTime; use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; use tokio::sync::mpsc; use tokio::task::spawn_blocking; -use xxi_node::commons; -use xxi_node::commons::ContractSymbol; -use xxi_node::commons::FilledWith; -use xxi_node::commons::Match; -use xxi_node::commons::Matches; use xxi_node::commons::OrderState; -use xxi_node::commons::TradeAndChannelParams; -use xxi_node::commons::TradeParams; use xxi_node::node::event::NodeEvent; pub fn monitor( node: Node, mut receiver: broadcast::Receiver, - notifier: mpsc::Sender, - network: Network, - oracle_pk: XOnlyPublicKey, + trade_executor: mpsc::Sender, ) -> RemoteHandle<()> { let (fut, remote_handle) = async move { loop { match receiver.recv().await { Ok(NodeEvent::Connected { peer: trader_id }) => { tokio::spawn({ - let notifier = notifier.clone(); let node = node.clone(); + let trade_executor = trade_executor.clone(); async move { tracing::debug!( %trader_id, "Checking if the user needs to be notified about pending matches" ); if let Err(e) = - process_pending_match(node, notifier, trader_id, network, oracle_pk) - .await + process_pending_match(node, trade_executor, trader_id).await { tracing::error!("Failed to process pending match. Error: {e:#}"); } @@ -77,10 +60,8 @@ pub fn monitor( /// Checks if there are any pending matches async fn process_pending_match( node: Node, - notifier: mpsc::Sender, - trader_id: PublicKey, - network: Network, - oracle_pk: XOnlyPublicKey, + trade_executor: mpsc::Sender, + trader_pubkey: PublicKey, ) -> Result<()> { let mut conn = spawn_blocking({ let node = node.clone(); @@ -89,75 +70,26 @@ async fn process_pending_match( .await .expect("task to complete")?; - if check_version(&mut conn, &trader_id).is_err() { - tracing::info!(%trader_id, "User is not on the latest version. Skipping check if user needs to be informed about pending matches."); + if check_version(&mut conn, &trader_pubkey).is_err() { + tracing::info!(%trader_pubkey, "User is not on the latest version. Skipping check if user needs to be informed about pending matches."); return Ok(()); } if let Some(order) = - orders::get_by_trader_id_and_state(&mut conn, trader_id, OrderState::Matched)? + orders::get_by_trader_id_and_state(&mut conn, trader_pubkey, OrderState::Matched)? { - tracing::debug!(%trader_id, order_id=%order.id, "Executing pending match"); + tracing::debug!(%trader_pubkey, order_id=%order.id, "Executing pending match"); let matches = matches::get_matches_by_order_id(&mut conn, order.id)?; - let filled_with = get_filled_with_from_matches(matches, network, oracle_pk)?; - let channel_opening_params = - db::channel_opening_params::get_by_order_id(&mut conn, order.id)?; - - tracing::info!(trader_id = %order.trader_id, order_id = %order.id, order_reason = ?order.order_reason, "Executing trade for match"); - let trade_executor = TradeExecutor::new(node, notifier); - trade_executor - .execute(&TradeAndChannelParams { - trade_params: TradeParams { - pubkey: trader_id, - contract_symbol: ContractSymbol::BtcUsd, - leverage: order.leverage, - quantity: order.quantity.to_f32().expect("to fit into f32"), - direction: order.direction, - filled_with, - }, - trader_reserve: channel_opening_params.map(|c| c.trader_reserve), - coordinator_reserve: channel_opening_params.map(|c| c.coordinator_reserve), - external_funding: channel_opening_params.and_then(|c| c.external_funding), - }) - .await; + tracing::info!(%trader_pubkey, order_id = %order.id, order_reason = ?order.order_reason, "Executing trade for match"); + if let Err(e) = trade_executor + .send(ExecutableMatch { order, matches }) + .await + { + tracing::error!(%trader_pubkey, order_id= %order.id, "Failed to execute trade. Error: {e:#}"); + } } Ok(()) } - -fn get_filled_with_from_matches( - matches: Vec, - network: Network, - oracle_pk: XOnlyPublicKey, -) -> Result { - ensure!( - !matches.is_empty(), - "Need at least one matches record to construct a FilledWith" - ); - - let order_id = matches - .first() - .expect("to have at least one match") - .order_id; - - let expiry_timestamp = commons::calculate_next_expiry(OffsetDateTime::now_utc(), network); - - Ok(FilledWith { - order_id, - expiry_timestamp, - oracle_pk, - matches: matches - .iter() - .map(|m| Match { - id: m.id, - order_id: m.order_id, - quantity: m.quantity, - pubkey: m.match_trader_id, - execution_price: m.execution_price, - matching_fee: m.matching_fee, - }) - .collect(), - }) -} diff --git a/coordinator/src/orderbook/db/matches.rs b/coordinator/src/orderbook/db/matches.rs index 5de73144c..554280132 100644 --- a/coordinator/src/orderbook/db/matches.rs +++ b/coordinator/src/orderbook/db/matches.rs @@ -1,5 +1,4 @@ use crate::orderbook::db::custom_types::MatchState; -use crate::orderbook::trading::TraderMatchParams; use crate::schema::matches; use anyhow::ensure; use anyhow::Result; @@ -37,16 +36,33 @@ struct Matches { pub matching_fee_sats: i64, } -pub fn insert(conn: &mut PgConnection, match_params: &TraderMatchParams) -> Result<()> { - for record in Matches::new(match_params, MatchState::Pending) { - let affected_rows = diesel::insert_into(matches::table) - .values(record.clone()) - .execute(conn)?; - - ensure!(affected_rows > 0, "Could not insert matches"); - } +pub fn insert( + conn: &mut PgConnection, + order: &commons::Order, + matched_order: &commons::Order, + matching_fee: Amount, + match_state: commons::MatchState, + quantity: Decimal, +) -> QueryResult { + let match_ = Matches { + id: Uuid::new_v4(), + match_state: match_state.into(), + order_id: order.id, + trader_id: order.trader_id.to_string(), + match_order_id: matched_order.id, + match_trader_id: matched_order.trader_id.to_string(), + execution_price: matched_order.price.to_f32().expect("to fit into f32"), + quantity: quantity.to_f32().expect("to fit into f32"), + created_at: OffsetDateTime::now_utc(), + updated_at: OffsetDateTime::now_utc(), + matching_fee_sats: matching_fee.to_sat() as i64, + }; + + diesel::insert_into(matches::table) + .values(match_.clone()) + .execute(conn)?; - Ok(()) + Ok(match_.into()) } pub fn set_match_state( @@ -89,33 +105,6 @@ pub fn set_match_state_by_order_id( Ok(()) } -impl Matches { - pub fn new(match_params: &TraderMatchParams, match_state: MatchState) -> Vec { - let order_id = match_params.filled_with.order_id; - let updated_at = OffsetDateTime::now_utc(); - let trader_id = match_params.trader_id; - - match_params - .filled_with - .matches - .iter() - .map(|m| Matches { - id: m.id, - match_state, - order_id, - trader_id: trader_id.to_string(), - match_order_id: m.order_id, - match_trader_id: m.pubkey.to_string(), - execution_price: m.execution_price.to_f32().expect("to fit into f32"), - quantity: m.quantity.to_f32().expect("to fit into f32"), - created_at: updated_at, - updated_at, - matching_fee_sats: m.matching_fee.to_sat() as i64, - }) - .collect() - } -} - impl From for Matches { fn from(value: commons::Matches) -> Self { Matches { diff --git a/coordinator/src/orderbook/db/orders.rs b/coordinator/src/orderbook/db/orders.rs index 02bba5df0..f01cdb7bc 100644 --- a/coordinator/src/orderbook/db/orders.rs +++ b/coordinator/src/orderbook/db/orders.rs @@ -410,6 +410,22 @@ pub fn delete(conn: &mut PgConnection, id: Uuid) -> QueryResult set_order_state(conn, id, commons::OrderState::Deleted) } +pub fn set_order_state_partially_taken( + conn: &mut PgConnection, + id: Uuid, + quantity: Decimal, +) -> QueryResult { + let order: Order = diesel::update(orders::table) + .filter(orders::order_id.eq(id)) + .set(( + orders::order_state.eq(OrderState::Taken), + orders::quantity.eq(quantity.to_f32().expect("to fit")), + )) + .get_result(conn)?; + + Ok(OrderbookOrder::from(order)) +} + /// Returns the number of affected rows: 1. pub fn set_order_state( conn: &mut PgConnection, @@ -511,3 +527,26 @@ pub fn get_all_limit_order_filled_matches( Ok(filled_matches) } + +/// Updates a given order if it is still open. +pub fn update_order(conn: &mut PgConnection, order: OrderbookOrder) -> QueryResult { + let order: Order = diesel::update(orders::table) + .set(( + orders::direction.eq::(order.direction.into()), + orders::timestamp.eq(OffsetDateTime::now_utc()), + orders::quantity.eq(order.quantity.to_f32().expect("to fit")), + orders::price.eq(order.price.to_f32().expect("to fit")), + orders::leverage.eq(order.leverage), + orders::expiry.eq(order.expiry), + )) + .filter( + orders::order_id.eq(order.id).and( + orders::trader_pubkey + .eq(order.trader_id.to_string()) + .and(orders::order_state.eq(OrderState::Open)), + ), + ) + .get_result(conn)?; + + Ok(order.into()) +} diff --git a/coordinator/src/orderbook/mod.rs b/coordinator/src/orderbook/mod.rs index 1450ab5c9..dbf0d1b92 100644 --- a/coordinator/src/orderbook/mod.rs +++ b/coordinator/src/orderbook/mod.rs @@ -1,3 +1,37 @@ +use crate::orderbook::db::matches; +use crate::orderbook::db::orders; +use crate::referrals; +use crate::trade::ExecutableMatch; +use anyhow::anyhow; +use anyhow::bail; +use anyhow::Context; +use anyhow::Result; +use bitcoin::secp256k1::PublicKey; +use bitcoin::Amount; +use diesel::r2d2::ConnectionManager; +use diesel::r2d2::Pool; +use diesel::result::Error::RollbackTransaction; +use diesel::Connection; +use diesel::PgConnection; +use rust_decimal::prelude::ToPrimitive; +use rust_decimal::Decimal; +use rust_decimal::RoundingStrategy; +use std::cmp::Ordering; +use std::collections::HashMap; +use std::vec; +use tokio::task::spawn_blocking; +use uuid::Uuid; +use xxi_node::commons; +use xxi_node::commons::MatchState; +use xxi_node::commons::Message; +use xxi_node::commons::Message::DeleteOrder; +use xxi_node::commons::NewLimitOrder; +use xxi_node::commons::NewMarketOrder; +use xxi_node::commons::Order; +use xxi_node::commons::OrderReason; +use xxi_node::commons::OrderState; +use xxi_node::commons::OrderType; + pub mod async_match; pub mod collaborative_revert; pub mod db; @@ -6,3 +40,576 @@ pub mod websocket; #[cfg(test)] mod tests; + +struct Orderbook { + pool: Pool>, + shorts: OrderbookSide, + longs: OrderbookSide, + // TODO(holzeis): Split up order matching fee rate into taker and maker fees. + order_matching_fee_rate: Decimal, +} + +struct OrderbookSide { + orders: Vec, +} + +impl OrderbookSide { + pub fn new() -> Self { + Self { orders: vec![] } + } + /// adds the given order to the orderbook. + pub fn add_order(&mut self, order: Order) { + self.orders.push(order); + self.sort(order.direction); + } + /// updates an order in the orderbook. Ignores the update if order does not exist. + pub fn update_order(&mut self, updated_order: Order) { + if let Some(order) = self.orders.iter_mut().find(|o| o.id == updated_order.id) { + *order = updated_order; + } + self.sort(updated_order.direction); + } + /// removes the order by the given id from the orderbook. + pub fn remove_order(&mut self, order_id: Uuid) { + self.orders.retain(|o| o.id != order_id); + } + /// removes the first order from the sorted list representing the best order in the orderbook. + pub fn take_order(&mut self) -> Option { + match self.orders.is_empty() { + true => None, + false => Some(self.orders.remove(0)), + } + } + /// matches orders from the orderbook for the given quantity. + pub fn match_order(&mut self, quantity: Decimal) -> Vec { + let mut matched_orders = vec![]; + + let mut quantity = quantity; + while let Some(mut order) = self.take_order() { + if order.is_expired() { + // ignore expired orders. + continue; + } + + match order.quantity.cmp(&quantity) { + Ordering::Less => { + // if the found order has less quantity we subtract the full quantity from the + // searched quantity and add the limit order to the matched orders. + quantity -= order.quantity; + matched_orders.push(order); + } + Ordering::Greater => { + // if the found order has more quantity than needed, we create a remaining order + // from the left over quantity. Note, this will only be kept in memory and + // applied to the orderbook state by joining the trades. + let mut remaining_order = order; + remaining_order.quantity -= quantity; + self.add_order(remaining_order); + + // update the matched order quantity with the searched for quantity. + order.quantity = quantity; + matched_orders.push(order); + + // we found enough liquidity in the order book to match the order. + break; + } + Ordering::Equal => { + // we found a perfect match for the searched for quantity. + matched_orders.push(order); + break; + } + } + } + + matched_orders + } + /// rollback matched orders + pub fn rollback_matched_orders(&mut self, matched_orders: Vec) { + for order in matched_orders { + if let Some(order) = self.orders.iter_mut().find(|o| o.id == order.id) { + order.quantity += order.quantity; + } else { + self.add_order(order); + } + } + } + + /// Sorts the orders ascending if long and descending if short, + fn sort(&mut self, direction: commons::Direction) { + self.orders.sort_by(|a, b| { + if a.price.cmp(&b.price) == Ordering::Equal { + return a.timestamp.cmp(&b.timestamp); + } + match direction { + // Descending order. + commons::Direction::Short => b.price.cmp(&a.price), + // Ascending order. + commons::Direction::Long => a.price.cmp(&b.price), + } + }) + } +} + +impl Orderbook { + pub fn new( + pool: Pool>, + order_matching_fee_rate: Decimal, + ) -> Self { + Self { + pool, + shorts: OrderbookSide::new(), + longs: OrderbookSide::new(), + order_matching_fee_rate, + } + } + + /// Initializes the orderbook with non expired open limit orders. + async fn initialize(&mut self) -> Result<()> { + let all_orders = spawn_blocking({ + let mut conn = self.pool.clone().get()?; + move || { + // TODO(holzeis): join with trades to get partially matched orders. + let all_orders = + orders::get_all_orders(&mut conn, OrderType::Limit, OrderState::Open, true)?; + anyhow::Ok(all_orders) + } + }) + .await??; + + // TODO(holzeis): maybe consider batch adding all orders and sort afterwards instead of + // adding every order individually. + for order in all_orders { + match order.direction { + commons::Direction::Short => self.shorts.add_order(order), + commons::Direction::Long => self.longs.add_order(order), + } + } + + Ok(()) + } + + /// Adds a limit order to the orderbook. + async fn add_limit_order(&mut self, new_order: NewLimitOrder) -> Result { + let order = spawn_blocking({ + let mut conn = self.pool.clone().get()?; + move || { + let order = orders::insert_limit_order(&mut conn, new_order, OrderReason::Manual) + .map_err(|e| anyhow!(e)) + .context("Failed to insert new order into DB")?; + + anyhow::Ok(order) + } + }) + .await??; + + match order.direction { + commons::Direction::Short => self.shorts.add_order(order), + commons::Direction::Long => self.longs.add_order(order), + } + Ok(Message::NewOrder(order)) + } + + /// Matches a market order against the orderbook. Will fail if the market order can't be fully + /// matched. + async fn match_market_order( + &mut self, + new_order: NewMarketOrder, + order_reason: OrderReason, + ) -> Result> { + let trader_pubkey = new_order.trader_id; + // TODO(holzeis): We might want to delay that action in a regular task allowing us to + // process orderbook updates much faster in memory and postpone the expensive database + // transactions. + let order = spawn_blocking({ + let mut conn = self.pool.clone().get()?; + move || { + let order = orders::insert_market_order(&mut conn, new_order, order_reason) + .map_err(|e| anyhow!(e)) + .context("Failed to insert new order into DB")?; + + anyhow::Ok(order) + } + }) + .await??; + + // find a match for the market order. + let matched_orders = match order.direction.opposite() { + commons::Direction::Short => self.shorts.match_order(order.quantity), + commons::Direction::Long => self.longs.match_order(order.quantity), + }; + + let fail_order = { + let order_id = order.id; + let pool = self.pool.clone(); + move || { + let mut conn = pool.get()?; + orders::set_order_state(&mut conn, order_id, OrderState::Failed)?; + anyhow::Ok(()) + } + }; + + let matched_quantity: Decimal = matched_orders.iter().map(|o| o.quantity).sum(); + if matched_quantity != order.quantity { + // not enough liquidity in the orderbook. + tracing::warn!( + trader_pubkey = %order.trader_id, + order_id = %order.id, + wanted = %order.quantity, + got = %matched_quantity, + "Couldn't match order due to insufficient liquidity in the orderbook." + ); + match order.direction.opposite() { + commons::Direction::Short => self.shorts.rollback_matched_orders(matched_orders), + commons::Direction::Long => self.longs.rollback_matched_orders(matched_orders), + } + + spawn_blocking(fail_order).await??; + bail!("No match found."); + } + + // apply match to database. + let matches = spawn_blocking({ + let mut conn = self.pool.clone().get()?; + let matched_orders = matched_orders.clone(); + let fee_percent = self.order_matching_fee_rate; + move || { + let mut matches: HashMap = HashMap::new(); + conn.transaction(|conn| { + let status = + referrals::get_referral_status(order.trader_id, conn).map_err(|_| RollbackTransaction)?; + let fee_discount = status.referral_fee_bonus; + let fee_percent = fee_percent - (fee_percent * fee_discount); + + tracing::debug!(%trader_pubkey, %fee_discount, total_fee_percent = %fee_percent, "Fee discount calculated"); + + let order = orders::set_order_state(conn, order.id, OrderState::Matched)?; + for matched_order in matched_orders { + let matching_fee = matched_order.quantity / matched_order.price * fee_percent; + let matching_fee = matching_fee.round_dp_with_strategy(8, RoundingStrategy::MidpointAwayFromZero); + let matching_fee = match Amount::from_btc(matching_fee.to_f64().expect("to fit")) { + Ok(fee) => fee, + Err(err) => { + tracing::error!( + trader_pubkey = matched_order.trader_id.to_string(), + order_id = matched_order.id.to_string(), + "Failed calculating order matching fee for order {err:?}. Falling back to 0" + ); + Amount::ZERO + } + }; + + let taker_match = matches::insert(conn, &order, &matched_order, matching_fee, MatchState::Pending, matched_order.quantity)?; + if let Some(taker_matches) = matches.get_mut(&order.trader_id) { + taker_matches.matches.push(taker_match); + } else { + matches.insert(order.trader_id, ExecutableMatch{ + order, + matches: vec![taker_match] + }); + } + + // TODO(holzeis): For now we don't execute the limit order with the maker as the our maker does not + // have a dlc channel with the coordinator, hence we set the match directly to filled. Once we + // introduce actual makers these matches need to execute them. + let _maker_match = matches::insert(conn, &matched_order, &order, matching_fee, MatchState::Filled, matched_order.quantity)?; + orders::set_order_state(conn, matched_order.id, OrderState::Taken)?; + + // TODO(holzeis): Add executeable match once we support actual limit orders. + // if let Some(maker_matches) = matches.get_mut(&matched_order.trader_id) { + // maker_matches.matches.push(taker_match); + // } else { + // matches.insert(order.trader_id, ExecutableMatch{ + // order, + // matches: vec![taker_match] + // }); + // } + + } + + diesel::result::QueryResult::Ok(()) + }).inspect_err(|_| { + if let Err(e) = fail_order() { + tracing::error!(%trader_pubkey, order_id=%order.id, "Failed to fail order. Error: {e:#}"); + } + })?; + + anyhow::Ok(matches) + } + }) + .await? + .inspect_err(move |_| match order.direction.opposite() { + // if something goes wrong we have to rollback the matched orders to the orderbook. + commons::Direction::Short => self.shorts.rollback_matched_orders(matched_orders), + commons::Direction::Long => self.longs.rollback_matched_orders(matched_orders), + })?; + + Ok(matches.into_values().collect::>()) + } + + /// Updates an order in the orderbook. If the order id couldn't be found the update will be + /// ignored. + async fn update_order(&mut self, order: Order) -> Result> { + // TODO(holzeis): We might want to delay that action in a regular task allowing us to + // process orderbook updates much faster in memory and postpone the expensive + // database transactions. + let order = spawn_blocking({ + let mut conn = self.pool.clone().get()?; + move || { + let order = orders::update_order(&mut conn, order)?; + anyhow::Ok(order) + } + }) + .await??; + + let message = match order.order_type { + OrderType::Market => None, + OrderType::Limit => { + match order.direction { + commons::Direction::Short => self.shorts.update_order(order), + commons::Direction::Long => self.longs.update_order(order), + } + + Some(Message::Update(order)) + } + }; + + Ok(message) + } + + /// Removes a limit order from the orderbook. Ignores removing market orders, since they aren't + /// stored into the orderbook. + async fn remove_order(&mut self, order_id: Uuid) -> Result> { + // TODO(holzeis): We might want to delay that action in a regular task allowing us to + // process orderbook updates much faster in memory and postpone the expensive database + // transactions. + let order = spawn_blocking({ + let mut conn = self.pool.clone().get()?; + move || { + let matches = matches::get_matches_by_order_id(&mut conn, order_id)?; + + let order = if !matches.is_empty() { + // order has been at least partially matched. + let matched_quantity = matches.iter().map(|m| m.quantity).sum(); + + orders::set_order_state_partially_taken(&mut conn, order_id, matched_quantity) + } else { + orders::delete(&mut conn, order_id) + }?; + + anyhow::Ok(order) + } + }) + .await??; + + let message = match (order.order_type, order.direction) { + (OrderType::Limit, commons::Direction::Short) => { + self.shorts.remove_order(order_id); + Some(DeleteOrder(order_id)) + } + (OrderType::Limit, commons::Direction::Long) => { + self.longs.remove_order(order_id); + Some(DeleteOrder(order_id)) + } + (OrderType::Market, _) => None, + }; + + Ok(message) + } +} + +#[cfg(test)] +mod orderbook_tests { + use crate::orderbook::OrderbookSide; + use bitcoin::secp256k1::PublicKey; + use rust_decimal::Decimal; + use rust_decimal::RoundingStrategy; + use rust_decimal_macros::dec; + use std::str::FromStr; + use time::Duration; + use time::OffsetDateTime; + use uuid::Uuid; + use xxi_node::commons::ContractSymbol; + use xxi_node::commons::Direction; + use xxi_node::commons::Order; + use xxi_node::commons::OrderReason; + use xxi_node::commons::OrderState; + use xxi_node::commons::OrderType; + + #[test] + pub fn test_add_order_to_orderbook_side() { + let mut longs = OrderbookSide::new(); + + let long_order = dummy_order(dec!(100), dec!(50000), Direction::Long); + longs.add_order(long_order); + + let order = longs.take_order(); + + assert_eq!(Some(long_order), order); + } + + #[test] + pub fn test_update_order_to_orderbook_side() { + let mut longs = OrderbookSide::new(); + + let mut long_order = dummy_order(dec!(100), dec!(50000), Direction::Long); + longs.add_order(long_order); + + long_order.quantity = dec!(200); + longs.update_order(long_order); + + let order = longs.take_order(); + assert_eq!(Some(long_order), order); + + assert_eq!(None, longs.take_order()); + } + + #[test] + pub fn test_remove_order_from_orderbook_side() { + let mut longs = OrderbookSide::new(); + + let long_order = dummy_order(dec!(100), dec!(50000), Direction::Long); + longs.add_order(long_order); + + longs.remove_order(long_order.id); + + let order = longs.take_order(); + assert_eq!(None, order); + } + + #[test] + pub fn test_remove_invalid_order_id_from_orderbook_side() { + let mut longs = OrderbookSide::new(); + + let long_order = dummy_order(dec!(100), dec!(50000), Direction::Long); + longs.add_order(long_order); + + longs.remove_order(Uuid::new_v4()); + + let order = longs.take_order(); + assert_eq!(Some(long_order), order); + } + + #[test] + pub fn test_match_order_exact_match_single_order() { + let mut longs = OrderbookSide::new(); + + let long_order = dummy_order(dec!(100), dec!(50000), Direction::Long); + longs.add_order(long_order); + + let matched_orders = longs.match_order(dec!(100)); + assert_eq!(1, matched_orders.len()); + assert_eq!(dec!(100), matched_orders.iter().map(|m| m.quantity).sum()); + + assert_eq!(None, longs.take_order()); + } + + #[test] + pub fn test_match_order_partial_limit_order_match_single_order() { + let mut longs = OrderbookSide::new(); + + let long_order = dummy_order(dec!(100), dec!(50000), Direction::Long); + longs.add_order(long_order); + + let matched_orders = longs.match_order(dec!(25)); + assert_eq!(1, matched_orders.len()); + assert_eq!(dec!(25), matched_orders.iter().map(|m| m.quantity).sum()); + + assert_eq!( + Some(Order { + quantity: dec!(75), + ..long_order + }), + longs.take_order() + ); + } + + #[test] + pub fn test_match_order_partial_market_order_match_single_order() { + let mut longs = OrderbookSide::new(); + + let long_order = dummy_order(dec!(100), dec!(50000), Direction::Long); + longs.add_order(long_order); + + let matched_orders = longs.match_order(dec!(125)); + assert_eq!(1, matched_orders.len()); + assert_eq!(dec!(100), matched_orders.iter().map(|m| m.quantity).sum()); + + assert_eq!(None, longs.take_order()); + } + + #[test] + pub fn test_match_order_partial_match_multiple_orders() { + let mut longs = OrderbookSide::new(); + + let long_order_1 = dummy_order(dec!(25), dec!(50100), Direction::Long); + longs.add_order(long_order_1); + + let long_order_2 = dummy_order(dec!(40), dec!(50200), Direction::Long); + longs.add_order(long_order_2); + + let long_order_3 = dummy_order(dec!(35), dec!(50300), Direction::Long); + longs.add_order(long_order_3); + + let matched_orders = longs.match_order(dec!(50)); + assert_eq!(2, matched_orders.len()); + assert_eq!(dec!(50), matched_orders.iter().map(|m| m.quantity).sum()); + + assert_eq!(dec!(50149.95), average_entry_price(&matched_orders)); + + assert_eq!( + Some(Order { + quantity: dec!(15), + ..long_order_2 + }), + longs.take_order() + ); + + assert_eq!( + Some(Order { + quantity: dec!(35), + ..long_order_3 + }), + longs.take_order() + ); + } + + fn dummy_order(quantity: Decimal, price: Decimal, direction: Direction) -> Order { + Order { + id: Uuid::new_v4(), + price, + trader_id: dummy_public_key(), + direction, + leverage: 1.0, + contract_symbol: ContractSymbol::BtcUsd, + quantity, + order_type: OrderType::Market, + timestamp: OffsetDateTime::now_utc(), + expiry: OffsetDateTime::now_utc() + Duration::minutes(1), + order_state: OrderState::Open, + order_reason: OrderReason::Manual, + stable: false, + } + } + + fn dummy_public_key() -> PublicKey { + PublicKey::from_str("02bd998ebd176715fe92b7467cf6b1df8023950a4dd911db4c94dfc89cc9f5a655") + .unwrap() + } + + fn average_entry_price(orders: &[Order]) -> Decimal { + if orders.is_empty() { + return Decimal::ZERO; + } + if orders.len() == 1 { + return orders.first().expect("to be exactly one").price; + } + let sum_quantity = orders.iter().fold(dec!(0), |acc, m| acc + m.quantity); + + let nominal_prices = orders + .iter() + .fold(dec!(0), |acc, m| acc + (m.quantity / m.price)); + + (sum_quantity / nominal_prices) + .round_dp_with_strategy(2, RoundingStrategy::MidpointAwayFromZero) + } +} diff --git a/coordinator/src/orderbook/trading.rs b/coordinator/src/orderbook/trading.rs index 7dfbf4997..a745e3ec5 100644 --- a/coordinator/src/orderbook/trading.rs +++ b/coordinator/src/orderbook/trading.rs @@ -1,836 +1,161 @@ -use crate::db; -use crate::message::TraderMessage; -use crate::node::Node; use crate::notifications::Notification; use crate::notifications::NotificationKind; -use crate::orderbook::db::matches; -use crate::orderbook::db::orders; -use crate::referrals; -use crate::trade::TradeExecutor; -use crate::ChannelOpeningParams; -use anyhow::anyhow; -use anyhow::bail; -use anyhow::Context; +use crate::orderbook::Orderbook; +use crate::trade::ExecutableMatch; use anyhow::Result; use bitcoin::secp256k1::PublicKey; -use bitcoin::secp256k1::XOnlyPublicKey; -use bitcoin::Amount; -use bitcoin::Network; -use futures::future::RemoteHandle; -use futures::FutureExt; -use rust_decimal::prelude::ToPrimitive; +use diesel::r2d2::ConnectionManager; +use diesel::r2d2::Pool; +use diesel::PgConnection; use rust_decimal::Decimal; -use rust_decimal::RoundingStrategy; -use std::cmp::Ordering; -use time::OffsetDateTime; use tokio::sync::broadcast; use tokio::sync::mpsc; -use tokio::task::spawn_blocking; use uuid::Uuid; -use xxi_node::commons; -use xxi_node::commons::ContractSymbol; -use xxi_node::commons::Direction; -use xxi_node::commons::FilledWith; -use xxi_node::commons::Match; use xxi_node::commons::Message; -use xxi_node::commons::Message::TradeError; +use xxi_node::commons::NewOrder; use xxi_node::commons::Order; use xxi_node::commons::OrderReason; -use xxi_node::commons::OrderState; -use xxi_node::commons::OrderType; -use xxi_node::commons::TradeAndChannelParams; -use xxi_node::commons::TradeParams; -use xxi_node::commons::TradingError; /// This value is arbitrarily set to 100 and defines the number of new order messages buffered in /// the channel. -const NEW_ORDERS_BUFFER_SIZE: usize = 100; - -pub struct NewOrderMessage { - pub order: Order, - pub order_reason: OrderReason, - pub channel_opening_params: Option, -} - -#[derive(Clone)] -pub struct MatchParams { - pub taker_match: TraderMatchParams, - pub makers_matches: Vec, -} - -#[derive(Clone)] -pub struct TraderMatchParams { - pub trader_id: PublicKey, - pub filled_with: FilledWith, +const ORDERBOOK_BUFFER_SIZE: usize = 100; + +#[derive(Debug)] +pub enum OrderbookMessage { + NewOrder { + new_order: NewOrder, + order_reason: OrderReason, + }, + DeleteOrder(Uuid), + Update(Order), } -/// Spawn a task that processes [`NewOrderMessage`]s. -/// -/// To feed messages to this task, the caller can use the corresponding -/// [`mpsc::Sender`] returned. -pub fn start( - node: Node, - tx_orderbook_feed: broadcast::Sender, - trade_notifier: mpsc::Sender, +pub fn spawn_orderbook( + pool: Pool>, notifier: mpsc::Sender, - network: Network, - oracle_pk: XOnlyPublicKey, -) -> (RemoteHandle<()>, mpsc::Sender) { - let (sender, mut receiver) = mpsc::channel::(NEW_ORDERS_BUFFER_SIZE); - - let (fut, remote_handle) = async move { - while let Some(new_order_msg) = receiver.recv().await { - tokio::spawn({ - let tx_orderbook_feed = tx_orderbook_feed.clone(); - let notifier = notifier.clone(); - let trade_notifier = trade_notifier.clone(); - let node = node.clone(); - async move { - let new_order = new_order_msg.order; - let trader_id = new_order.trader_id; - let order_id = new_order.id; - let channel_opening_params = new_order_msg.channel_opening_params; - - tracing::trace!( - %trader_id, - %order_id, - order_type = ?new_order.order_type, - "Processing new order", - ); - - if let Err(error) = match &new_order.order_type { - OrderType::Market => { - process_new_market_order( - node, - notifier.clone(), - trade_notifier.clone(), - &new_order, - network, - oracle_pk, - channel_opening_params - ) - .await + trade_executor: mpsc::Sender, + tx_orderbook_feed: broadcast::Sender, + order_matching_fee_rate: Decimal, +) -> Result> { + let mut orderbook = Orderbook::new(pool.clone(), order_matching_fee_rate); + + let (sender, mut receiver) = mpsc::channel::(ORDERBOOK_BUFFER_SIZE); + + tokio::spawn({ + let notifier = notifier.clone(); + async move { + if let Err(e) = orderbook.initialize().await { + tracing::error!("Failed to initialize orderbook. Error: {e:#}"); + return; + } + while let Some(message) = receiver.recv().await { + let msg = match message { + OrderbookMessage::NewOrder { + new_order: NewOrder::Market(new_order), + order_reason, + } => { + let trader_pubkey = new_order.trader_id; + let order_id = new_order.id; + match orderbook.match_market_order(new_order, order_reason).await { + Ok(executable_matches) => { + // in case of an async match (e.g. expired, liquidation) the user + // will get a push notification. + if let Err(e) = + notify_user(notifier.clone(), trader_pubkey, order_reason).await + { + tracing::warn!(%trader_pubkey, %order_id, ?order_reason, "Failed to send push notification. Error: {e:#}"); + } + + for executable_match in executable_matches { + if let Err(e) = trade_executor.send(executable_match).await { + tracing::error!(%trader_pubkey, %order_id, ?order_reason, "Failed to execute trade. Error: {e:#}"); + } + } + + None + } + Err(e) => { + tracing::error!(%trader_pubkey, %order_id, "Failed to match new market order. Error: {e:#}"); + Some(Message::TradeError { + order_id, + error: e.into(), + }) + } } - OrderType::Limit => { - process_new_limit_order( - tx_orderbook_feed, - new_order.clone(), - ) - .await + } + OrderbookMessage::NewOrder { + new_order: NewOrder::Limit(new_order), + .. + } => { + let trader_pubkey = new_order.trader_id; + let order_id = new_order.id; + match orderbook.add_limit_order(new_order).await { + Ok(message) => Some(message), + Err(e) => { + tracing::error!(%trader_pubkey, %order_id, "Failed to process new limit order. Error: {e:#}"); + continue; + } } - } { - - if new_order.order_reason == OrderReason::Manual { - // TODO(holzeis): the maker is currently not subscribed to the websocket - // api, hence it wouldn't receive the error message. - if let Err(e) = trade_notifier - .send(TraderMessage { - trader_id, - message: TradeError { order_id, error }, - notification: None, - }) - .await - { - tracing::error!(%trader_id, %order_id, "Failed to send trade error. Error: {e:#}"); + } + OrderbookMessage::DeleteOrder(order_id) => { + match orderbook.remove_order(order_id).await { + Ok(message) => message, + Err(e) => { + tracing::error!(%order_id, "Failed to process remove order. Error: {e:#}"); + continue; } } } - } - }); - } - - tracing::error!("Channel closed"); - } - .remote_handle(); - - tokio::spawn(fut); - - (remote_handle, sender) -} - -pub async fn process_new_limit_order( - tx_orderbook_feed: broadcast::Sender, - order: Order, -) -> Result<(), TradingError> { - tx_orderbook_feed - .send(Message::NewOrder(order)) - .map_err(|e| anyhow!(e)) - .context("Could not update price feed")?; - - Ok(()) -} - -// TODO(holzeis): This functions runs multiple inserts in separate db transactions. This should only -// happen in a single transaction to ensure either all data or nothing is stored to the database. -pub async fn process_new_market_order( - node: Node, - notifier: mpsc::Sender, - trade_notifier: mpsc::Sender, - order: &Order, - network: Network, - oracle_pk: XOnlyPublicKey, - channel_opening_params: Option, -) -> Result<(), TradingError> { - let mut conn = spawn_blocking({ - let node = node.clone(); - move || node.pool.get() - }) - .await - .expect("task to complete") - .map_err(|e| anyhow!("{e:#}"))?; - - // Reject new order if there is already a matched order waiting for execution. - if let Some(order) = - orders::get_by_trader_id_and_state(&mut conn, order.trader_id, OrderState::Matched) - .map_err(|e| anyhow!("{e:#}"))? - { - return Err(TradingError::InvalidOrder(format!( - "trader_id={}, order_id={}. Order is currently in execution. \ - Can't accept new orders until the order execution is finished", - order.trader_id, order.id - ))); - } - - let opposite_direction_limit_orders = orders::all_by_direction_and_type( - &mut conn, - order.direction.opposite(), - OrderType::Limit, - true, - ) - .map_err(|e| anyhow!("{e:#}"))?; - - let fee_percent = { node.settings.read().await.order_matching_fee_rate }; - let fee_percent = Decimal::try_from(fee_percent).expect("to fit into decimal"); - - let trader_pubkey_string = order.trader_id.to_string(); - let status = referrals::get_referral_status(order.trader_id, &mut conn)?; - let fee_discount = status.referral_fee_bonus; - let fee_percent = fee_percent - (fee_percent * fee_discount); - - tracing::debug!( - trader_pubkey = trader_pubkey_string, - %fee_discount, total_fee_percent = %fee_percent, "Fee discount calculated"); - - let matched_orders = match match_order( - order, - opposite_direction_limit_orders, - network, - oracle_pk, - fee_percent, - ) { - Ok(Some(matched_orders)) => matched_orders, - Ok(None) => { - // TODO(holzeis): Currently we still respond to the user immediately if there - // has been a match or not, that's the reason why we also have to set the order - // to failed here. But actually we could keep the order until either expired or - // a match has been found and then update the state accordingly. - - orders::set_order_state(&mut conn, order.id, OrderState::Failed) - .map_err(|e| anyhow!("{e:#}"))?; - return Err(TradingError::NoMatchFound(format!( - "Could not match order {}", - order.id - ))); - } - Err(e) => { - orders::set_order_state(&mut conn, order.id, OrderState::Failed) - .map_err(|e| anyhow!("{e:#}"))?; - return Err(TradingError::Other(format!("Failed to match order: {e:#}"))); - } - }; - - tracing::info!( - trader_id=%order.trader_id, - order_id=%order.id, - "Found a match with {} makers for new order", - matched_orders.taker_match.filled_with.matches.len() - ); - - for match_param in matched_orders.matches() { - matches::insert(&mut conn, match_param)?; - - let trader_id = match_param.trader_id; - let order_id = match_param.filled_with.order_id.to_string(); - - tracing::info!(%trader_id, order_id, "Notifying trader about match"); - - let notification = match &order.order_reason { - OrderReason::Expired => Some(NotificationKind::PositionExpired), - OrderReason::TraderLiquidated => Some(NotificationKind::Custom { - title: "Oops, you got liquidated 💸".to_string(), - message: "Open your app to execute the liquidation".to_string(), - }), - OrderReason::CoordinatorLiquidated => Some(NotificationKind::Custom { - title: "Your counterparty got liquidated 💸".to_string(), - message: "Open your app to execute the liquidation".to_string(), - }), - OrderReason::Manual => None, - }; - - if let Some(notification) = notification { - // send user a push notification - notifier - .send(Notification::new(order.trader_id, notification)) - .await - .with_context(|| { - format!( - "Failed to send push notification. trader_id = {}", - order.trader_id - ) - })?; - } - - let order_state = if order.order_type == OrderType::Limit { - // FIXME: The maker is currently not connected to the WebSocket so we can't - // notify him about a trade. However, trades are always accepted by the - // maker at the moment so in order to not have all limit orders in order - // state `Match` we are setting the order to `Taken` even if we couldn't - // notify the maker. - OrderState::Taken - } else { - OrderState::Matched - }; - - tracing::debug!(%trader_id, order_id, "Updating the order state to {order_state:?}"); - - orders::set_order_state(&mut conn, match_param.filled_with.order_id, order_state) - .map_err(|e| anyhow!("{e:#}"))?; - } - - if let Some(channel_opening_params) = channel_opening_params { - db::channel_opening_params::insert(&mut conn, order.id, channel_opening_params) - .map_err(|e| anyhow!("{e:#}"))?; - } - - if node.inner.is_connected(order.trader_id) { - tracing::info!(trader_id = %order.trader_id, order_id = %order.id, order_reason = ?order.order_reason, "Executing trade for match"); - let trade_executor = TradeExecutor::new(node.clone(), trade_notifier); - - trade_executor - .execute(&TradeAndChannelParams { - trade_params: TradeParams { - pubkey: order.trader_id, - contract_symbol: ContractSymbol::BtcUsd, - leverage: order.leverage, - quantity: order.quantity.to_f32().expect("to fit into f32"), - direction: order.direction, - filled_with: matched_orders.taker_match.filled_with, - }, - trader_reserve: channel_opening_params.map(|p| p.trader_reserve), - coordinator_reserve: channel_opening_params.map(|p| p.coordinator_reserve), - external_funding: channel_opening_params.and_then(|c| c.external_funding), - }) - .await; - } else { - match order.order_reason { - OrderReason::Manual => { - tracing::warn!(trader_id = %order.trader_id, order_id = %order.id, order_reason = ?order.order_reason, "Skipping trade execution as trader is not connected") - } - OrderReason::Expired - | OrderReason::TraderLiquidated - | OrderReason::CoordinatorLiquidated => { - tracing::info!(trader_id = %order.trader_id, order_id = %order.id, order_reason = ?order.order_reason, "Skipping trade execution as trader is not connected") - } - } - } - - Ok(()) -} - -/// Matches an [`Order`] of [`OrderType::Market`] with a list of [`Order`]s of [`OrderType::Limit`]. -/// -/// The caller is expected to provide a list of `opposite_direction_orders` of [`OrderType::Limit`] -/// and opposite [`Direction`] to the `market_order`. We nevertheless ensure that this is the case -/// to be on the safe side. -fn match_order( - market_order: &Order, - opposite_direction_orders: Vec, - network: Network, - oracle_pk: XOnlyPublicKey, - fee_percent: Decimal, -) -> Result> { - if market_order.order_type == OrderType::Limit { - // We don't match limit orders with other limit orders at the moment. - return Ok(None); - } - - let opposite_direction_orders = opposite_direction_orders - .into_iter() - .filter(|o| !o.direction.eq(&market_order.direction)) - .collect(); - - let mut orders = sort_orders(opposite_direction_orders, market_order.direction); - - let mut remaining_quantity = market_order.quantity; - let mut matched_orders = vec![]; - while !orders.is_empty() { - let matched_order = orders.remove(0); - remaining_quantity -= matched_order.quantity; - matched_orders.push(matched_order); - - if remaining_quantity <= Decimal::ZERO { - break; - } - } - - // For the time being we do not want to support multi-matches. - if matched_orders.len() > 1 { - bail!("More than one matched order, please reduce order quantity"); - } - - if matched_orders.is_empty() { - return Ok(None); - } - - let expiry_timestamp = commons::calculate_next_expiry(OffsetDateTime::now_utc(), network); - - let matches = matched_orders - .iter() - .map(|maker_order| { - let matching_fee = market_order.quantity / maker_order.price * fee_percent; - let matching_fee = matching_fee.round_dp_with_strategy(8, RoundingStrategy::MidpointAwayFromZero); - let matching_fee = match Amount::from_btc(matching_fee.to_f64().expect("to fit")) { - Ok(fee) => {fee} - Err(err) => { - tracing::error!( - trader_pubkey = maker_order.trader_id.to_string(), - order_id = maker_order.id.to_string(), - "Failed calculating order matching fee for order {err:?}. Falling back to 0"); - Amount::ZERO - } - }; - ( - TraderMatchParams { - trader_id: maker_order.trader_id, - filled_with: FilledWith { - order_id: maker_order.id, - expiry_timestamp, - oracle_pk, - matches: vec![Match { - id: Uuid::new_v4(), - order_id: market_order.id, - quantity: market_order.quantity, - pubkey: market_order.trader_id, - execution_price: maker_order.price, - matching_fee, - }], + OrderbookMessage::Update(order) => match orderbook.update_order(order).await { + Ok(message) => message, + Err(e) => { + tracing::error!(trader_pubkey=%order.trader_id, order_id=%order.id, + "Failed to process update order. Error: {e:#}"); + continue; + } }, - }, - Match { - id: Uuid::new_v4(), - order_id: maker_order.id, - quantity: market_order.quantity, - pubkey: maker_order.trader_id, - execution_price: maker_order.price, - matching_fee, - }, - ) - }) - .collect::>(); - - let mut maker_matches = vec![]; - let mut taker_matches = vec![]; + }; - for (mm, taker_match) in matches { - maker_matches.push(mm); - taker_matches.push(taker_match); - } - - Ok(Some(MatchParams { - taker_match: TraderMatchParams { - trader_id: market_order.trader_id, - filled_with: FilledWith { - order_id: market_order.id, - expiry_timestamp, - oracle_pk, - matches: taker_matches, - }, - }, - makers_matches: maker_matches, - })) -} - -/// Sort the provided list of limit [`Order`]s based on the [`Direction`] of the market order to be -/// matched. -/// -/// For matching a market order and limit orders we have to -/// -/// - take the highest rate if the market order is short; and -/// -/// - take the lowest rate if the market order is long. -/// -/// Hence, the orders are sorted accordingly: -/// -/// - If the market order is short, the limit orders are sorted in descending order of -/// price. -/// -/// - If the market order is long, the limit orders are sorted in ascending order of price. -/// -/// Additionally, if two orders have the same price, the one with the earlier `timestamp` takes -/// precedence. -fn sort_orders(mut limit_orders: Vec, market_order_direction: Direction) -> Vec { - limit_orders.sort_by(|a, b| { - if a.price.cmp(&b.price) == Ordering::Equal { - return a.timestamp.cmp(&b.timestamp); - } + if let Some(msg) = msg { + if let Err(e) = tx_orderbook_feed.send(msg) { + tracing::error!("Failed to send orderbook message. Error: {e:#}"); + } + } + } - match market_order_direction { - // Ascending order. - Direction::Long => a.price.cmp(&b.price), - // Descending order. - Direction::Short => b.price.cmp(&a.price), + tracing::warn!("Orderbook channel has been closed."); } }); - limit_orders + Ok(sender) } -impl MatchParams { - fn matches(&self) -> Vec<&TraderMatchParams> { - std::iter::once(&self.taker_match) - .chain(self.makers_matches.iter()) - .collect() - } -} - -impl From<&TradeParams> for TraderMatchParams { - fn from(value: &TradeParams) -> Self { - TraderMatchParams { - trader_id: value.pubkey, - filled_with: value.filled_with.clone(), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use rust_decimal_macros::dec; - use std::str::FromStr; - use time::Duration; - use xxi_node::commons::ContractSymbol; - - #[test] - fn when_short_then_sort_desc() { - let order1 = dummy_long_order( - dec!(20_000), - Uuid::new_v4(), - Default::default(), - Duration::seconds(0), - ); - let order2 = dummy_long_order( - dec!(21_000), - Uuid::new_v4(), - Default::default(), - Duration::seconds(0), - ); - let order3 = dummy_long_order( - dec!(20_500), - Uuid::new_v4(), - Default::default(), - Duration::seconds(0), - ); - - let orders = vec![order3.clone(), order1.clone(), order2.clone()]; - - let orders = sort_orders(orders, Direction::Short); - assert_eq!(orders[0], order2); - assert_eq!(orders[1], order3); - assert_eq!(orders[2], order1); - } - - #[test] - fn when_long_then_sort_asc() { - let order1 = dummy_long_order( - dec!(20_000), - Uuid::new_v4(), - Default::default(), - Duration::seconds(0), - ); - let order2 = dummy_long_order( - dec!(21_000), - Uuid::new_v4(), - Default::default(), - Duration::seconds(0), - ); - let order3 = dummy_long_order( - dec!(20_500), - Uuid::new_v4(), - Default::default(), - Duration::seconds(0), - ); - - let orders = vec![order3.clone(), order1.clone(), order2.clone()]; - - let orders = sort_orders(orders, Direction::Long); - assert_eq!(orders[0], order1); - assert_eq!(orders[1], order3); - assert_eq!(orders[2], order2); - } - - #[test] - fn when_all_same_price_sort_by_id() { - let order1 = dummy_long_order( - dec!(20_000), - Uuid::new_v4(), - Default::default(), - Duration::seconds(0), - ); - let order2 = dummy_long_order( - dec!(20_000), - Uuid::new_v4(), - Default::default(), - Duration::seconds(1), - ); - let order3 = dummy_long_order( - dec!(20_000), - Uuid::new_v4(), - Default::default(), - Duration::seconds(2), - ); - - let orders = vec![order3.clone(), order1.clone(), order2.clone()]; - - let orders = sort_orders(orders, Direction::Long); - assert_eq!(orders[0], order1); - assert_eq!(orders[1], order2); - assert_eq!(orders[2], order3); - - let orders = sort_orders(orders, Direction::Short); - assert_eq!(orders[0], order1); - assert_eq!(orders[1], order2); - assert_eq!(orders[2], order3); - } - - #[test] - fn given_limit_and_market_with_same_amount_then_match() { - let all_orders = vec![ - dummy_long_order( - dec!(20_000), - Uuid::new_v4(), - dec!(100), - Duration::seconds(0), - ), - dummy_long_order( - dec!(21_000), - Uuid::new_v4(), - dec!(200), - Duration::seconds(0), - ), - dummy_long_order( - dec!(20_000), - Uuid::new_v4(), - dec!(300), - Duration::seconds(0), - ), - dummy_long_order( - dec!(22_000), - Uuid::new_v4(), - dec!(400), - Duration::seconds(0), - ), - ]; - - let order = Order { - id: Uuid::new_v4(), - price: Default::default(), - trader_id: PublicKey::from_str( - "027f31ebc5462c1fdce1b737ecff52d37d75dea43ce11c74d25aa297165faa2007", - ) - .unwrap(), - direction: Direction::Short, - leverage: 1.0, - contract_symbol: ContractSymbol::BtcUsd, - quantity: dec!(100), - order_type: OrderType::Market, - timestamp: OffsetDateTime::now_utc(), - expiry: OffsetDateTime::now_utc() + Duration::minutes(1), - order_state: OrderState::Open, - order_reason: OrderReason::Manual, - stable: false, - }; - - let matched_orders = match_order( - &order, - all_orders, - Network::Bitcoin, - get_oracle_public_key(), - Decimal::ZERO, - ) - .unwrap() - .unwrap(); - - assert_eq!(matched_orders.makers_matches.len(), 1); - let maker_matches = matched_orders - .makers_matches - .first() - .unwrap() - .filled_with - .matches - .clone(); - assert_eq!(maker_matches.len(), 1); - assert_eq!(maker_matches.first().unwrap().quantity, dec!(100)); - - assert_eq!(matched_orders.taker_match.filled_with.order_id, order.id); - assert_eq!(matched_orders.taker_match.filled_with.matches.len(), 1); - assert_eq!( - matched_orders - .taker_match - .filled_with - .matches - .first() - .unwrap() - .quantity, - order.quantity - ); - } - - /// This test is for safety reasons only. Once we want multiple matches we should update it - #[test] - fn given_limit_and_market_with_smaller_amount_then_error() { - let order1 = dummy_long_order( - dec!(20_000), - Uuid::new_v4(), - dec!(400), - Duration::seconds(0), - ); - let order2 = dummy_long_order( - dec!(21_000), - Uuid::new_v4(), - dec!(200), - Duration::seconds(0), - ); - let order3 = dummy_long_order( - dec!(22_000), - Uuid::new_v4(), - dec!(100), - Duration::seconds(0), - ); - let order4 = dummy_long_order( - dec!(20_000), - Uuid::new_v4(), - dec!(300), - Duration::seconds(0), - ); - let all_orders = vec![order1, order2, order3, order4]; - - let order = Order { - id: Uuid::new_v4(), - price: Default::default(), - trader_id: PublicKey::from_str( - "027f31ebc5462c1fdce1b737ecff52d37d75dea43ce11c74d25aa297165faa2007", - ) - .unwrap(), - direction: Direction::Short, - leverage: 1.0, - contract_symbol: ContractSymbol::BtcUsd, - quantity: dec!(200), - order_type: OrderType::Market, - timestamp: OffsetDateTime::now_utc(), - expiry: OffsetDateTime::now_utc() + Duration::minutes(1), - order_state: OrderState::Open, - order_reason: OrderReason::Manual, - stable: false, - }; - - assert!(match_order( - &order, - all_orders, - Network::Bitcoin, - get_oracle_public_key(), - Decimal::ZERO, - ) - .is_err()); - } - - #[test] - fn given_long_when_needed_short_direction_then_no_match() { - let all_orders = vec![ - dummy_long_order( - dec!(20_000), - Uuid::new_v4(), - dec!(100), - Duration::seconds(0), - ), - dummy_long_order( - dec!(21_000), - Uuid::new_v4(), - dec!(200), - Duration::seconds(0), - ), - dummy_long_order( - dec!(22_000), - Uuid::new_v4(), - dec!(400), - Duration::seconds(0), - ), - dummy_long_order( - dec!(20_000), - Uuid::new_v4(), - dec!(300), - Duration::seconds(0), - ), - ]; - - let order = Order { - id: Uuid::new_v4(), - price: Default::default(), - trader_id: PublicKey::from_str( - "027f31ebc5462c1fdce1b737ecff52d37d75dea43ce11c74d25aa297165faa2007", - ) - .unwrap(), - direction: Direction::Long, - leverage: 1.0, - contract_symbol: ContractSymbol::BtcUsd, - quantity: dec!(200), - order_type: OrderType::Market, - timestamp: OffsetDateTime::now_utc(), - expiry: OffsetDateTime::now_utc() + Duration::minutes(1), - order_state: OrderState::Open, - order_reason: OrderReason::Manual, - stable: false, - }; - - let matched_orders = match_order( - &order, - all_orders, - Network::Bitcoin, - get_oracle_public_key(), - Decimal::ZERO, - ) - .unwrap(); - - assert!(matched_orders.is_none()); - } +/// Sends a push notification to the user in case of an expiry or liquidation. +async fn notify_user( + notifier: mpsc::Sender, + trader_pubkey: PublicKey, + order_reason: OrderReason, +) -> Result<()> { + let notification = match order_reason { + OrderReason::Expired => Some(NotificationKind::PositionExpired), + OrderReason::TraderLiquidated => Some(NotificationKind::Custom { + title: "Oops, you got liquidated 💸".to_string(), + message: "Open your app to execute the liquidation".to_string(), + }), + OrderReason::CoordinatorLiquidated => Some(NotificationKind::Custom { + title: "Your counterparty got liquidated 💸".to_string(), + message: "Open your app to execute the liquidation".to_string(), + }), + OrderReason::Manual => None, + }; - fn dummy_long_order( - price: Decimal, - id: Uuid, - quantity: Decimal, - timestamp_delay: Duration, - ) -> Order { - Order { - id, - price, - trader_id: PublicKey::from_str( - "027f31ebc5462c1fdce1b737ecff52d37d75dea43ce11c74d25aa297165faa2007", - ) - .unwrap(), - direction: Direction::Long, - leverage: 1.0, - contract_symbol: ContractSymbol::BtcUsd, - quantity, - order_type: OrderType::Limit, - timestamp: OffsetDateTime::now_utc() + timestamp_delay, - expiry: OffsetDateTime::now_utc() + Duration::minutes(1), - order_state: OrderState::Open, - order_reason: OrderReason::Manual, - stable: false, - } + if let Some(notification) = notification { + tracing::info!(%trader_pubkey, ?order_reason, "Notifying trader about match"); + // send user a push notification + notifier + .send(Notification::new(trader_pubkey, notification)) + .await?; } - fn get_oracle_public_key() -> XOnlyPublicKey { - XOnlyPublicKey::from_str("16f88cf7d21e6c0f46bcbc983a4e3b19726c6c98858cc31c83551a88fde171c0") - .unwrap() - } + Ok(()) } diff --git a/coordinator/src/orderbook/websocket.rs b/coordinator/src/orderbook/websocket.rs index f4f30b2a2..4fdf155fc 100644 --- a/coordinator/src/orderbook/websocket.rs +++ b/coordinator/src/orderbook/websocket.rs @@ -2,7 +2,7 @@ use crate::db; use crate::db::user; use crate::message::NewUserMessage; use crate::orderbook::db::orders; -use crate::orderbook::trading::NewOrderMessage; +use crate::orderbook::trading::OrderbookMessage; use crate::referrals; use crate::routes::AppState; use anyhow::bail; @@ -16,11 +16,11 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::broadcast::error::RecvError; use tokio::sync::mpsc; -use tokio::task::spawn_blocking; use uuid::Uuid; use xxi_node::commons::create_sign_message; use xxi_node::commons::Message; use xxi_node::commons::NewLimitOrder; +use xxi_node::commons::NewOrder; use xxi_node::commons::OrderReason; use xxi_node::commons::OrderbookRequest; use xxi_node::commons::ReferralStatus; @@ -38,23 +38,10 @@ async fn handle_insert_order( bail!("Maker {trader_id} tried to trade on behalf of someone else: {order:?}"); } - tracing::trace!(?order, "Inserting order"); - - let order = spawn_blocking({ - let mut conn = state.pool.clone().get()?; - move || { - let order = orders::insert_limit_order(&mut conn, order, OrderReason::Manual)?; - - anyhow::Ok(order) - } - }) - .await??; - let _ = state - .trading_sender - .send(NewOrderMessage { - order, - channel_opening_params: None, + .orderbook_sender + .send(OrderbookMessage::NewOrder { + new_order: NewOrder::Limit(order), order_reason: OrderReason::Manual, }) .await; @@ -64,22 +51,15 @@ async fn handle_insert_order( async fn handle_delete_order( state: Arc, - trader_id: PublicKey, + trader_pubkey: PublicKey, order_id: Uuid, ) -> Result<()> { - tracing::trace!(%order_id, "Deleting order"); - - spawn_blocking({ - let mut conn = state.pool.clone().get()?; - move || { - orders::delete_trader_order(&mut conn, order_id, trader_id)?; - - anyhow::Ok(()) - } - }) - .await??; + tracing::trace!(%trader_pubkey, %order_id, "Deleting order"); - let _ = state.tx_orderbook_feed.send(Message::DeleteOrder(order_id)); + state + .orderbook_sender + .send(OrderbookMessage::DeleteOrder(order_id)) + .await?; Ok(()) } diff --git a/coordinator/src/routes.rs b/coordinator/src/routes.rs index b52bf80f0..ea5200b9e 100644 --- a/coordinator/src/routes.rs +++ b/coordinator/src/routes.rs @@ -13,7 +13,7 @@ use crate::message::TraderMessage; use crate::node::invoice; use crate::node::Node; use crate::notifications::Notification; -use crate::orderbook::trading::NewOrderMessage; +use crate::orderbook::trading::OrderbookMessage; use crate::parse_dlc_channel_id; use crate::settings::Settings; use crate::trade::websocket::InternalPositionUpdateMessage; @@ -102,12 +102,12 @@ mod orderbook; pub struct AppState { pub node: Node, + pub orderbook_sender: mpsc::Sender, // Channel used to send messages to all connected clients. pub tx_orderbook_feed: broadcast::Sender, /// A channel used to send messages about position updates pub tx_position_feed: broadcast::Sender, pub tx_user_feed: broadcast::Sender, - pub trading_sender: mpsc::Sender, pub pool: Pool>, pub settings: RwLock, pub exporter: PrometheusExporter, @@ -126,7 +126,7 @@ pub fn router( settings: Settings, exporter: PrometheusExporter, node_alias: &str, - trading_sender: mpsc::Sender, + orderbook_sender: mpsc::Sender, tx_orderbook_feed: broadcast::Sender, tx_position_feed: broadcast::Sender, tx_user_feed: broadcast::Sender, @@ -144,7 +144,7 @@ pub fn router( tx_orderbook_feed, tx_position_feed, tx_user_feed, - trading_sender, + orderbook_sender, exporter, node_alias: node_alias.to_string(), auth_users_notifier, diff --git a/coordinator/src/routes/orderbook.rs b/coordinator/src/routes/orderbook.rs index bf719e2b9..98169c75c 100644 --- a/coordinator/src/routes/orderbook.rs +++ b/coordinator/src/routes/orderbook.rs @@ -1,12 +1,10 @@ use crate::check_version::check_version; use crate::db; use crate::orderbook; -use crate::orderbook::db::orders; -use crate::orderbook::trading::NewOrderMessage; +use crate::orderbook::trading::OrderbookMessage; use crate::orderbook::websocket::websocket_connection; use crate::routes::AppState; use crate::AppError; -use anyhow::anyhow; use anyhow::Context; use anyhow::Result; use axum::extract::ws::WebSocketUpgrade; @@ -19,12 +17,10 @@ use diesel::r2d2::PooledConnection; use diesel::PgConnection; use rust_decimal::Decimal; use std::sync::Arc; -use tokio::sync::broadcast::Sender; use tokio::task::spawn_blocking; use tracing::instrument; use uuid::Uuid; use xxi_node::commons; -use xxi_node::commons::Message; use xxi_node::commons::NewOrder; use xxi_node::commons::NewOrderRequest; use xxi_node::commons::Order; @@ -77,152 +73,152 @@ pub async fn post_order( .map_err(|_| AppError::Unauthorized)?; let new_order = new_order_request.value; - let trader_pubkey_string = new_order.trader_id().to_string(); + let trader_pubkey = new_order.trader_id(); + let order_id = new_order.id(); // TODO(holzeis): We should add a similar check eventually for limit orders (makers). - if let NewOrder::Market(new_order) = &new_order { - let mut conn = state - .pool - .get() - .map_err(|e| AppError::InternalServerError(e.to_string()))?; - check_version(&mut conn, &new_order.trader_id) - .map_err(|e| AppError::BadRequest(e.to_string()))?; - } - - let settings = state.settings.read().await; - if let NewOrder::Limit(new_order) = &new_order { - if settings.whitelist_enabled && !settings.whitelisted_makers.contains(&new_order.trader_id) - { - tracing::warn!( - trader_id = %new_order.trader_id, - "Trader tried to post limit order but was not whitelisted" - ); - return Err(AppError::Unauthorized); + match new_order { + NewOrder::Market(new_order) => { + spawn_blocking({ + let pool = state.pool.clone(); + move || { + let mut conn = pool + .get() + .context("Could not acquire database connection")?; + check_version(&mut conn, &new_order.trader_id) + } + }) + .await + .expect("task to finish") + .map_err(|e| AppError::BadRequest(e.to_string()))?; } + NewOrder::Limit(new_order) => { + if new_order.price == Decimal::ZERO { + return Err(AppError::BadRequest( + "Limit orders with zero price are not allowed".to_string(), + )); + } - if new_order.price == Decimal::ZERO { - return Err(AppError::BadRequest( - "Limit orders with zero price are not allowed".to_string(), - )); + let (whitelist_enabled, whitelisted_makers) = { + let settings = state.settings.read().await; + ( + settings.whitelist_enabled, + settings.whitelisted_makers.clone(), + ) + }; + if whitelist_enabled && !whitelisted_makers.contains(&new_order.trader_id) { + tracing::warn!( + trader_id = %new_order.trader_id, + "Trader tried to post limit order but was not whitelisted" + ); + return Err(AppError::Unauthorized); + } } } - let pool = state.pool.clone(); - let external_funding = match new_order_request - .channel_opening_params - .clone() - .and_then(|c| c.pre_image) - { - Some(pre_image) => { - let pre_image = commons::PreImage::from_url_safe_encoded_pre_image(pre_image.as_str()) - .map_err(|_| AppError::BadRequest("Invalid pre_image provided".to_string()))?; - let inner_pre_image = pre_image.get_pre_image_as_string(); - - tracing::debug!( - pre_image = inner_pre_image, - hash = pre_image.hash, - "Received pre-image, updating records" - ); - - let inner_hash = pre_image.hash.clone(); - let funding_amount = spawn_blocking(move || { + if let Some(channel_opening_params) = new_order_request.channel_opening_params { + let pool = state.pool.clone(); + let external_funding = match channel_opening_params.pre_image { + Some(pre_image) => { + let pre_image = + commons::PreImage::from_url_safe_encoded_pre_image(pre_image.as_str()) + .map_err(|_| { + AppError::BadRequest("Invalid pre_image provided".to_string()) + })?; + let inner_pre_image = pre_image.get_pre_image_as_string(); + + tracing::debug!( + pre_image = inner_pre_image, + hash = pre_image.hash, + "Received pre-image, updating records" + ); + + let inner_hash = pre_image.hash.clone(); + let funding_amount = spawn_blocking(move || { + let mut conn = pool.get()?; + + let amount = db::hodl_invoice::update_hodl_invoice_pre_image( + &mut conn, + inner_hash.as_str(), + inner_pre_image.as_str(), + )?; + + anyhow::Ok(amount) + }) + .await + .expect("task to complete") + .map_err(|e| AppError::BadRequest(format!("Invalid preimage provided: {e:#}")))?; + + state + .lnd_bridge + .settle_invoice(pre_image.get_base64_encoded_pre_image()) + .await + .map_err(|err| { + AppError::BadRequest(format!("Could not settle invoice {err:#}")) + })?; + + tracing::info!(hash = pre_image.hash, %trader_pubkey, "Settled invoice"); + + // we have received funding via lightning and can now open the channel with funding + // only from the coordinator + Some(funding_amount) + } + None => None, + }; + + // FIXME(holzeis): We shouldn't blindly trust the user about the coordinator reserve. Note, + // we already ignore the trader reserve parameter when the channel is externally + // funded. + spawn_blocking({ + let pool = state.pool.clone(); + move || { let mut conn = pool.get()?; - - let amount = db::hodl_invoice::update_hodl_invoice_pre_image( + db::channel_opening_params::insert( &mut conn, - inner_hash.as_str(), - inner_pre_image.as_str(), + order_id, + crate::ChannelOpeningParams { + trader_reserve: channel_opening_params.trader_reserve, + coordinator_reserve: channel_opening_params.coordinator_reserve, + external_funding, + }, )?; - - anyhow::Ok(amount) - }) - .await - .expect("task to complete") - .map_err(|e| AppError::BadRequest(format!("Invalid preimage provided: {e:#}")))?; - - state - .lnd_bridge - .settle_invoice(pre_image.get_base64_encoded_pre_image()) - .await - .map_err(|err| AppError::BadRequest(format!("Could not settle invoice {err:#}")))?; - - tracing::info!( - hash = pre_image.hash, - trader_pubkey = trader_pubkey_string, - "Settled invoice" - ); - // we have received funding via lightning and can now open the channel with funding - // only from the coordinator - Some(funding_amount) - } - None => None, - }; - - let pool = state.pool.clone(); - let new_order = new_order.clone(); - let order = spawn_blocking(move || { - let mut conn = pool.get()?; - - let order = match new_order { - NewOrder::Market(o) => { - orders::insert_market_order(&mut conn, o.clone(), OrderReason::Manual) - } - NewOrder::Limit(o) => orders::insert_limit_order(&mut conn, o, OrderReason::Manual), - } - .map_err(|e| anyhow!(e)) - .context("Failed to insert new order into DB")?; - - anyhow::Ok(order) - }) - .await - .expect("task to complete") - .map_err(|e| AppError::InternalServerError(e.to_string()))?; - - // FIXME(holzeis): We shouldn't blindly trust the user about the coordinator reserve. Note, we - // already ignore the trader reserve parameter when the channel is externally funded. - let message = NewOrderMessage { - order, - channel_opening_params: new_order_request.channel_opening_params.map(|params| { - crate::ChannelOpeningParams { - trader_reserve: params.trader_reserve, - coordinator_reserve: params.coordinator_reserve, - external_funding, + anyhow::Ok(()) } - }), + }) + .await + .expect("task to complete") + .map_err(|e| { + AppError::InternalServerError(format!("Failed to store channel opening params: {e:#}")) + })?; + } + + let message = OrderbookMessage::NewOrder { + new_order, order_reason: OrderReason::Manual, }; - state.trading_sender.send(message).await.map_err(|e| { + state.orderbook_sender.send(message).await.map_err(|e| { AppError::InternalServerError(format!("Failed to send new order message: {e:#}")) })?; Ok(()) } -fn update_pricefeed(pricefeed_msg: Message, sender: Sender) { - match sender.send(pricefeed_msg) { - Ok(_) => { - tracing::trace!("Pricefeed updated") - } - Err(error) => { - tracing::warn!("Could not update pricefeed due to '{error}'") - } - } -} - #[instrument(skip_all, err(Debug))] pub async fn delete_order( Path(order_id): Path, State(state): State>, -) -> Result, AppError> { - let mut conn = get_db_connection(&state)?; - let order = orderbook::db::orders::delete(&mut conn, order_id) - .map_err(|e| AppError::InternalServerError(format!("Failed to delete order: {e:#}")))?; - let sender = state.tx_orderbook_feed.clone(); - update_pricefeed(Message::DeleteOrder(order_id), sender); +) -> Result<(), AppError> { + state + .orderbook_sender + .send(OrderbookMessage::DeleteOrder(order_id)) + .await + .map_err(|e| { + AppError::InternalServerError(format!("Failed to send delete order message: {e:#}")) + })?; - Ok(Json(order)) + Ok(()) } pub async fn websocket_handler( diff --git a/coordinator/src/trade/mod.rs b/coordinator/src/trade/mod.rs index 1f69807fb..6f881f858 100644 --- a/coordinator/src/trade/mod.rs +++ b/coordinator/src/trade/mod.rs @@ -20,7 +20,6 @@ use bitcoin::secp256k1::PublicKey; use bitcoin::Amount; use bitcoin::SignedAmount; use diesel::Connection; -use diesel::PgConnection; use dlc_manager::channel::signed_channel::SignedChannel; use dlc_manager::channel::signed_channel::SignedChannelState; use dlc_manager::channel::Channel; @@ -35,6 +34,7 @@ use rust_decimal::prelude::ToPrimitive; use rust_decimal::Decimal; use time::OffsetDateTime; use tokio::sync::mpsc; +use tokio::task::spawn_blocking; use uuid::Uuid; use xxi_node::bitcoin_conversion::to_secp_pk_29; use xxi_node::bitcoin_conversion::to_xonly_pk_29; @@ -43,11 +43,16 @@ use xxi_node::cfd::calculate_margin; use xxi_node::cfd::calculate_pnl; use xxi_node::cfd::calculate_short_liquidation_price; use xxi_node::commons; +use xxi_node::commons::ContractSymbol; use xxi_node::commons::Direction; +use xxi_node::commons::FilledWith; +use xxi_node::commons::Match; use xxi_node::commons::MatchState; +use xxi_node::commons::Matches; use xxi_node::commons::Message; +use xxi_node::commons::Order; +use xxi_node::commons::OrderReason; use xxi_node::commons::OrderState; -use xxi_node::commons::TradeAndChannelParams; use xxi_node::commons::TradeParams; use xxi_node::node::dlc_channel::estimated_dlc_channel_fee_reserve; use xxi_node::node::dlc_channel::estimated_funding_transaction_fee; @@ -60,9 +65,6 @@ pub mod websocket; enum TradeAction { OpenDlcChannel, - OpenSingleFundedChannel { - external_funding: Amount, - }, OpenPosition { channel_id: DlcChannelId, own_payout: u64, @@ -98,7 +100,66 @@ enum ResizeAction { }, } -pub struct TradeExecutor { +pub struct ExecutableMatch { + pub order: Order, + pub matches: Vec, +} + +impl ExecutableMatch { + pub fn build_matches(&self) -> Vec { + self.matches + .clone() + .into_iter() + .map(|m| m.into()) + .collect::>() + } + + pub fn sum_quantity(&self) -> Decimal { + self.matches.iter().map(|m| m.quantity).sum() + } +} + +pub fn spawn_trade_executor( + node: Node, + trade_notifier: mpsc::Sender, +) -> Result> { + let trade_executor = TradeExecutor::new(node.clone(), trade_notifier); + + let (sender, mut receiver) = mpsc::channel::(100); + + tokio::spawn({ + let node = node.clone(); + async move { + while let Some(executeable_match) = receiver.recv().await { + let trader_pubkey = executeable_match.order.trader_id; + let order_id = executeable_match.order.id; + let order_reason = executeable_match.order.order_reason; + + if node.inner.is_connected(trader_pubkey) { + tracing::info!(%trader_pubkey, %order_id, ?order_reason, "Executing trade for match"); + trade_executor.execute(executeable_match).await; + } else { + match order_reason { + OrderReason::Manual => { + tracing::warn!(%trader_pubkey, %order_id, ?order_reason, "Skipping trade execution as trader is not connected") + } + OrderReason::Expired + | OrderReason::TraderLiquidated + | OrderReason::CoordinatorLiquidated => { + tracing::info!(%trader_pubkey, %order_id, ?order_reason, "Skipping trade execution as trader is not connected") + } + } + } + } + + tracing::error!("Trade executor channel closed"); + } + }); + + Ok(sender) +} + +struct TradeExecutor { node: Node, notifier: mpsc::Sender, } @@ -116,27 +177,29 @@ enum TraderRequiredLiquidity { } impl TradeExecutor { - pub fn new(node: Node, notifier: mpsc::Sender) -> Self { + fn new(node: Node, notifier: mpsc::Sender) -> Self { Self { node, notifier } } - pub async fn execute(&self, params: &TradeAndChannelParams) { - let trader_id = params.trade_params.pubkey; - let order_id = params.trade_params.filled_with.order_id; + pub async fn execute(&self, executeable_match: ExecutableMatch) { + let trader_pubkey = executeable_match.order.trader_id; + let order_id = executeable_match.order.id; - match self.execute_internal(params).await { + match self.execute_internal(executeable_match).await { Ok(()) => { tracing::info!( - %trader_id, + %trader_pubkey, %order_id, "Successfully processed match, setting match to Filled" ); + // TODO(holzeis): It's not ideal that the order and match are updated by the trade + // executor. This should rather get updated by the orderbook. if let Err(e) = self.update_order_and_match(order_id, MatchState::Filled, OrderState::Taken) { tracing::error!( - %trader_id, + %trader_pubkey, %order_id, "Failed to update order and match state. Error: {e:#}" ); @@ -147,19 +210,23 @@ impl TradeExecutor { self.node .inner .event_handler - .publish(NodeEvent::SendLastDlcMessage { peer: trader_id }); + .publish(NodeEvent::SendLastDlcMessage { + peer: trader_pubkey, + }); } Err(e) => { - tracing::error!(%trader_id, %order_id,"Failed to execute trade. Error: {e:#}"); + tracing::error!(%trader_pubkey, %order_id,"Failed to execute trade. Error: {e:#}"); + // TODO(holzeis): It's not ideal that the order and match are updated by the trade + // executor. This should rather get updated by the orderbook. if let Err(e) = self.update_order_and_match(order_id, MatchState::Failed, OrderState::Failed) { - tracing::error!(%trader_id, %order_id, "Failed to update order and match: {e}"); + tracing::error!(%trader_pubkey, %order_id, "Failed to update order and match: {e}"); }; let message = TraderMessage { - trader_id, + trader_id: trader_pubkey, message: Message::TradeError { order_id, error: e.into(), @@ -185,13 +252,32 @@ impl TradeExecutor { /// 2. If no position is found, we open a position. /// /// 3. If a position of differing quantity is found, we resize the position. - async fn execute_internal(&self, params: &TradeAndChannelParams) -> Result<()> { - let mut connection = self.node.pool.get()?; + async fn execute_internal(&self, executable_match: ExecutableMatch) -> Result<()> { + let trader_pubkey = executable_match.order.trader_id; + let order_id = executable_match.order.id; + + let expiry_timestamp = + commons::calculate_next_expiry(OffsetDateTime::now_utc(), self.node.inner.network); + + let order = executable_match.order; - let order_id = params.trade_params.filled_with.order_id; - let trader_id = params.trade_params.pubkey; - let order = - orders::get_with_id(&mut connection, order_id)?.context("Could not find order")?; + let filled_with = FilledWith { + order_id, + expiry_timestamp, + oracle_pk: self.node.inner.oracle_pubkey, + matches: executable_match.build_matches(), + }; + + let trade_params = TradeParams { + pubkey: trader_pubkey, + contract_symbol: ContractSymbol::BtcUsd, + leverage: order.leverage, + quantity: executable_match.sum_quantity().to_f32().expect("to fit"), + direction: order.direction, + filled_with, + }; + + let trader_pubkey = trade_params.pubkey; let is_stable_order = order.stable; ensure!( @@ -200,13 +286,13 @@ impl TradeExecutor { ); ensure!( order.order_state == OrderState::Matched, - "Can't execute trade with in invalid state {:?}", + "Can't execute trade with an invalid state {:?}", order.order_state ); - tracing::info!(%trader_id, %order_id, "Executing match"); + tracing::info!(%trader_pubkey, %order_id, "Executing match"); - let trade_action = self.determine_trade_action(&mut connection, params).await?; + let trade_action = self.determine_trade_action(&trade_params).await?; ensure!( matches!(trade_action, TradeAction::ClosePosition { .. }) @@ -216,70 +302,93 @@ impl TradeExecutor { match trade_action { TradeAction::OpenDlcChannel => { - let collateral_reserve_coordinator = params - .coordinator_reserve - .context("Missing coordinator collateral reserve")?; - let collateral_reserve_trader = params - .trader_reserve - .context("Missing trader collateral reserve")?; + let channel_opening_params = spawn_blocking({ + let pool = self.node.pool.clone(); + move || { + let mut conn = pool.get()?; + let params = + db::channel_opening_params::get_by_order_id(&mut conn, order_id)?; + anyhow::Ok(params) + } + }) + .await?? + .context("Missing channel opening params")?; - self.open_dlc_channel( - &mut connection, - ¶ms.trade_params, + let ( collateral_reserve_coordinator, collateral_reserve_trader, - is_stable_order, - TraderRequiredLiquidity::ForTradeCostAndTxFees, - ) - .await - .context("Failed to open DLC channel")?; - } - TradeAction::OpenSingleFundedChannel { external_funding } => { - let collateral_reserve_coordinator = params - .coordinator_reserve - .context("Missing coordinator collateral reserve")?; - let order_matching_fee = params.trade_params.order_matching_fee(); - let margin_trader = Amount::from_sat(margin_trader(¶ms.trade_params)); - - let fee_rate = self - .node - .inner - .fee_rate_estimator - .get(ConfirmationTarget::Normal); - - // The on chain fees are split evenly between the two parties. - let funding_transaction_fee = - estimated_funding_transaction_fee(fee_rate.as_sat_per_vb() as f64) / 2; - - let channel_fee_reserve = - estimated_dlc_channel_fee_reserve(fee_rate.as_sat_per_vb() as f64) / 2; - - // If the user funded the channel externally we derive the collateral reserve - // trader from the difference of the trader margin and the - // externally received funds. - // - // TODO(holzeis): Introduce margin orders to directly use the - // external_funding_sats for the position instead of failing here. We need - // to do this though as a malicious actor could otherwise drain us. - // - // Note, we add a min trader reserve to the external funding to ensure that - // minor price movements are covered. - let collateral_reserve_trader = external_funding - .checked_sub( - margin_trader - + order_matching_fee - + funding_transaction_fee - + channel_fee_reserve, - ) - .context("Not enough external funds to open position")?; + trader_required_utxos, + ) = match channel_opening_params.external_funding { + Some(external_funding) => { + let order_matching_fee = trade_params.order_matching_fee(); + let margin_trader = Amount::from_sat(margin_trader(&trade_params)); + + let fee_rate = self + .node + .inner + .fee_rate_estimator + .get(ConfirmationTarget::Normal); + + // The on chain fees are split evenly between the two parties. + let funding_transaction_fee = + estimated_funding_transaction_fee(fee_rate.as_sat_per_vb() as f64) / 2; + + let channel_fee_reserve = + estimated_dlc_channel_fee_reserve(fee_rate.as_sat_per_vb() as f64) / 2; + + tracing::info!("=====external_funding=====> {}", external_funding.to_sat()); + tracing::info!("=====margin_trader=====> {}", margin_trader.to_sat()); + tracing::info!( + "=====order_matching_fee=====> {}", + order_matching_fee.to_sat() + ); + tracing::info!( + "=====funding_transaction_fee=====> {}", + funding_transaction_fee.to_sat() + ); + tracing::info!( + "=====channel_fee_reserve=====> {}", + channel_fee_reserve.to_sat() + ); + + // If the user funded the channel externally we derive the collateral + // reserve trader from the difference of the trader + // margin and the externally received funds. + // + // TODO(holzeis): Introduce margin orders to directly use the + // external_funding_sats for the position instead of failing here. We need + // to do this though as a malicious actor could otherwise drain us. + // + // Note, we add a min trader reserve to the external funding to ensure that + // minor price movements are covered. + let collateral_reserve_trader = external_funding + .checked_sub( + margin_trader + + order_matching_fee + + funding_transaction_fee + + channel_fee_reserve, + ) + .context("Not enough external funds to open position")?; + + ( + channel_opening_params.coordinator_reserve, + collateral_reserve_trader, + TraderRequiredLiquidity::None, + ) + } + None => ( + channel_opening_params.coordinator_reserve, + channel_opening_params.trader_reserve, + TraderRequiredLiquidity::ForTradeCostAndTxFees, + ), + }; self.open_dlc_channel( - &mut connection, - ¶ms.trade_params, + &trade_params, collateral_reserve_coordinator, collateral_reserve_trader, is_stable_order, - TraderRequiredLiquidity::None, + trader_required_utxos, ) .await .context("Failed to open DLC channel")?; @@ -290,9 +399,8 @@ impl TradeExecutor { counter_payout, } => self .open_position( - &mut connection, channel_id, - ¶ms.trade_params, + &trade_params, own_payout, counter_payout, is_stable_order, @@ -303,13 +411,7 @@ impl TradeExecutor { channel_id, position, } => self - .start_closing_position( - &mut connection, - order, - &position, - ¶ms.trade_params, - channel_id, - ) + .start_closing_position(order, &position, &trade_params, channel_id) .await .with_context(|| format!("Failed to close position {}", position.id))?, TradeAction::ResizePosition { @@ -317,13 +419,7 @@ impl TradeExecutor { position, resize_action, } => self - .resize_position( - &mut connection, - channel_id, - &position, - ¶ms.trade_params, - resize_action, - ) + .resize_position(channel_id, &position, &trade_params, resize_action) .await .with_context(|| format!("Failed to resize position {}", position.id))?, }; @@ -333,7 +429,6 @@ impl TradeExecutor { async fn open_dlc_channel( &self, - conn: &mut PgConnection, trade_params: &TradeParams, collateral_reserve_coordinator: Amount, collateral_reserve_trader: Amount, @@ -486,7 +581,6 @@ impl TradeExecutor { // TODO(holzeis): The position should only get created after the dlc protocol has finished // successfully. self.persist_position( - conn, trade_params, temporary_contract_id, leverage_coordinator, @@ -498,7 +592,6 @@ impl TradeExecutor { async fn open_position( &self, - conn: &mut PgConnection, dlc_channel_id: DlcChannelId, trade_params: &TradeParams, coordinator_dlc_channel_collateral: u64, @@ -654,7 +747,6 @@ impl TradeExecutor { // TODO(holzeis): The position should only get created after the dlc protocol has finished // successfully. self.persist_position( - conn, trade_params, temporary_contract_id, leverage_coordinator, @@ -666,7 +758,6 @@ impl TradeExecutor { async fn resize_position( &self, - conn: &mut PgConnection, dlc_channel_id: DlcChannelId, position: &Position, trade_params: &TradeParams, @@ -824,28 +915,36 @@ impl TradeExecutor { DlcProtocolType::resize_position(trade_params, protocol_id, realized_pnl), )?; - db::positions::Position::set_position_to_resizing( - conn, - peer_id, - temporary_contract_id, - contracts, - coordinator_direction.opposite(), - margin_trader, - margin_coordinator, - average_execution_price, - expiry_timestamp, - coordinator_liquidation_price, - trader_liquidation_price, - realized_pnl, - order_matching_fee, - )?; + spawn_blocking({ + let pool = self.node.pool.clone(); + move || { + let mut conn = pool.get()?; + db::positions::Position::set_position_to_resizing( + &mut conn, + peer_id, + temporary_contract_id, + contracts, + coordinator_direction.opposite(), + margin_trader, + margin_coordinator, + average_execution_price, + expiry_timestamp, + coordinator_liquidation_price, + trader_liquidation_price, + realized_pnl, + order_matching_fee, + )?; + + anyhow::Ok(()) + } + }) + .await??; Ok(()) } async fn persist_position( &self, - connection: &mut PgConnection, trade_params: &TradeParams, temporary_contract_id: ContractId, coordinator_leverage: f32, @@ -900,15 +999,20 @@ impl TradeExecutor { // TODO(holzeis): We should only create the position once the dlc protocol finished // successfully. - db::positions::Position::insert(connection, new_position.clone())?; - - Ok(()) + spawn_blocking({ + let pool = self.node.pool.clone(); + move || { + let mut conn = pool.get()?; + db::positions::Position::insert(&mut conn, new_position.clone())?; + anyhow::Ok(()) + } + }) + .await? } pub async fn start_closing_position( &self, - conn: &mut PgConnection, - order: commons::Order, + order: Order, position: &Position, trade_params: &TradeParams, channel_id: DlcChannelId, @@ -980,11 +1084,20 @@ impl TradeExecutor { DlcProtocolType::settle(trade_params, protocol_id), )?; - db::positions::Position::set_open_position_to_closing( - conn, - &position.trader, - Some(closing_price), - )?; + spawn_blocking({ + let pool = self.node.pool.clone(); + let trader_pubkey = position.trader; + move || { + let mut conn = pool.get()?; + db::positions::Position::set_open_position_to_closing( + &mut conn, + &trader_pubkey, + Some(closing_price), + )?; + anyhow::Ok(()) + } + }) + .await??; Ok(()) } @@ -1007,17 +1120,13 @@ impl TradeExecutor { .map_err(|e| anyhow!("Failed to update order and match. Error: {e:#}")) } - async fn determine_trade_action( - &self, - connection: &mut PgConnection, - params: &TradeAndChannelParams, - ) -> Result { - let trader_id = params.trade_params.pubkey; + async fn determine_trade_action(&self, trade_params: &TradeParams) -> Result { + let trader_pubkey = trade_params.pubkey; let trade_action = match self .node .inner - .get_signed_dlc_channel_by_counterparty(&trader_id)? + .get_signed_dlc_channel_by_counterparty(&trader_pubkey)? { None => { ensure!( @@ -1026,17 +1135,12 @@ impl TradeExecutor { .inner .list_dlc_channels()? .iter() - .filter(|c| c.get_counter_party_id() == to_secp_pk_29(trader_id)) + .filter(|c| c.get_counter_party_id() == to_secp_pk_29(trader_pubkey)) .any(|c| matches!(c, Channel::Offered(_) | Channel::Accepted(_))), "Previous DLC Channel offer still pending." ); - match params.external_funding { - Some(external_funding) => { - TradeAction::OpenSingleFundedChannel { external_funding } - } - None => TradeAction::OpenDlcChannel, - } + TradeAction::OpenDlcChannel } Some(SignedChannel { channel_id, @@ -1057,14 +1161,21 @@ impl TradeExecutor { channel_id, .. }) => { - let trade_params = ¶ms.trade_params; - - let position = db::positions::Position::get_position_by_trader( - connection, - trader_id, - vec![PositionState::Open], - )? - .context("Failed to find open position")?; + let position = spawn_blocking({ + let pool = self.node.pool.clone(); + move || { + let mut conn = pool.get()?; + let position = db::positions::Position::get_position_by_trader( + &mut conn, + trader_pubkey, + vec![PositionState::Open], + )? + .context("Failed to find open position")?; + + anyhow::Ok(position) + } + }) + .await??; let position_contracts = { let contracts = decimal_from_f32(position.quantity); diff --git a/crates/dev-maker/src/main.rs b/crates/dev-maker/src/main.rs index 321b2674e..f62ec9f68 100644 --- a/crates/dev-maker/src/main.rs +++ b/crates/dev-maker/src/main.rs @@ -113,7 +113,7 @@ async fn post_order( id: uuid, contract_symbol: ContractSymbol::BtcUsd, price, - quantity: Decimal::from(5000), + quantity: Decimal::from(1000), trader_id: public_key, direction, leverage: Decimal::from(2), diff --git a/crates/xxi-node/src/commons/message.rs b/crates/xxi-node/src/commons/message.rs index 23f4489e3..da2b06144 100644 --- a/crates/xxi-node/src/commons/message.rs +++ b/crates/xxi-node/src/commons/message.rs @@ -85,7 +85,11 @@ pub enum OrderbookRequest { os: Option, signature: Signature, }, + // TODO(holzeis): This orderbook request seems to be unused and a duplicate to our http + // endpoint. I guess we can remove it. InsertOrder(NewLimitOrder), + // TODO(holzeis): This orderbook request seems to be unused and a duplicate to our http + // endpoint. I guess we can remove it. DeleteOrder(Uuid), } diff --git a/crates/xxi-node/src/commons/order.rs b/crates/xxi-node/src/commons/order.rs index 266ad35a2..849ffec73 100644 --- a/crates/xxi-node/src/commons/order.rs +++ b/crates/xxi-node/src/commons/order.rs @@ -31,7 +31,7 @@ impl NewOrderRequest { } } -#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug, Copy)] pub enum NewOrder { Market(NewMarketOrder), Limit(NewLimitOrder), @@ -82,7 +82,7 @@ impl NewOrder { } } -#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Serialize, Deserialize, Clone, Copy, Debug)] pub struct NewMarketOrder { pub id: Uuid, pub contract_symbol: ContractSymbol, @@ -188,7 +188,7 @@ impl OrderType { } } -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)] pub enum OrderState { Open, Matched, @@ -198,7 +198,7 @@ pub enum OrderState { Deleted, } -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)] pub enum OrderReason { Manual, Expired, @@ -206,7 +206,7 @@ pub enum OrderReason { TraderLiquidated, } -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Copy)] pub struct Order { pub id: Uuid, #[serde(with = "rust_decimal::serde::float")] @@ -227,6 +227,13 @@ pub struct Order { pub stable: bool, } +impl Order { + /// Returns true if the order is expired. + pub fn is_expired(&self) -> bool { + OffsetDateTime::now_utc() > self.expiry + } +} + /// Extra information required to open a DLC channel, independent of the [`TradeParams`] associated /// with the filled order. /// diff --git a/crates/xxi-node/src/commons/trade.rs b/crates/xxi-node/src/commons/trade.rs index a79c27800..7a28b6dce 100644 --- a/crates/xxi-node/src/commons/trade.rs +++ b/crates/xxi-node/src/commons/trade.rs @@ -9,17 +9,6 @@ use serde::Serialize; use time::OffsetDateTime; use uuid::Uuid; -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub struct TradeAndChannelParams { - pub trade_params: TradeParams, - #[serde(with = "bitcoin::amount::serde::as_sat::opt")] - pub trader_reserve: Option, - #[serde(with = "bitcoin::amount::serde::as_sat::opt")] - pub coordinator_reserve: Option, - #[serde(with = "bitcoin::amount::serde::as_sat::opt")] - pub external_funding: Option, -} - /// The trade parameters defining the trade execution. /// /// Emitted by the orderbook when a match is found. @@ -187,12 +176,14 @@ pub fn average_execution_price(matches: Vec) -> Decimal { sum_quantity / nominal_prices } +#[derive(Clone, Copy)] pub enum MatchState { Pending, Filled, Failed, } +#[derive(Clone, Copy)] pub struct Matches { pub id: Uuid, pub match_state: MatchState, diff --git a/crates/xxi-node/src/message_handler.rs b/crates/xxi-node/src/message_handler.rs index f418be614..195eddde4 100644 --- a/crates/xxi-node/src/message_handler.rs +++ b/crates/xxi-node/src/message_handler.rs @@ -635,7 +635,7 @@ impl TenTenOneMessage { | TenTenOneMessage::SettleAccept(TenTenOneSettleAccept { order_reason, .. }) | TenTenOneMessage::SettleConfirm(TenTenOneSettleConfirm { order_reason, .. }) | TenTenOneMessage::SettleFinalize(TenTenOneSettleFinalize { order_reason, .. }) => { - Some(order_reason.clone()) + Some(*order_reason) } TenTenOneMessage::Offer(_) | TenTenOneMessage::Accept(_) diff --git a/crates/xxi-node/src/tests/dlc_channel.rs b/crates/xxi-node/src/tests/dlc_channel.rs index 883d89ce4..960a3455d 100644 --- a/crates/xxi-node/src/tests/dlc_channel.rs +++ b/crates/xxi-node/src/tests/dlc_channel.rs @@ -587,7 +587,7 @@ async fn open_channel_and_position_and_settle_position( let order = dummy_order(); coordinator .propose_dlc_channel_collaborative_settlement( - order.clone(), + order, filled_with.clone(), &coordinator_signed_channel.channel_id, coordinator_dlc_collateral.to_sat() / 2, diff --git a/mobile/native/src/dlc/node.rs b/mobile/native/src/dlc/node.rs index ffac0ef14..e8529b8ea 100644 --- a/mobile/native/src/dlc/node.rs +++ b/mobile/native/src/dlc/node.rs @@ -596,7 +596,7 @@ impl Node { #[instrument(fields(channel_id = hex::encode(offer.settle_offer.channel_id)),skip_all, err(Debug))] pub fn process_settle_offer(&self, offer: &TenTenOneSettleOffer) -> Result<()> { // TODO(holzeis): We should check if the offered amounts are expected. - let order_reason = offer.order.clone().order_reason; + let order_reason = offer.order.order_reason; let order_id = offer.order.id; match order_reason { diff --git a/mobile/native/src/trade/order/handler.rs b/mobile/native/src/trade/order/handler.rs index cbda52730..c4e28d38d 100644 --- a/mobile/native/src/trade/order/handler.rs +++ b/mobile/native/src/trade/order/handler.rs @@ -230,7 +230,7 @@ pub(crate) fn async_order_filling( }, creation_timestamp: order.timestamp, order_expiry_timestamp: order.expiry, - reason: order.order_reason.clone().into(), + reason: order.order_reason.into(), stable: order.stable, failure_reason: None, };