diff --git a/coordinator/src/bin/coordinator.rs b/coordinator/src/bin/coordinator.rs index 0ddb2e258..6908236c0 100644 --- a/coordinator/src/bin/coordinator.rs +++ b/coordinator/src/bin/coordinator.rs @@ -17,7 +17,6 @@ use coordinator::node::storage::NodeStorage; use coordinator::node::unrealized_pnl; use coordinator::node::Node; use coordinator::notifications::NotificationService; -use coordinator::orderbook; use coordinator::orderbook::async_match; use coordinator::orderbook::collaborative_revert; use coordinator::orderbook::trading; @@ -26,12 +25,15 @@ use coordinator::run_migration; use coordinator::scheduler::NotificationScheduler; use coordinator::settings::Settings; use coordinator::storage::CoordinatorTenTenOneStorage; +use coordinator::trade; use coordinator::trade::websocket::InternalPositionUpdateMessage; use diesel::r2d2; use diesel::r2d2::ConnectionManager; use diesel::PgConnection; use rand::thread_rng; use rand::RngCore; +use rust_decimal::prelude::FromPrimitive; +use rust_decimal::Decimal; use std::backtrace::Backtrace; use std::net::IpAddr; use std::net::Ipv4Addr; @@ -257,24 +259,22 @@ async fn main() -> Result<()> { let (tx_orderbook_feed, _rx) = broadcast::channel(100); - let orderbook_sender = - orderbook::spawn_orderbook(node.pool.clone(), tx_orderbook_feed.clone())?; + let trade_executor = trade::spawn_trade_executor(node.clone(), auth_users_notifier.clone())?; - let (_handle, trading_sender) = trading::start( - node.clone(), - orderbook_sender.clone(), - auth_users_notifier.clone(), + let order_matching_fee_rate = + Decimal::from_f32(node.settings.read().await.order_matching_fee_rate).expect("to fit"); + + let orderbook_sender = trading::spawn_orderbook( + node.pool.clone(), notification_service.get_sender(), - network, - node.inner.oracle_pubkey, - ); - let _handle = async_match::monitor( - node.clone(), - node_event_handler.subscribe(), - auth_users_notifier.clone(), - network, - node.inner.oracle_pubkey, - ); + trade_executor.clone(), + tx_orderbook_feed.clone(), + order_matching_fee_rate, + )?; + + let _handle = + async_match::monitor(node.clone(), node_event_handler.subscribe(), trade_executor); + let _handle = rollover::monitor( pool.clone(), node_event_handler.subscribe(), @@ -293,11 +293,12 @@ async fn main() -> Result<()> { tokio::spawn({ let node = node.clone(); - let trading_sender = trading_sender.clone(); + let orderbook_sender = orderbook_sender.clone(); async move { loop { tokio::time::sleep(EXPIRED_POSITION_SYNC_INTERVAL).await; - if let Err(e) = expired_positions::close(node.clone(), trading_sender.clone()).await + if let Err(e) = + expired_positions::close(node.clone(), orderbook_sender.clone()).await { tracing::error!("Failed to close expired positions! Error: {e:#}"); } @@ -307,11 +308,11 @@ async fn main() -> Result<()> { tokio::spawn({ let node = node.clone(); - let trading_sender = trading_sender.clone(); + let orderbook_sender = orderbook_sender.clone(); async move { loop { tokio::time::sleep(LIQUIDATED_POSITION_SYNC_INTERVAL).await; - liquidated_positions::monitor(node.clone(), trading_sender.clone()).await + liquidated_positions::monitor(node.clone(), orderbook_sender.clone()).await } } }); @@ -326,7 +327,6 @@ async fn main() -> Result<()> { opts.p2p_announcement_addresses(), NODE_ALIAS, orderbook_sender, - trading_sender, tx_orderbook_feed, tx_position_feed, tx_user_feed, diff --git a/coordinator/src/node/expired_positions.rs b/coordinator/src/node/expired_positions.rs index ba8c7a7e0..46f6a1a97 100644 --- a/coordinator/src/node/expired_positions.rs +++ b/coordinator/src/node/expired_positions.rs @@ -1,7 +1,7 @@ use crate::db; use crate::node::Node; use crate::orderbook; -use crate::orderbook::trading::NewOrderMessage; +use crate::orderbook::trading::OrderbookMessage; use crate::position::models::Position; use crate::position::models::PositionState; use anyhow::Context; @@ -24,7 +24,7 @@ use xxi_node::commons::OrderState; /// not be larger than our refund transaction time lock. pub const EXPIRED_POSITION_TIMEOUT: Duration = Duration::days(7); -pub async fn close(node: Node, trading_sender: mpsc::Sender) -> Result<()> { +pub async fn close(node: Node, orderbook_sender: mpsc::Sender) -> Result<()> { let mut conn = node.pool.get()?; let positions = db::positions::Position::get_all_open_positions(&mut conn) @@ -49,8 +49,9 @@ pub async fn close(node: Node, trading_sender: mpsc::Sender) -> if order.expiry < OffsetDateTime::now_utc() { tracing::warn!(trader_id, order_id, "Matched order expired! Giving up on that position, looks like the corresponding dlc channel has to get force closed."); + // TODO(holzeis): It's not ideal that the order and match are updated by the trade + // executor. This should rather get updated by the orderbook. orderbook::db::orders::set_order_state(&mut conn, order.id, OrderState::Expired)?; - orderbook::db::matches::set_match_state_by_order_id( &mut conn, order.id, @@ -90,13 +91,12 @@ pub async fn close(node: Node, trading_sender: mpsc::Sender) -> stable: position.stable, }; - let message = NewOrderMessage { + let message = OrderbookMessage::NewOrder { new_order: NewOrder::Market(new_order), - channel_opening_params: None, order_reason: OrderReason::Expired, }; - if let Err(e) = trading_sender.send(message).await { + if let Err(e) = orderbook_sender.send(message).await { tracing::error!(%trader_pubkey, %order_id, "Failed to submit new order for closing expired position. Error: {e:#}"); continue; } diff --git a/coordinator/src/node/liquidated_positions.rs b/coordinator/src/node/liquidated_positions.rs index 73719b7f6..acb01f78e 100644 --- a/coordinator/src/node/liquidated_positions.rs +++ b/coordinator/src/node/liquidated_positions.rs @@ -1,7 +1,7 @@ use crate::db; use crate::node::Node; use crate::orderbook; -use crate::orderbook::trading::NewOrderMessage; +use crate::orderbook::trading::OrderbookMessage; use anyhow::Result; use rust_decimal::prelude::FromPrimitive; use rust_decimal::Decimal; @@ -24,9 +24,9 @@ use xxi_node::commons::OrderState; /// should not be larger than our refund transaction time lock. pub const LIQUIDATION_POSITION_TIMEOUT: Duration = Duration::days(7); -pub async fn monitor(node: Node, trading_sender: mpsc::Sender) { +pub async fn monitor(node: Node, orderbook_sender: mpsc::Sender) { if let Err(e) = - check_if_positions_need_to_get_liquidated(trading_sender.clone(), node.clone()).await + check_if_positions_need_to_get_liquidated(orderbook_sender.clone(), node.clone()).await { tracing::error!("Failed to check if positions need to get liquidated. Error: {e:#}"); } @@ -35,7 +35,7 @@ pub async fn monitor(node: Node, trading_sender: mpsc::Sender) /// For all open positions, check if the maintenance margin has been reached. Send a liquidation /// async match to the traders whose positions have been liquidated. async fn check_if_positions_need_to_get_liquidated( - trading_sender: mpsc::Sender, + orderbook_sender: mpsc::Sender, node: Node, ) -> Result<()> { let mut conn = node.pool.get()?; @@ -142,13 +142,12 @@ async fn check_if_positions_need_to_get_liquidated( false => OrderReason::CoordinatorLiquidated, }; - let message = NewOrderMessage { + let message = OrderbookMessage::NewOrder { new_order: NewOrder::Market(new_order), - channel_opening_params: None, order_reason, }; - if let Err(e) = trading_sender.send(message).await { + if let Err(e) = orderbook_sender.send(message).await { tracing::error!(%trader_pubkey, %order_id, "Failed to submit new order for closing liquidated position. Error: {e:#}"); continue; } diff --git a/coordinator/src/orderbook/async_match.rs b/coordinator/src/orderbook/async_match.rs index ff7c850d5..ab9200e2e 100644 --- a/coordinator/src/orderbook/async_match.rs +++ b/coordinator/src/orderbook/async_match.rs @@ -1,55 +1,38 @@ use crate::check_version::check_version; -use crate::db; -use crate::message::TraderMessage; use crate::node::Node; use crate::orderbook::db::matches; use crate::orderbook::db::orders; -use crate::trade::TradeExecutor; -use anyhow::ensure; +use crate::trade::ExecutableMatch; use anyhow::Result; use bitcoin::secp256k1::PublicKey; -use bitcoin::secp256k1::XOnlyPublicKey; -use bitcoin::Network; use futures::future::RemoteHandle; use futures::FutureExt; -use rust_decimal::prelude::ToPrimitive; -use time::OffsetDateTime; use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; use tokio::sync::mpsc; use tokio::task::spawn_blocking; -use xxi_node::commons; -use xxi_node::commons::ContractSymbol; -use xxi_node::commons::FilledWith; -use xxi_node::commons::Match; -use xxi_node::commons::Matches; use xxi_node::commons::OrderState; -use xxi_node::commons::TradeAndChannelParams; -use xxi_node::commons::TradeParams; use xxi_node::node::event::NodeEvent; pub fn monitor( node: Node, mut receiver: broadcast::Receiver, - notifier: mpsc::Sender, - network: Network, - oracle_pk: XOnlyPublicKey, + trade_executor: mpsc::Sender, ) -> RemoteHandle<()> { let (fut, remote_handle) = async move { loop { match receiver.recv().await { Ok(NodeEvent::Connected { peer: trader_id }) => { tokio::spawn({ - let notifier = notifier.clone(); let node = node.clone(); + let trade_executor = trade_executor.clone(); async move { tracing::debug!( %trader_id, "Checking if the user needs to be notified about pending matches" ); if let Err(e) = - process_pending_match(node, notifier, trader_id, network, oracle_pk) - .await + process_pending_match(node, trade_executor, trader_id).await { tracing::error!("Failed to process pending match. Error: {e:#}"); } @@ -77,10 +60,8 @@ pub fn monitor( /// Checks if there are any pending matches async fn process_pending_match( node: Node, - notifier: mpsc::Sender, - trader_id: PublicKey, - network: Network, - oracle_pk: XOnlyPublicKey, + trade_executor: mpsc::Sender, + trader_pubkey: PublicKey, ) -> Result<()> { let mut conn = spawn_blocking({ let node = node.clone(); @@ -89,74 +70,26 @@ async fn process_pending_match( .await .expect("task to complete")?; - if check_version(&mut conn, &trader_id).is_err() { - tracing::info!(%trader_id, "User is not on the latest version. Skipping check if user needs to be informed about pending matches."); + if check_version(&mut conn, &trader_pubkey).is_err() { + tracing::info!(%trader_pubkey, "User is not on the latest version. Skipping check if user needs to be informed about pending matches."); return Ok(()); } if let Some(order) = - orders::get_by_trader_id_and_state(&mut conn, trader_id, OrderState::Matched)? + orders::get_by_trader_id_and_state(&mut conn, trader_pubkey, OrderState::Matched)? { - tracing::debug!(%trader_id, order_id=%order.id, "Executing pending match"); + tracing::debug!(%trader_pubkey, order_id=%order.id, "Executing pending match"); let matches = matches::get_matches_by_order_id(&mut conn, order.id)?; - let filled_with = get_filled_with_from_matches(matches, network, oracle_pk)?; - let channel_opening_params = - db::channel_opening_params::get_by_order_id(&mut conn, order.id)?; - - tracing::info!(trader_id = %order.trader_id, order_id = %order.id, order_reason = ?order.order_reason, "Executing trade for match"); - let trade_executor = TradeExecutor::new(node, notifier); - trade_executor - .execute(&TradeAndChannelParams { - trade_params: TradeParams { - pubkey: trader_id, - contract_symbol: ContractSymbol::BtcUsd, - leverage: order.leverage, - quantity: order.quantity.to_f32().expect("to fit into f32"), - direction: order.direction, - filled_with, - }, - trader_reserve: channel_opening_params.map(|c| c.trader_reserve), - coordinator_reserve: channel_opening_params.map(|c| c.coordinator_reserve), - }) - .await; + tracing::info!(%trader_pubkey, order_id = %order.id, order_reason = ?order.order_reason, "Executing trade for match"); + if let Err(e) = trade_executor + .send(ExecutableMatch { order, matches }) + .await + { + tracing::error!(%trader_pubkey, order_id= %order.id, "Failed to execute trade. Error: {e:#}"); + } } Ok(()) } - -fn get_filled_with_from_matches( - matches: Vec, - network: Network, - oracle_pk: XOnlyPublicKey, -) -> Result { - ensure!( - !matches.is_empty(), - "Need at least one matches record to construct a FilledWith" - ); - - let order_id = matches - .first() - .expect("to have at least one match") - .order_id; - - let expiry_timestamp = commons::calculate_next_expiry(OffsetDateTime::now_utc(), network); - - Ok(FilledWith { - order_id, - expiry_timestamp, - oracle_pk, - matches: matches - .iter() - .map(|m| Match { - id: m.id, - order_id: m.order_id, - quantity: m.quantity, - pubkey: m.match_trader_id, - execution_price: m.execution_price, - matching_fee: m.matching_fee, - }) - .collect(), - }) -} diff --git a/coordinator/src/orderbook/db/matches.rs b/coordinator/src/orderbook/db/matches.rs index 5de73144c..554280132 100644 --- a/coordinator/src/orderbook/db/matches.rs +++ b/coordinator/src/orderbook/db/matches.rs @@ -1,5 +1,4 @@ use crate::orderbook::db::custom_types::MatchState; -use crate::orderbook::trading::TraderMatchParams; use crate::schema::matches; use anyhow::ensure; use anyhow::Result; @@ -37,16 +36,33 @@ struct Matches { pub matching_fee_sats: i64, } -pub fn insert(conn: &mut PgConnection, match_params: &TraderMatchParams) -> Result<()> { - for record in Matches::new(match_params, MatchState::Pending) { - let affected_rows = diesel::insert_into(matches::table) - .values(record.clone()) - .execute(conn)?; - - ensure!(affected_rows > 0, "Could not insert matches"); - } +pub fn insert( + conn: &mut PgConnection, + order: &commons::Order, + matched_order: &commons::Order, + matching_fee: Amount, + match_state: commons::MatchState, + quantity: Decimal, +) -> QueryResult { + let match_ = Matches { + id: Uuid::new_v4(), + match_state: match_state.into(), + order_id: order.id, + trader_id: order.trader_id.to_string(), + match_order_id: matched_order.id, + match_trader_id: matched_order.trader_id.to_string(), + execution_price: matched_order.price.to_f32().expect("to fit into f32"), + quantity: quantity.to_f32().expect("to fit into f32"), + created_at: OffsetDateTime::now_utc(), + updated_at: OffsetDateTime::now_utc(), + matching_fee_sats: matching_fee.to_sat() as i64, + }; + + diesel::insert_into(matches::table) + .values(match_.clone()) + .execute(conn)?; - Ok(()) + Ok(match_.into()) } pub fn set_match_state( @@ -89,33 +105,6 @@ pub fn set_match_state_by_order_id( Ok(()) } -impl Matches { - pub fn new(match_params: &TraderMatchParams, match_state: MatchState) -> Vec { - let order_id = match_params.filled_with.order_id; - let updated_at = OffsetDateTime::now_utc(); - let trader_id = match_params.trader_id; - - match_params - .filled_with - .matches - .iter() - .map(|m| Matches { - id: m.id, - match_state, - order_id, - trader_id: trader_id.to_string(), - match_order_id: m.order_id, - match_trader_id: m.pubkey.to_string(), - execution_price: m.execution_price.to_f32().expect("to fit into f32"), - quantity: m.quantity.to_f32().expect("to fit into f32"), - created_at: updated_at, - updated_at, - matching_fee_sats: m.matching_fee.to_sat() as i64, - }) - .collect() - } -} - impl From for Matches { fn from(value: commons::Matches) -> Self { Matches { diff --git a/coordinator/src/orderbook/db/orders.rs b/coordinator/src/orderbook/db/orders.rs index c407474b9..f01cdb7bc 100644 --- a/coordinator/src/orderbook/db/orders.rs +++ b/coordinator/src/orderbook/db/orders.rs @@ -410,6 +410,22 @@ pub fn delete(conn: &mut PgConnection, id: Uuid) -> QueryResult set_order_state(conn, id, commons::OrderState::Deleted) } +pub fn set_order_state_partially_taken( + conn: &mut PgConnection, + id: Uuid, + quantity: Decimal, +) -> QueryResult { + let order: Order = diesel::update(orders::table) + .filter(orders::order_id.eq(id)) + .set(( + orders::order_state.eq(OrderState::Taken), + orders::quantity.eq(quantity.to_f32().expect("to fit")), + )) + .get_result(conn)?; + + Ok(OrderbookOrder::from(order)) +} + /// Returns the number of affected rows: 1. pub fn set_order_state( conn: &mut PgConnection, diff --git a/coordinator/src/orderbook/mod.rs b/coordinator/src/orderbook/mod.rs index 8cdf2d2bf..25787e0b9 100644 --- a/coordinator/src/orderbook/mod.rs +++ b/coordinator/src/orderbook/mod.rs @@ -1,21 +1,32 @@ +use crate::orderbook::db::matches; use crate::orderbook::db::orders; +use crate::referrals; +use crate::trade::ExecutableMatch; use anyhow::anyhow; +use anyhow::bail; use anyhow::Context; use anyhow::Result; use bitcoin::secp256k1::PublicKey; +use bitcoin::Amount; use diesel::r2d2::ConnectionManager; use diesel::r2d2::Pool; +use diesel::result::Error::RollbackTransaction; +use diesel::Connection; use diesel::PgConnection; +use rust_decimal::prelude::ToPrimitive; +use rust_decimal::Decimal; +use rust_decimal::RoundingStrategy; use std::cmp::Ordering; -use tokio::sync::broadcast; -use tokio::sync::mpsc; -use tokio::sync::mpsc::Sender; +use std::collections::HashMap; +use std::vec; use tokio::task::spawn_blocking; use uuid::Uuid; use xxi_node::commons; +use xxi_node::commons::MatchState; use xxi_node::commons::Message; use xxi_node::commons::Message::DeleteOrder; -use xxi_node::commons::NewOrder; +use xxi_node::commons::NewLimitOrder; +use xxi_node::commons::NewMarketOrder; use xxi_node::commons::Order; use xxi_node::commons::OrderReason; use xxi_node::commons::OrderState; @@ -30,66 +41,134 @@ pub mod websocket; #[cfg(test)] mod tests; -pub struct Orderbook { +struct Orderbook { pool: Pool>, - shorts: Shorts, - longs: Longs, + shorts: OrderbookSide, + longs: OrderbookSide, + // TODO(holzeis): Split up order matching fee rate into taker and maker fees. + order_matching_fee_rate: Decimal, } -pub struct Shorts(Vec); +struct OrderbookSide { + orders: Vec, +} -impl Shorts { - pub fn add(&mut self, order: Order) { - self.0.push(order); +impl OrderbookSide { + pub fn new() -> Self { + Self { orders: vec![] } } - pub fn remove(&mut self, order_id: Uuid) { - self.0.retain(|o| o.id != order_id) + /// adds the given order to the orderbook. + pub fn add_order(&mut self, order: Order) { + self.orders.push(order); + self.sort(order.direction); } - pub fn sort(&mut self) { - self.0.sort_by(|a, b| { - if a.price.cmp(&b.price) == Ordering::Equal { - return a.timestamp.cmp(&b.timestamp); - } - // Descending order. - b.price.cmp(&a.price) - }) + /// updates an order in the orderbook. Ignores the update if order does not exist. + pub fn update_order(&mut self, updated_order: Order) { + if let Some(order) = self.orders.iter_mut().find(|o| o.id == updated_order.id) { + *order = updated_order; + } + self.sort(updated_order.direction); } -} + /// removes the order by the given id from the orderbook. + pub fn remove_order(&mut self, order_id: Uuid) { + self.orders.retain(|o| o.id != order_id); + } + /// removes the first order from the sorted list representing the best order in the orderbook. + pub fn take_order(&mut self) -> Option { + match self.orders.is_empty() { + true => None, + false => Some(self.orders.remove(0)), + } + } + /// matches orders from the orderbook for the given quantity. + pub fn match_order(&mut self, quantity: Decimal) -> Vec { + let mut matched_orders = vec![]; + + let mut quantity = quantity; + while let Some(mut order) = self.take_order() { + if order.is_expired() { + // ignore expired orders. + continue; + } + + match order.quantity.cmp(&quantity) { + Ordering::Less => { + // if the found order has less quantity we subtract the full quantity from the + // searched quantity and add the limit order to the matched orders. + quantity -= order.quantity; + matched_orders.push(order); + } + Ordering::Greater => { + // if the found order has more quantity than needed, we create a remaining order + // from the left over quantity. Note, this will only be kept in memory and + // applied to the orderbook state by joining the trades. + let mut remaining_order = order; + remaining_order.quantity -= quantity; + self.add_order(remaining_order); -pub struct Longs(Vec); + // update the matched order quantity with the searched for quantity. + order.quantity = quantity; + matched_orders.push(order); -impl Longs { - pub fn add(&mut self, order: Order) { - self.0.push(order); - self.sort(); + // we found enough liquidity in the order book to match the order. + break; + } + Ordering::Equal => { + // we found a perfect match for the searched for quantity. + matched_orders.push(order); + break; + } + } + } + + matched_orders } - pub fn remove(&mut self, order_id: Uuid) { - self.0.retain(|o| o.id != order_id) + /// rollback matched orders + pub fn rollback_matched_orders(&mut self, matched_orders: Vec) { + for order in matched_orders { + if let Some(order) = self.orders.iter_mut().find(|o| o.id == order.id) { + order.quantity += order.quantity; + } else { + self.add_order(order); + } + } } - pub fn sort(&mut self) { - self.0.sort_by(|a, b| { + + /// Sorts the orders ascending if long and descending if short, + fn sort(&mut self, direction: commons::Direction) { + self.orders.sort_by(|a, b| { if a.price.cmp(&b.price) == Ordering::Equal { return a.timestamp.cmp(&b.timestamp); } - // Ascending order. - a.price.cmp(&b.price) - }); + match direction { + // Descending order. + commons::Direction::Short => b.price.cmp(&a.price), + // Ascending order. + commons::Direction::Long => a.price.cmp(&b.price), + } + }) } } impl Orderbook { - pub fn new(pool: Pool>) -> Self { + pub fn new( + pool: Pool>, + order_matching_fee_rate: Decimal, + ) -> Self { Self { pool, - shorts: Shorts(vec![]), - longs: Longs(vec![]), + shorts: OrderbookSide::new(), + longs: OrderbookSide::new(), + order_matching_fee_rate, } } + /// Initializes the orderbook with non expired open limit orders. async fn initialize(&mut self) -> Result<()> { let all_orders = spawn_blocking({ let mut conn = self.pool.clone().get()?; move || { + // TODO(holzeis): join with trades to get partially matched orders. let all_orders = orders::get_all_orders(&mut conn, OrderType::Limit, OrderState::Open, true)?; anyhow::Ok(all_orders) @@ -97,53 +176,182 @@ impl Orderbook { }) .await??; + // TODO(holzeis): maybe consider batch adding all orders and sort afterwards instead of + // adding every order individually. for order in all_orders { match order.direction { - commons::Direction::Short => self.shorts.add(order.clone()), - commons::Direction::Long => self.longs.add(order.clone()), + commons::Direction::Short => self.shorts.add_order(order), + commons::Direction::Long => self.longs.add_order(order), } } Ok(()) } - async fn add_order( + /// Adds a limit order to the orderbook. + async fn add_limit_order(&mut self, new_order: NewLimitOrder) -> Result { + let order = spawn_blocking({ + let mut conn = self.pool.clone().get()?; + move || { + let order = orders::insert_limit_order(&mut conn, new_order, OrderReason::Manual) + .map_err(|e| anyhow!(e)) + .context("Failed to insert new order into DB")?; + + anyhow::Ok(order) + } + }) + .await??; + + match order.direction { + commons::Direction::Short => self.shorts.add_order(order), + commons::Direction::Long => self.longs.add_order(order), + } + Ok(Message::NewOrder(order)) + } + + /// Matches a market order against the orderbook. Will fail if the market order can't be fully + /// matched. + async fn match_market_order( &mut self, - new_order: NewOrder, + new_order: NewMarketOrder, order_reason: OrderReason, - ) -> Result> { + ) -> Result> { + let trader_pubkey = new_order.trader_id; + // TODO(holzeis): We might want to delay that action in a regular task allowing us to + // process orderbook updates much faster in memory and postpone the expensive database + // transactions. let order = spawn_blocking({ let mut conn = self.pool.clone().get()?; move || { - let order = match new_order { - NewOrder::Market(o) => { - orders::insert_market_order(&mut conn, o.clone(), order_reason) - } - NewOrder::Limit(o) => orders::insert_limit_order(&mut conn, o, order_reason), - } - .map_err(|e| anyhow!(e)) - .context("Failed to insert new order into DB")?; + let order = orders::insert_market_order(&mut conn, new_order, order_reason) + .map_err(|e| anyhow!(e)) + .context("Failed to insert new order into DB")?; anyhow::Ok(order) } }) .await??; - let message = match order.order_type { - OrderType::Market => None, - OrderType::Limit => { - match order.direction { - commons::Direction::Short => self.shorts.add(order.clone()), - commons::Direction::Long => self.longs.add(order.clone()), - } - Some(Message::NewOrder(order)) + // find a match for the market order. + let matched_orders = match order.direction.opposite() { + commons::Direction::Short => self.shorts.match_order(order.quantity), + commons::Direction::Long => self.longs.match_order(order.quantity), + }; + + let fail_order = { + let order_id = order.id; + let pool = self.pool.clone(); + move || { + let mut conn = pool.get()?; + orders::set_order_state(&mut conn, order_id, OrderState::Failed)?; + anyhow::Ok(()) } }; - Ok(message) + let matched_quantity: Decimal = matched_orders.iter().map(|o| o.quantity).sum(); + if matched_quantity != order.quantity { + // not enough liquidity in the orderbook. + tracing::warn!( + trader_pubkey = %order.trader_id, + order_id = %order.id, + wanted = %order.quantity, + got = %matched_quantity, + "Couldn't match order due to insufficient liquidity in the orderbook." + ); + match order.direction.opposite() { + commons::Direction::Short => self.shorts.rollback_matched_orders(matched_orders), + commons::Direction::Long => self.longs.rollback_matched_orders(matched_orders), + } + + spawn_blocking(fail_order).await??; + bail!("No match found."); + } + + // apply match to database. + let matches = spawn_blocking({ + let mut conn = self.pool.clone().get()?; + let matched_orders = matched_orders.clone(); + let fee_percent = self.order_matching_fee_rate; + move || { + let mut matches: HashMap = HashMap::new(); + conn.transaction(|conn| { + let status = + referrals::get_referral_status(order.trader_id, conn).map_err(|_| RollbackTransaction)?; + let fee_discount = status.referral_fee_bonus; + let fee_percent = fee_percent - (fee_percent * fee_discount); + + tracing::debug!(%trader_pubkey, %fee_discount, total_fee_percent = %fee_percent, "Fee discount calculated"); + + let order = orders::set_order_state(conn, order.id, OrderState::Matched)?; + for matched_order in matched_orders { + let matching_fee = order.quantity / matched_order.price * fee_percent; + let matching_fee = matching_fee.round_dp_with_strategy(8, RoundingStrategy::MidpointAwayFromZero); + let matching_fee = match Amount::from_btc(matching_fee.to_f64().expect("to fit")) { + Ok(fee) => fee, + Err(err) => { + tracing::error!( + trader_pubkey = matched_order.trader_id.to_string(), + order_id = matched_order.id.to_string(), + "Failed calculating order matching fee for order {err:?}. Falling back to 0" + ); + Amount::ZERO + } + }; + + let taker_match = matches::insert(conn, &order, &matched_order, matching_fee, MatchState::Pending, matched_order.quantity)?; + if let Some(taker_matches) = matches.get_mut(&order.trader_id) { + taker_matches.matches.push(taker_match); + } else { + matches.insert(order.trader_id, ExecutableMatch{ + order, + matches: vec![taker_match] + }); + } + + // TODO(holzeis): For now we don't execute the limit order with the maker as the our maker does not + // have a dlc channel with the coordinator, hence we set the match directly to filled. Once we + // introduce actual makers these matches need to execute them. + let _maker_match = matches::insert(conn, &matched_order, &order, matching_fee, MatchState::Filled, matched_order.quantity)?; + orders::set_order_state(conn, matched_order.id, OrderState::Taken)?; + + // TODO(holzeis): Add executeable match once we support actual limit orders. + // if let Some(maker_matches) = matches.get_mut(&matched_order.trader_id) { + // maker_matches.matches.push(taker_match); + // } else { + // matches.insert(order.trader_id, ExecutableMatch{ + // order, + // matches: vec![taker_match] + // }); + // } + + } + + diesel::result::QueryResult::Ok(()) + }).inspect_err(|_| { + if let Err(e) = fail_order() { + tracing::error!(%trader_pubkey, order_id=%order.id, "Failed to fail order. Error: {e:#}"); + } + })?; + + anyhow::Ok(matches) + } + }) + .await? + .inspect_err(move |_| match order.direction.opposite() { + // if something goes wrong we have to rollback the matched orders to the orderbook. + commons::Direction::Short => self.shorts.rollback_matched_orders(matched_orders), + commons::Direction::Long => self.longs.rollback_matched_orders(matched_orders), + })?; + + Ok(matches.into_values().collect::>()) } + /// Updates an order in the orderbook. If the order id couldn't be found the update will be + /// ignored. async fn update_order(&mut self, order: Order) -> Result> { + // TODO(holzeis): We might want to delay that action in a regular task allowing us to + // process orderbook updates much faster in memory and postpone the expensive + // database transactions. let order = spawn_blocking({ let mut conn = self.pool.clone().get()?; move || { @@ -156,12 +364,9 @@ impl Orderbook { let message = match order.order_type { OrderType::Market => None, OrderType::Limit => { - self.shorts.remove(order.id); - self.longs.remove(order.id); - match order.direction { - commons::Direction::Short => self.shorts.add(order.clone()), - commons::Direction::Long => self.longs.add(order.clone()), + commons::Direction::Short => self.shorts.update_order(order), + commons::Direction::Long => self.longs.update_order(order), } Some(Message::Update(order)) @@ -171,15 +376,26 @@ impl Orderbook { Ok(message) } - async fn remove_order( - &mut self, - trader_pubkey: PublicKey, - order_id: Uuid, - ) -> Result> { + /// Removes a limit order from the orderbook. Ignores removing market orders, since they aren't + /// stored into the orderbook. + async fn remove_order(&mut self, order_id: Uuid) -> Result> { + // TODO(holzeis): We might want to delay that action in a regular task allowing us to + // process orderbook updates much faster in memory and postpone the expensive database + // transactions. let order = spawn_blocking({ let mut conn = self.pool.clone().get()?; move || { - let order = orders::delete_trader_order(&mut conn, order_id, trader_pubkey)?; + let matches = matches::get_matches_by_order_id(&mut conn, order_id)?; + + let order = if !matches.is_empty() { + // order has been at least partially matched. + let matched_quantity = matches.iter().map(|m| m.quantity).sum(); + + orders::set_order_state_partially_taken(&mut conn, order_id, matched_quantity) + } else { + orders::delete(&mut conn, order_id) + }?; + anyhow::Ok(order) } }) @@ -187,11 +403,11 @@ impl Orderbook { let message = match (order.order_type, order.direction) { (OrderType::Limit, commons::Direction::Short) => { - self.shorts.remove(order_id); + self.shorts.remove_order(order_id); Some(DeleteOrder(order_id)) } (OrderType::Limit, commons::Direction::Long) => { - self.longs.remove(order_id); + self.longs.remove_order(order_id); Some(DeleteOrder(order_id)) } (OrderType::Market, _) => None, @@ -201,81 +417,199 @@ impl Orderbook { } } -#[derive(Debug)] -pub enum OrderbookMessage { - NewOrder { - new_order: NewOrder, - order_reason: OrderReason, - }, - DeleteOrder { - trader_pubkey: PublicKey, - order_id: Uuid, - }, - Update(Order), -} +#[cfg(test)] +mod orderbook_tests { + use crate::orderbook::OrderbookSide; + use bitcoin::secp256k1::PublicKey; + use rust_decimal::Decimal; + use rust_decimal::RoundingStrategy; + use rust_decimal_macros::dec; + use std::str::FromStr; + use time::Duration; + use time::OffsetDateTime; + use uuid::Uuid; + use xxi_node::commons::ContractSymbol; + use xxi_node::commons::Direction; + use xxi_node::commons::Order; + use xxi_node::commons::OrderReason; + use xxi_node::commons::OrderState; + use xxi_node::commons::OrderType; -pub fn spawn_orderbook( - pool: Pool>, - tx_orderbook_feed: broadcast::Sender, -) -> Result> { - let mut orderbook = Orderbook::new(pool.clone()); + #[test] + pub fn test_add_order_to_orderbook_side() { + let mut longs = OrderbookSide::new(); - let (sender, mut receiver) = mpsc::channel::(100); + let long_order = dummy_order(dec!(100), dec!(50000), Direction::Long); + longs.add_order(long_order); - tokio::spawn({ - async move { - if let Err(e) = orderbook.initialize().await { - tracing::error!("Failed to initialize orderbook. Error: {e:#}"); - return; - } - while let Some(message) = receiver.recv().await { - let msg = match message { - OrderbookMessage::NewOrder { - new_order, - order_reason, - } => { - let trader_pubkey = new_order.trader_id(); - let order_id = new_order.id(); - match orderbook.add_order(new_order, order_reason).await { - Ok(message) => message, - Err(e) => { - tracing::error!(%trader_pubkey, %order_id, "Failed to insert new order. Error: {e:#}"); - continue; - } - } - } - OrderbookMessage::DeleteOrder { - trader_pubkey, - order_id, - } => match orderbook.remove_order(trader_pubkey, order_id).await { - Ok(message) => message, - Err(e) => { - tracing::error!(%trader_pubkey, %order_id, "Failed to remove order. Error: {e:#}"); - continue; - } - }, - OrderbookMessage::Update(order) => { - match orderbook.update_order(order.clone()).await { - Ok(message) => message, - Err(e) => { - tracing::error!(trader_pubkey=%order.trader_id, order_id=%order.id, - "Failed to update order. Error: {e:#}"); - continue; - } - } - } - }; + let order = longs.take_order(); - if let Some(msg) = msg { - if let Err(e) = tx_orderbook_feed.send(msg) { - tracing::error!("Failed to send orderbook message. Error: {e:#}"); - } - } - } + assert_eq!(Some(long_order), order); + } + + #[test] + pub fn test_update_order_to_orderbook_side() { + let mut longs = OrderbookSide::new(); + + let mut long_order = dummy_order(dec!(100), dec!(50000), Direction::Long); + longs.add_order(long_order); + + long_order.quantity = dec!(200); + longs.update_order(long_order); + + let order = longs.take_order(); + assert_eq!(Some(long_order), order); + + assert_eq!(None, longs.take_order()); + } + + #[test] + pub fn test_remove_order_from_orderbook_side() { + let mut longs = OrderbookSide::new(); + + let long_order = dummy_order(dec!(100), dec!(50000), Direction::Long); + longs.add_order(long_order); + + longs.remove_order(long_order.id); + + let order = longs.take_order(); + assert_eq!(None, order); + } + + #[test] + pub fn test_remove_invalid_order_id_from_orderbook_side() { + let mut longs = OrderbookSide::new(); + + let long_order = dummy_order(dec!(100), dec!(50000), Direction::Long); + longs.add_order(long_order); + + longs.remove_order(Uuid::new_v4()); + + let order = longs.take_order(); + assert_eq!(Some(long_order), order); + } - tracing::warn!("Orderbook channel has been closed."); + #[test] + pub fn test_match_order_exact_match_single_order() { + let mut longs = OrderbookSide::new(); + + let long_order = dummy_order(dec!(100), dec!(50000), Direction::Long); + longs.add_order(long_order); + + let matched_orders = longs.match_order(dec!(100)); + assert_eq!(1, matched_orders.len()); + assert_eq!(dec!(100), matched_orders.iter().map(|m| m.quantity).sum()); + + assert_eq!(None, longs.take_order()); + } + + #[test] + pub fn test_match_order_partial_limit_order_match_single_order() { + let mut longs = OrderbookSide::new(); + + let long_order = dummy_order(dec!(100), dec!(50000), Direction::Long); + longs.add_order(long_order); + + let matched_orders = longs.match_order(dec!(25)); + assert_eq!(1, matched_orders.len()); + assert_eq!(dec!(25), matched_orders.iter().map(|m| m.quantity).sum()); + + assert_eq!( + Some(Order { + quantity: dec!(75), + ..long_order + }), + longs.take_order() + ); + } + + #[test] + pub fn test_match_order_partial_market_order_match_single_order() { + let mut longs = OrderbookSide::new(); + + let long_order = dummy_order(dec!(100), dec!(50000), Direction::Long); + longs.add_order(long_order); + + let matched_orders = longs.match_order(dec!(125)); + assert_eq!(1, matched_orders.len()); + assert_eq!(dec!(100), matched_orders.iter().map(|m| m.quantity).sum()); + + assert_eq!(None, longs.take_order()); + } + + #[test] + pub fn test_match_order_partial_match_multiple_orders() { + let mut longs = OrderbookSide::new(); + + let long_order_1 = dummy_order(dec!(25), dec!(50100), Direction::Long); + longs.add_order(long_order_1); + + let long_order_2 = dummy_order(dec!(40), dec!(50200), Direction::Long); + longs.add_order(long_order_2); + + let long_order_3 = dummy_order(dec!(35), dec!(50300), Direction::Long); + longs.add_order(long_order_3); + + let matched_orders = longs.match_order(dec!(50)); + assert_eq!(2, matched_orders.len()); + assert_eq!(dec!(50), matched_orders.iter().map(|m| m.quantity).sum()); + + assert_eq!(dec!(50149.95), average_entry_price(&matched_orders)); + + assert_eq!( + Some(Order { + quantity: dec!(15), + ..long_order_2 + }), + longs.take_order() + ); + + assert_eq!( + Some(Order { + quantity: dec!(35), + ..long_order_3 + }), + longs.take_order() + ); + } + + fn dummy_order(quantity: Decimal, price: Decimal, direction: Direction) -> Order { + Order { + id: Uuid::new_v4(), + price, + trader_id: dummy_public_key(), + direction, + leverage: 1.0, + contract_symbol: ContractSymbol::BtcUsd, + quantity, + order_type: OrderType::Market, + timestamp: OffsetDateTime::now_utc(), + expiry: OffsetDateTime::now_utc() + Duration::minutes(1), + order_state: OrderState::Open, + order_reason: OrderReason::Manual, + stable: false, } - }); + } + + fn dummy_public_key() -> PublicKey { + PublicKey::from_str("02bd998ebd176715fe92b7467cf6b1df8023950a4dd911db4c94dfc89cc9f5a655") + .unwrap() + } + + fn average_entry_price(orders: &[Order]) -> Decimal { + if orders.is_empty() { + return Decimal::ZERO; + } + if orders.len() == 1 { + return orders.first().expect("to be exactly one").price; + } + let sum_quantity = orders.iter().fold(dec!(0), |acc, m| acc + m.quantity); - Ok(sender) + let nominal_prices = orders + .iter() + .fold(dec!(0), |acc, m| acc + (m.quantity / m.price)); + + (sum_quantity / nominal_prices) + .round_dp_with_strategy(2, RoundingStrategy::MidpointAwayFromZero) + } } diff --git a/coordinator/src/orderbook/trading.rs b/coordinator/src/orderbook/trading.rs index 6c3d950a6..a745e3ec5 100644 --- a/coordinator/src/orderbook/trading.rs +++ b/coordinator/src/orderbook/trading.rs @@ -1,878 +1,161 @@ -use crate::db; -use crate::message::TraderMessage; -use crate::node::Node; use crate::notifications::Notification; use crate::notifications::NotificationKind; -use crate::orderbook::db::matches; -use crate::orderbook::db::orders; -use crate::orderbook::OrderbookMessage; -use crate::referrals; -use crate::trade::TradeExecutor; -use anyhow::anyhow; -use anyhow::bail; -use anyhow::Context; +use crate::orderbook::Orderbook; +use crate::trade::ExecutableMatch; use anyhow::Result; use bitcoin::secp256k1::PublicKey; -use bitcoin::secp256k1::XOnlyPublicKey; -use bitcoin::Amount; -use bitcoin::Network; -use futures::future::RemoteHandle; -use futures::FutureExt; -use rust_decimal::prelude::ToPrimitive; +use diesel::r2d2::ConnectionManager; +use diesel::r2d2::Pool; +use diesel::PgConnection; use rust_decimal::Decimal; -use rust_decimal::RoundingStrategy; -use std::cmp::Ordering; -use time::OffsetDateTime; +use tokio::sync::broadcast; use tokio::sync::mpsc; -use tokio::task::spawn_blocking; use uuid::Uuid; -use xxi_node::commons; -use xxi_node::commons::ChannelOpeningParams; -use xxi_node::commons::ContractSymbol; -use xxi_node::commons::Direction; -use xxi_node::commons::FilledWith; -use xxi_node::commons::Match; -use xxi_node::commons::Message::TradeError; -use xxi_node::commons::NewLimitOrder; +use xxi_node::commons::Message; use xxi_node::commons::NewOrder; use xxi_node::commons::Order; use xxi_node::commons::OrderReason; -use xxi_node::commons::OrderState; -use xxi_node::commons::OrderType; -use xxi_node::commons::TradeAndChannelParams; -use xxi_node::commons::TradeParams; -use xxi_node::commons::TradingError; /// This value is arbitrarily set to 100 and defines the number of new order messages buffered in /// the channel. -const NEW_ORDERS_BUFFER_SIZE: usize = 100; - -#[derive(Clone)] -pub struct NewOrderMessage { - pub new_order: NewOrder, - pub order_reason: OrderReason, - pub channel_opening_params: Option, -} - -impl From for Order { - fn from(value: NewOrderMessage) -> Self { - match value.new_order { - NewOrder::Market(new_order) => Self { - id: new_order.id, - price: Decimal::ZERO, - leverage: new_order.leverage.to_f32().expect("to fit"), - contract_symbol: new_order.contract_symbol, - trader_id: new_order.trader_id, - direction: new_order.direction, - quantity: new_order.quantity, - order_type: OrderType::Market, - timestamp: OffsetDateTime::now_utc(), - expiry: new_order.expiry, - order_state: OrderState::Open, - order_reason: value.order_reason, - stable: new_order.stable, - }, - NewOrder::Limit(new_order) => Self { - id: new_order.id, - price: new_order.price, - leverage: new_order.leverage.to_f32().expect("to fit"), - contract_symbol: new_order.contract_symbol, - trader_id: new_order.trader_id, - direction: new_order.direction, - quantity: new_order.quantity, - order_type: OrderType::Limit, - timestamp: OffsetDateTime::now_utc(), - expiry: new_order.expiry, - order_state: OrderState::Open, - order_reason: value.order_reason, - stable: new_order.stable, - }, - } - } -} - -#[derive(Clone)] -pub struct MatchParams { - pub taker_match: TraderMatchParams, - pub makers_matches: Vec, -} - -#[derive(Clone)] -pub struct TraderMatchParams { - pub trader_id: PublicKey, - pub filled_with: FilledWith, +const ORDERBOOK_BUFFER_SIZE: usize = 100; + +#[derive(Debug)] +pub enum OrderbookMessage { + NewOrder { + new_order: NewOrder, + order_reason: OrderReason, + }, + DeleteOrder(Uuid), + Update(Order), } -/// Spawn a task that processes [`NewOrderMessage`]s. -/// -/// To feed messages to this task, the caller can use the corresponding -/// [`mpsc::Sender`] returned. -pub fn start( - node: Node, - orderbook_sender: mpsc::Sender, - trade_notifier: mpsc::Sender, +pub fn spawn_orderbook( + pool: Pool>, notifier: mpsc::Sender, - network: Network, - oracle_pk: XOnlyPublicKey, -) -> (RemoteHandle<()>, mpsc::Sender) { - let (sender, mut receiver) = mpsc::channel::(NEW_ORDERS_BUFFER_SIZE); - - let (fut, remote_handle) = async move { - while let Some(new_order_msg) = receiver.recv().await { - tokio::spawn({ - let orderbook_sender = orderbook_sender.clone(); - let notifier = notifier.clone(); - let trade_notifier = trade_notifier.clone(); - let node = node.clone(); - async move { - let order: Order = new_order_msg.clone().into(); - let order_reason = &new_order_msg.order_reason; - let new_order = &new_order_msg.new_order; - let channel_opening_params = &new_order_msg.channel_opening_params; - let trader_id = new_order.trader_id(); - let order_id = new_order.id(); - - tracing::trace!( - %trader_id, - %order_id, - order_type = new_order.order_type(), - "Processing new order", - ); - - if let Err(error) = match new_order { - NewOrder::Market(_) => { - process_new_market_order( - node, - notifier.clone(), - trade_notifier.clone(), - &order, - network, - oracle_pk, - channel_opening_params - ).await + trade_executor: mpsc::Sender, + tx_orderbook_feed: broadcast::Sender, + order_matching_fee_rate: Decimal, +) -> Result> { + let mut orderbook = Orderbook::new(pool.clone(), order_matching_fee_rate); + + let (sender, mut receiver) = mpsc::channel::(ORDERBOOK_BUFFER_SIZE); + + tokio::spawn({ + let notifier = notifier.clone(); + async move { + if let Err(e) = orderbook.initialize().await { + tracing::error!("Failed to initialize orderbook. Error: {e:#}"); + return; + } + while let Some(message) = receiver.recv().await { + let msg = match message { + OrderbookMessage::NewOrder { + new_order: NewOrder::Market(new_order), + order_reason, + } => { + let trader_pubkey = new_order.trader_id; + let order_id = new_order.id; + match orderbook.match_market_order(new_order, order_reason).await { + Ok(executable_matches) => { + // in case of an async match (e.g. expired, liquidation) the user + // will get a push notification. + if let Err(e) = + notify_user(notifier.clone(), trader_pubkey, order_reason).await + { + tracing::warn!(%trader_pubkey, %order_id, ?order_reason, "Failed to send push notification. Error: {e:#}"); + } + + for executable_match in executable_matches { + if let Err(e) = trade_executor.send(executable_match).await { + tracing::error!(%trader_pubkey, %order_id, ?order_reason, "Failed to execute trade. Error: {e:#}"); + } + } + + None + } + Err(e) => { + tracing::error!(%trader_pubkey, %order_id, "Failed to match new market order. Error: {e:#}"); + Some(Message::TradeError { + order_id, + error: e.into(), + }) + } } - NewOrder::Limit(new_order) => { - process_new_limit_order( - orderbook_sender, - *new_order, - ) - .await + } + OrderbookMessage::NewOrder { + new_order: NewOrder::Limit(new_order), + .. + } => { + let trader_pubkey = new_order.trader_id; + let order_id = new_order.id; + match orderbook.add_limit_order(new_order).await { + Ok(message) => Some(message), + Err(e) => { + tracing::error!(%trader_pubkey, %order_id, "Failed to process new limit order. Error: {e:#}"); + continue; + } } - } { - if order_reason == &OrderReason::Manual { - // TODO(holzeis): the maker is currently not subscribed to the websocket - // api, hence it wouldn't receive the error message. - if let Err(e) = trade_notifier - .send(TraderMessage { - trader_id, - message: TradeError { order_id, error }, - notification: None, - }) - .await - { - tracing::error!(%trader_id, %order_id, "Failed to send trade error. Error: {e:#}"); + } + OrderbookMessage::DeleteOrder(order_id) => { + match orderbook.remove_order(order_id).await { + Ok(message) => message, + Err(e) => { + tracing::error!(%order_id, "Failed to process remove order. Error: {e:#}"); + continue; } } } - } - }); - } - - tracing::error!("Channel closed"); - } - .remote_handle(); - - tokio::spawn(fut); - - (remote_handle, sender) -} - -pub async fn process_new_limit_order( - orderbook_sender: mpsc::Sender, - new_order: NewLimitOrder, -) -> Result<(), TradingError> { - orderbook_sender - .send(OrderbookMessage::NewOrder { - new_order: NewOrder::Limit(new_order), - order_reason: OrderReason::Manual, - }) - .await - .map_err(|e| anyhow!(e)) - .context("Failed to send new order message to orderbook")?; - - Ok(()) -} - -// TODO(holzeis): This functions runs multiple inserts in separate db transactions. This should only -// happen in a single transaction to ensure either all data or nothing is stored to the database. -pub async fn process_new_market_order( - node: Node, - notifier: mpsc::Sender, - trade_notifier: mpsc::Sender, - order: &Order, - network: Network, - oracle_pk: XOnlyPublicKey, - channel_opening_params: &Option, -) -> Result<(), TradingError> { - let mut conn = spawn_blocking({ - let node = node.clone(); - move || node.pool.get() - }) - .await - .expect("task to complete") - .map_err(|e| anyhow!("{e:#}"))?; - - // Reject new order if there is already a matched order waiting for execution. - if let Some(order) = - orders::get_by_trader_id_and_state(&mut conn, order.trader_id, OrderState::Matched) - .map_err(|e| anyhow!("{e:#}"))? - { - return Err(TradingError::InvalidOrder(format!( - "trader_id={}, order_id={}. Order is currently in execution. \ - Can't accept new orders until the order execution is finished", - order.trader_id, order.id - ))); - } - - let opposite_direction_limit_orders = orders::all_by_direction_and_type( - &mut conn, - order.direction.opposite(), - OrderType::Limit, - true, - ) - .map_err(|e| anyhow!("{e:#}"))?; - - let fee_percent = { node.settings.read().await.order_matching_fee_rate }; - let fee_percent = Decimal::try_from(fee_percent).expect("to fit into decimal"); - - let trader_pubkey_string = order.trader_id.to_string(); - let status = referrals::get_referral_status(order.trader_id, &mut conn)?; - let fee_discount = status.referral_fee_bonus; - let fee_percent = fee_percent - (fee_percent * fee_discount); - - tracing::debug!( - trader_pubkey = trader_pubkey_string, - %fee_discount, total_fee_percent = %fee_percent, "Fee discount calculated"); - - let matched_orders = match match_order( - order, - opposite_direction_limit_orders, - network, - oracle_pk, - fee_percent, - ) { - Ok(Some(matched_orders)) => matched_orders, - Ok(None) => { - // TODO(holzeis): Currently we still respond to the user immediately if there - // has been a match or not, that's the reason why we also have to set the order - // to failed here. But actually we could keep the order until either expired or - // a match has been found and then update the state accordingly. - - orders::set_order_state(&mut conn, order.id, OrderState::Failed) - .map_err(|e| anyhow!("{e:#}"))?; - return Err(TradingError::NoMatchFound(format!( - "Could not match order {}", - order.id - ))); - } - Err(e) => { - orders::set_order_state(&mut conn, order.id, OrderState::Failed) - .map_err(|e| anyhow!("{e:#}"))?; - return Err(TradingError::Other(format!("Failed to match order: {e:#}"))); - } - }; - - tracing::info!( - trader_id=%order.trader_id, - order_id=%order.id, - "Found a match with {} makers for new order", - matched_orders.taker_match.filled_with.matches.len() - ); - - for match_param in matched_orders.matches() { - matches::insert(&mut conn, match_param)?; - - let trader_id = match_param.trader_id; - let order_id = match_param.filled_with.order_id.to_string(); - - tracing::info!(%trader_id, order_id, "Notifying trader about match"); - - let notification = match &order.order_reason { - OrderReason::Expired => Some(NotificationKind::PositionExpired), - OrderReason::TraderLiquidated => Some(NotificationKind::Custom { - title: "Oops, you got liquidated 💸".to_string(), - message: "Open your app to execute the liquidation".to_string(), - }), - OrderReason::CoordinatorLiquidated => Some(NotificationKind::Custom { - title: "Your counterparty got liquidated 💸".to_string(), - message: "Open your app to execute the liquidation".to_string(), - }), - OrderReason::Manual => None, - }; - - if let Some(notification) = notification { - // send user a push notification - notifier - .send(Notification::new(order.trader_id, notification)) - .await - .with_context(|| { - format!( - "Failed to send push notification. trader_id = {}", - order.trader_id - ) - })?; - } - - let order_state = if order.order_type == OrderType::Limit { - // FIXME: The maker is currently not connected to the WebSocket so we can't - // notify him about a trade. However, trades are always accepted by the - // maker at the moment so in order to not have all limit orders in order - // state `Match` we are setting the order to `Taken` even if we couldn't - // notify the maker. - OrderState::Taken - } else { - OrderState::Matched - }; - - tracing::debug!(%trader_id, order_id, "Updating the order state to {order_state:?}"); - - orders::set_order_state(&mut conn, match_param.filled_with.order_id, order_state) - .map_err(|e| anyhow!("{e:#}"))?; - } - - if let Some(channel_opening_params) = channel_opening_params { - db::channel_opening_params::insert(&mut conn, order.id, *channel_opening_params) - .map_err(|e| anyhow!("{e:#}"))?; - } - - if node.inner.is_connected(order.trader_id) { - tracing::info!(trader_id = %order.trader_id, order_id = %order.id, order_reason = ?order.order_reason, "Executing trade for match"); - let trade_executor = TradeExecutor::new(node.clone(), trade_notifier); - - trade_executor - .execute(&TradeAndChannelParams { - trade_params: TradeParams { - pubkey: order.trader_id, - contract_symbol: ContractSymbol::BtcUsd, - leverage: order.leverage, - quantity: order.quantity.to_f32().expect("to fit into f32"), - direction: order.direction, - filled_with: matched_orders.taker_match.filled_with, - }, - trader_reserve: channel_opening_params.map(|p| p.trader_reserve), - coordinator_reserve: channel_opening_params.map(|p| p.coordinator_reserve), - }) - .await; - } else { - match order.order_reason { - OrderReason::Manual => { - tracing::warn!(trader_id = %order.trader_id, order_id = %order.id, order_reason = ?order.order_reason, "Skipping trade execution as trader is not connected") - } - OrderReason::Expired - | OrderReason::TraderLiquidated - | OrderReason::CoordinatorLiquidated => { - tracing::info!(trader_id = %order.trader_id, order_id = %order.id, order_reason = ?order.order_reason, "Skipping trade execution as trader is not connected") - } - } - } - - Ok(()) -} - -/// Matches an [`Order`] of [`OrderType::Market`] with a list of [`Order`]s of [`OrderType::Limit`]. -/// -/// The caller is expected to provide a list of `opposite_direction_orders` of [`OrderType::Limit`] -/// and opposite [`Direction`] to the `market_order`. We nevertheless ensure that this is the case -/// to be on the safe side. -fn match_order( - market_order: &Order, - opposite_direction_orders: Vec, - network: Network, - oracle_pk: XOnlyPublicKey, - fee_percent: Decimal, -) -> Result> { - if market_order.order_type == OrderType::Limit { - // We don't match limit orders with other limit orders at the moment. - return Ok(None); - } - - let opposite_direction_orders = opposite_direction_orders - .into_iter() - .filter(|o| !o.direction.eq(&market_order.direction)) - .collect(); - - let mut orders = sort_orders(opposite_direction_orders, market_order.direction); - - let mut remaining_quantity = market_order.quantity; - let mut matched_orders = vec![]; - while !orders.is_empty() { - let matched_order = orders.remove(0); - remaining_quantity -= matched_order.quantity; - matched_orders.push(matched_order); - - if remaining_quantity <= Decimal::ZERO { - break; - } - } - - // For the time being we do not want to support multi-matches. - if matched_orders.len() > 1 { - bail!("More than one matched order, please reduce order quantity"); - } - - if matched_orders.is_empty() { - return Ok(None); - } - - let expiry_timestamp = commons::calculate_next_expiry(OffsetDateTime::now_utc(), network); - - let matches = matched_orders - .iter() - .map(|maker_order| { - let matching_fee = market_order.quantity / maker_order.price * fee_percent; - let matching_fee = matching_fee.round_dp_with_strategy(8, RoundingStrategy::MidpointAwayFromZero); - let matching_fee = match Amount::from_btc(matching_fee.to_f64().expect("to fit")) { - Ok(fee) => {fee} - Err(err) => { - tracing::error!( - trader_pubkey = maker_order.trader_id.to_string(), - order_id = maker_order.id.to_string(), - "Failed calculating order matching fee for order {err:?}. Falling back to 0"); - Amount::ZERO - } - }; - ( - TraderMatchParams { - trader_id: maker_order.trader_id, - filled_with: FilledWith { - order_id: maker_order.id, - expiry_timestamp, - oracle_pk, - matches: vec![Match { - id: Uuid::new_v4(), - order_id: market_order.id, - quantity: market_order.quantity, - pubkey: market_order.trader_id, - execution_price: maker_order.price, - matching_fee, - }], + OrderbookMessage::Update(order) => match orderbook.update_order(order).await { + Ok(message) => message, + Err(e) => { + tracing::error!(trader_pubkey=%order.trader_id, order_id=%order.id, + "Failed to process update order. Error: {e:#}"); + continue; + } }, - }, - Match { - id: Uuid::new_v4(), - order_id: maker_order.id, - quantity: market_order.quantity, - pubkey: maker_order.trader_id, - execution_price: maker_order.price, - matching_fee, - }, - ) - }) - .collect::>(); - - let mut maker_matches = vec![]; - let mut taker_matches = vec![]; + }; - for (mm, taker_match) in matches { - maker_matches.push(mm); - taker_matches.push(taker_match); - } - - Ok(Some(MatchParams { - taker_match: TraderMatchParams { - trader_id: market_order.trader_id, - filled_with: FilledWith { - order_id: market_order.id, - expiry_timestamp, - oracle_pk, - matches: taker_matches, - }, - }, - makers_matches: maker_matches, - })) -} - -/// Sort the provided list of limit [`Order`]s based on the [`Direction`] of the market order to be -/// matched. -/// -/// For matching a market order and limit orders we have to -/// -/// - take the highest rate if the market order is short; and -/// -/// - take the lowest rate if the market order is long. -/// -/// Hence, the orders are sorted accordingly: -/// -/// - If the market order is short, the limit orders are sorted in descending order of -/// price. -/// -/// - If the market order is long, the limit orders are sorted in ascending order of price. -/// -/// Additionally, if two orders have the same price, the one with the earlier `timestamp` takes -/// precedence. -fn sort_orders(mut limit_orders: Vec, market_order_direction: Direction) -> Vec { - limit_orders.sort_by(|a, b| { - if a.price.cmp(&b.price) == Ordering::Equal { - return a.timestamp.cmp(&b.timestamp); - } + if let Some(msg) = msg { + if let Err(e) = tx_orderbook_feed.send(msg) { + tracing::error!("Failed to send orderbook message. Error: {e:#}"); + } + } + } - match market_order_direction { - // Ascending order. - Direction::Long => a.price.cmp(&b.price), - // Descending order. - Direction::Short => b.price.cmp(&a.price), + tracing::warn!("Orderbook channel has been closed."); } }); - limit_orders + Ok(sender) } -impl MatchParams { - fn matches(&self) -> Vec<&TraderMatchParams> { - std::iter::once(&self.taker_match) - .chain(self.makers_matches.iter()) - .collect() - } -} - -impl From<&TradeParams> for TraderMatchParams { - fn from(value: &TradeParams) -> Self { - TraderMatchParams { - trader_id: value.pubkey, - filled_with: value.filled_with.clone(), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use rust_decimal_macros::dec; - use std::str::FromStr; - use time::Duration; - use xxi_node::commons::ContractSymbol; - - #[test] - fn when_short_then_sort_desc() { - let order1 = dummy_long_order( - dec!(20_000), - Uuid::new_v4(), - Default::default(), - Duration::seconds(0), - ); - let order2 = dummy_long_order( - dec!(21_000), - Uuid::new_v4(), - Default::default(), - Duration::seconds(0), - ); - let order3 = dummy_long_order( - dec!(20_500), - Uuid::new_v4(), - Default::default(), - Duration::seconds(0), - ); - - let orders = vec![order3, order1, order2]; - - let orders = sort_orders(orders, Direction::Short); - assert_eq!(orders[0], order2); - assert_eq!(orders[1], order3); - assert_eq!(orders[2], order1); - } - - #[test] - fn when_long_then_sort_asc() { - let order1 = dummy_long_order( - dec!(20_000), - Uuid::new_v4(), - Default::default(), - Duration::seconds(0), - ); - let order2 = dummy_long_order( - dec!(21_000), - Uuid::new_v4(), - Default::default(), - Duration::seconds(0), - ); - let order3 = dummy_long_order( - dec!(20_500), - Uuid::new_v4(), - Default::default(), - Duration::seconds(0), - ); - - let orders = vec![order3, order1, order2]; - - let orders = sort_orders(orders, Direction::Long); - assert_eq!(orders[0], order1); - assert_eq!(orders[1], order3); - assert_eq!(orders[2], order2); - } - - #[test] - fn when_all_same_price_sort_by_id() { - let order1 = dummy_long_order( - dec!(20_000), - Uuid::new_v4(), - Default::default(), - Duration::seconds(0), - ); - let order2 = dummy_long_order( - dec!(20_000), - Uuid::new_v4(), - Default::default(), - Duration::seconds(1), - ); - let order3 = dummy_long_order( - dec!(20_000), - Uuid::new_v4(), - Default::default(), - Duration::seconds(2), - ); - - let orders = vec![order3, order1, order2]; - - let orders = sort_orders(orders, Direction::Long); - assert_eq!(orders[0], order1); - assert_eq!(orders[1], order2); - assert_eq!(orders[2], order3); - - let orders = sort_orders(orders, Direction::Short); - assert_eq!(orders[0], order1); - assert_eq!(orders[1], order2); - assert_eq!(orders[2], order3); - } - - #[test] - fn given_limit_and_market_with_same_amount_then_match() { - let all_orders = vec![ - dummy_long_order( - dec!(20_000), - Uuid::new_v4(), - dec!(100), - Duration::seconds(0), - ), - dummy_long_order( - dec!(21_000), - Uuid::new_v4(), - dec!(200), - Duration::seconds(0), - ), - dummy_long_order( - dec!(20_000), - Uuid::new_v4(), - dec!(300), - Duration::seconds(0), - ), - dummy_long_order( - dec!(22_000), - Uuid::new_v4(), - dec!(400), - Duration::seconds(0), - ), - ]; - - let order = Order { - id: Uuid::new_v4(), - price: Default::default(), - trader_id: PublicKey::from_str( - "027f31ebc5462c1fdce1b737ecff52d37d75dea43ce11c74d25aa297165faa2007", - ) - .unwrap(), - direction: Direction::Short, - leverage: 1.0, - contract_symbol: ContractSymbol::BtcUsd, - quantity: dec!(100), - order_type: OrderType::Market, - timestamp: OffsetDateTime::now_utc(), - expiry: OffsetDateTime::now_utc() + Duration::minutes(1), - order_state: OrderState::Open, - order_reason: OrderReason::Manual, - stable: false, - }; - - let matched_orders = match_order( - &order, - all_orders, - Network::Bitcoin, - get_oracle_public_key(), - Decimal::ZERO, - ) - .unwrap() - .unwrap(); - - assert_eq!(matched_orders.makers_matches.len(), 1); - let maker_matches = matched_orders - .makers_matches - .first() - .unwrap() - .filled_with - .matches - .clone(); - assert_eq!(maker_matches.len(), 1); - assert_eq!(maker_matches.first().unwrap().quantity, dec!(100)); - - assert_eq!(matched_orders.taker_match.filled_with.order_id, order.id); - assert_eq!(matched_orders.taker_match.filled_with.matches.len(), 1); - assert_eq!( - matched_orders - .taker_match - .filled_with - .matches - .first() - .unwrap() - .quantity, - order.quantity - ); - } - - /// This test is for safety reasons only. Once we want multiple matches we should update it - #[test] - fn given_limit_and_market_with_smaller_amount_then_error() { - let order1 = dummy_long_order( - dec!(20_000), - Uuid::new_v4(), - dec!(400), - Duration::seconds(0), - ); - let order2 = dummy_long_order( - dec!(21_000), - Uuid::new_v4(), - dec!(200), - Duration::seconds(0), - ); - let order3 = dummy_long_order( - dec!(22_000), - Uuid::new_v4(), - dec!(100), - Duration::seconds(0), - ); - let order4 = dummy_long_order( - dec!(20_000), - Uuid::new_v4(), - dec!(300), - Duration::seconds(0), - ); - let all_orders = vec![order1, order2, order3, order4]; - - let order = Order { - id: Uuid::new_v4(), - price: Default::default(), - trader_id: PublicKey::from_str( - "027f31ebc5462c1fdce1b737ecff52d37d75dea43ce11c74d25aa297165faa2007", - ) - .unwrap(), - direction: Direction::Short, - leverage: 1.0, - contract_symbol: ContractSymbol::BtcUsd, - quantity: dec!(200), - order_type: OrderType::Market, - timestamp: OffsetDateTime::now_utc(), - expiry: OffsetDateTime::now_utc() + Duration::minutes(1), - order_state: OrderState::Open, - order_reason: OrderReason::Manual, - stable: false, - }; - - assert!(match_order( - &order, - all_orders, - Network::Bitcoin, - get_oracle_public_key(), - Decimal::ZERO, - ) - .is_err()); - } - - #[test] - fn given_long_when_needed_short_direction_then_no_match() { - let all_orders = vec![ - dummy_long_order( - dec!(20_000), - Uuid::new_v4(), - dec!(100), - Duration::seconds(0), - ), - dummy_long_order( - dec!(21_000), - Uuid::new_v4(), - dec!(200), - Duration::seconds(0), - ), - dummy_long_order( - dec!(22_000), - Uuid::new_v4(), - dec!(400), - Duration::seconds(0), - ), - dummy_long_order( - dec!(20_000), - Uuid::new_v4(), - dec!(300), - Duration::seconds(0), - ), - ]; - - let order = Order { - id: Uuid::new_v4(), - price: Default::default(), - trader_id: PublicKey::from_str( - "027f31ebc5462c1fdce1b737ecff52d37d75dea43ce11c74d25aa297165faa2007", - ) - .unwrap(), - direction: Direction::Long, - leverage: 1.0, - contract_symbol: ContractSymbol::BtcUsd, - quantity: dec!(200), - order_type: OrderType::Market, - timestamp: OffsetDateTime::now_utc(), - expiry: OffsetDateTime::now_utc() + Duration::minutes(1), - order_state: OrderState::Open, - order_reason: OrderReason::Manual, - stable: false, - }; - - let matched_orders = match_order( - &order, - all_orders, - Network::Bitcoin, - get_oracle_public_key(), - Decimal::ZERO, - ) - .unwrap(); - - assert!(matched_orders.is_none()); - } +/// Sends a push notification to the user in case of an expiry or liquidation. +async fn notify_user( + notifier: mpsc::Sender, + trader_pubkey: PublicKey, + order_reason: OrderReason, +) -> Result<()> { + let notification = match order_reason { + OrderReason::Expired => Some(NotificationKind::PositionExpired), + OrderReason::TraderLiquidated => Some(NotificationKind::Custom { + title: "Oops, you got liquidated 💸".to_string(), + message: "Open your app to execute the liquidation".to_string(), + }), + OrderReason::CoordinatorLiquidated => Some(NotificationKind::Custom { + title: "Your counterparty got liquidated 💸".to_string(), + message: "Open your app to execute the liquidation".to_string(), + }), + OrderReason::Manual => None, + }; - fn dummy_long_order( - price: Decimal, - id: Uuid, - quantity: Decimal, - timestamp_delay: Duration, - ) -> Order { - Order { - id, - price, - trader_id: PublicKey::from_str( - "027f31ebc5462c1fdce1b737ecff52d37d75dea43ce11c74d25aa297165faa2007", - ) - .unwrap(), - direction: Direction::Long, - leverage: 1.0, - contract_symbol: ContractSymbol::BtcUsd, - quantity, - order_type: OrderType::Limit, - timestamp: OffsetDateTime::now_utc() + timestamp_delay, - expiry: OffsetDateTime::now_utc() + Duration::minutes(1), - order_state: OrderState::Open, - order_reason: OrderReason::Manual, - stable: false, - } + if let Some(notification) = notification { + tracing::info!(%trader_pubkey, ?order_reason, "Notifying trader about match"); + // send user a push notification + notifier + .send(Notification::new(trader_pubkey, notification)) + .await?; } - fn get_oracle_public_key() -> XOnlyPublicKey { - XOnlyPublicKey::from_str("16f88cf7d21e6c0f46bcbc983a4e3b19726c6c98858cc31c83551a88fde171c0") - .unwrap() - } + Ok(()) } diff --git a/coordinator/src/orderbook/websocket.rs b/coordinator/src/orderbook/websocket.rs index 932ad3dbd..4fdf155fc 100644 --- a/coordinator/src/orderbook/websocket.rs +++ b/coordinator/src/orderbook/websocket.rs @@ -2,8 +2,7 @@ use crate::db; use crate::db::user; use crate::message::NewUserMessage; use crate::orderbook::db::orders; -use crate::orderbook::trading::NewOrderMessage; -use crate::orderbook::OrderbookMessage; +use crate::orderbook::trading::OrderbookMessage; use crate::referrals; use crate::routes::AppState; use anyhow::bail; @@ -40,10 +39,9 @@ async fn handle_insert_order( } let _ = state - .trading_sender - .send(NewOrderMessage { + .orderbook_sender + .send(OrderbookMessage::NewOrder { new_order: NewOrder::Limit(order), - channel_opening_params: None, order_reason: OrderReason::Manual, }) .await; @@ -60,10 +58,7 @@ async fn handle_delete_order( state .orderbook_sender - .send(OrderbookMessage::DeleteOrder { - trader_pubkey, - order_id, - }) + .send(OrderbookMessage::DeleteOrder(order_id)) .await?; Ok(()) diff --git a/coordinator/src/routes.rs b/coordinator/src/routes.rs index 99ae532d3..c74cb0806 100644 --- a/coordinator/src/routes.rs +++ b/coordinator/src/routes.rs @@ -12,8 +12,7 @@ use crate::message::NewUserMessage; use crate::message::TraderMessage; use crate::node::Node; use crate::notifications::Notification; -use crate::orderbook::trading::NewOrderMessage; -use crate::orderbook::OrderbookMessage; +use crate::orderbook::trading::OrderbookMessage; use crate::parse_dlc_channel_id; use crate::settings::Settings; use crate::trade::websocket::InternalPositionUpdateMessage; @@ -102,7 +101,6 @@ pub struct AppState { /// A channel used to send messages about position updates pub tx_position_feed: broadcast::Sender, pub tx_user_feed: broadcast::Sender, - pub trading_sender: mpsc::Sender, pub pool: Pool>, pub settings: RwLock, pub exporter: PrometheusExporter, @@ -123,7 +121,6 @@ pub fn router( announcement_addresses: Vec, node_alias: &str, orderbook_sender: mpsc::Sender, - trading_sender: mpsc::Sender, tx_orderbook_feed: broadcast::Sender, tx_position_feed: broadcast::Sender, tx_user_feed: broadcast::Sender, @@ -141,7 +138,6 @@ pub fn router( tx_position_feed, tx_user_feed, orderbook_sender, - trading_sender, exporter, announcement_addresses, node_alias: node_alias.to_string(), diff --git a/coordinator/src/routes/orderbook.rs b/coordinator/src/routes/orderbook.rs index 0ad0d6640..7a47bfc78 100644 --- a/coordinator/src/routes/orderbook.rs +++ b/coordinator/src/routes/orderbook.rs @@ -1,6 +1,7 @@ use crate::check_version::check_version; +use crate::db; use crate::orderbook; -use crate::orderbook::trading::NewOrderMessage; +use crate::orderbook::trading::OrderbookMessage; use crate::orderbook::websocket::websocket_connection; use crate::routes::AppState; use crate::AppError; @@ -16,10 +17,9 @@ use diesel::r2d2::PooledConnection; use diesel::PgConnection; use rust_decimal::Decimal; use std::sync::Arc; -use tokio::sync::broadcast::Sender; +use tokio::task::spawn_blocking; use tracing::instrument; use uuid::Uuid; -use xxi_node::commons::Message; use xxi_node::commons::NewOrder; use xxi_node::commons::NewOrderRequest; use xxi_node::commons::Order; @@ -102,42 +102,49 @@ pub async fn post_order( } } - let message = NewOrderMessage { + if let Some(channel_opening_params) = new_order_request.channel_opening_params { + spawn_blocking({ + let pool = state.pool.clone(); + let order_id = new_order.id(); + move || { + let mut conn = pool.get()?; + db::channel_opening_params::insert(&mut conn, order_id, channel_opening_params)?; + anyhow::Ok(()) + } + }) + .await + .expect("task to complete") + .map_err(|e| { + AppError::InternalServerError(format!("Failed to store channel opening params: {e:#}")) + })?; + } + + let message = OrderbookMessage::NewOrder { new_order, - channel_opening_params: new_order_request.channel_opening_params, order_reason: OrderReason::Manual, }; - state.trading_sender.send(message).await.map_err(|e| { + state.orderbook_sender.send(message).await.map_err(|e| { AppError::InternalServerError(format!("Failed to send new order message: {e:#}")) })?; Ok(()) } -fn update_pricefeed(pricefeed_msg: Message, sender: Sender) { - match sender.send(pricefeed_msg) { - Ok(_) => { - tracing::trace!("Pricefeed updated") - } - Err(error) => { - tracing::warn!("Could not update pricefeed due to '{error}'") - } - } -} - #[instrument(skip_all, err(Debug))] pub async fn delete_order( Path(order_id): Path, State(state): State>, -) -> Result, AppError> { - let mut conn = get_db_connection(&state)?; - let order = orderbook::db::orders::delete(&mut conn, order_id) - .map_err(|e| AppError::InternalServerError(format!("Failed to delete order: {e:#}")))?; - let sender = state.tx_orderbook_feed.clone(); - update_pricefeed(Message::DeleteOrder(order_id), sender); +) -> Result<(), AppError> { + state + .orderbook_sender + .send(OrderbookMessage::DeleteOrder(order_id)) + .await + .map_err(|e| { + AppError::InternalServerError(format!("Failed to send delete order message: {e:#}")) + })?; - Ok(Json(order)) + Ok(()) } pub async fn websocket_handler( diff --git a/coordinator/src/trade/mod.rs b/coordinator/src/trade/mod.rs index e8f61368a..27bf49f39 100644 --- a/coordinator/src/trade/mod.rs +++ b/coordinator/src/trade/mod.rs @@ -20,7 +20,6 @@ use bitcoin::secp256k1::PublicKey; use bitcoin::Amount; use bitcoin::SignedAmount; use diesel::Connection; -use diesel::PgConnection; use dlc_manager::channel::signed_channel::SignedChannel; use dlc_manager::channel::signed_channel::SignedChannelState; use dlc_manager::channel::Channel; @@ -35,6 +34,7 @@ use rust_decimal::prelude::ToPrimitive; use rust_decimal::Decimal; use time::OffsetDateTime; use tokio::sync::mpsc; +use tokio::task::spawn_blocking; use uuid::Uuid; use xxi_node::bitcoin_conversion::to_secp_pk_29; use xxi_node::bitcoin_conversion::to_xonly_pk_29; @@ -43,11 +43,16 @@ use xxi_node::cfd::calculate_margin; use xxi_node::cfd::calculate_pnl; use xxi_node::cfd::calculate_short_liquidation_price; use xxi_node::commons; +use xxi_node::commons::ContractSymbol; use xxi_node::commons::Direction; +use xxi_node::commons::FilledWith; +use xxi_node::commons::Match; use xxi_node::commons::MatchState; +use xxi_node::commons::Matches; use xxi_node::commons::Message; +use xxi_node::commons::Order; +use xxi_node::commons::OrderReason; use xxi_node::commons::OrderState; -use xxi_node::commons::TradeAndChannelParams; use xxi_node::commons::TradeParams; use xxi_node::node::event::NodeEvent; use xxi_node::node::signed_channel_state_name; @@ -93,33 +98,94 @@ enum ResizeAction { }, } -pub struct TradeExecutor { +pub struct ExecutableMatch { + pub order: Order, + pub matches: Vec, +} + +impl ExecutableMatch { + pub fn build_matches(&self) -> Vec { + self.matches + .clone() + .into_iter() + .map(|m| m.into()) + .collect::>() + } + + pub fn sum_quantity(&self) -> Decimal { + self.matches.iter().map(|m| m.quantity).sum() + } +} + +pub fn spawn_trade_executor( + node: Node, + trade_notifier: mpsc::Sender, +) -> Result> { + let trade_executor = TradeExecutor::new(node.clone(), trade_notifier); + + let (sender, mut receiver) = mpsc::channel::(100); + + tokio::spawn({ + let node = node.clone(); + async move { + while let Some(executeable_match) = receiver.recv().await { + let trader_pubkey = executeable_match.order.trader_id; + let order_id = executeable_match.order.id; + let order_reason = executeable_match.order.order_reason; + + if node.inner.is_connected(trader_pubkey) { + tracing::info!(%trader_pubkey, %order_id, ?order_reason, "Executing trade for match"); + trade_executor.execute(executeable_match).await; + } else { + match order_reason { + OrderReason::Manual => { + tracing::warn!(%trader_pubkey, %order_id, ?order_reason, "Skipping trade execution as trader is not connected") + } + OrderReason::Expired + | OrderReason::TraderLiquidated + | OrderReason::CoordinatorLiquidated => { + tracing::info!(%trader_pubkey, %order_id, ?order_reason, "Skipping trade execution as trader is not connected") + } + } + } + } + + tracing::error!("Trade executor channel closed"); + } + }); + + Ok(sender) +} + +struct TradeExecutor { node: Node, notifier: mpsc::Sender, } impl TradeExecutor { - pub fn new(node: Node, notifier: mpsc::Sender) -> Self { + fn new(node: Node, notifier: mpsc::Sender) -> Self { Self { node, notifier } } - pub async fn execute(&self, params: &TradeAndChannelParams) { - let trader_id = params.trade_params.pubkey; - let order_id = params.trade_params.filled_with.order_id; + pub async fn execute(&self, executeable_match: ExecutableMatch) { + let trader_pubkey = executeable_match.order.trader_id; + let order_id = executeable_match.order.id; - match self.execute_internal(params).await { + match self.execute_internal(executeable_match).await { Ok(()) => { tracing::info!( - %trader_id, + %trader_pubkey, %order_id, "Successfully processed match, setting match to Filled" ); + // TODO(holzeis): It's not ideal that the order and match are updated by the trade + // executor. This should rather get updated by the orderbook. if let Err(e) = self.update_order_and_match(order_id, MatchState::Filled, OrderState::Taken) { tracing::error!( - %trader_id, + %trader_pubkey, %order_id, "Failed to update order and match state. Error: {e:#}" ); @@ -130,19 +196,23 @@ impl TradeExecutor { self.node .inner .event_handler - .publish(NodeEvent::SendLastDlcMessage { peer: trader_id }); + .publish(NodeEvent::SendLastDlcMessage { + peer: trader_pubkey, + }); } Err(e) => { - tracing::error!(%trader_id, %order_id,"Failed to execute trade. Error: {e:#}"); + tracing::error!(%trader_pubkey, %order_id,"Failed to execute trade. Error: {e:#}"); + // TODO(holzeis): It's not ideal that the order and match are updated by the trade + // executor. This should rather get updated by the orderbook. if let Err(e) = self.update_order_and_match(order_id, MatchState::Failed, OrderState::Failed) { - tracing::error!(%trader_id, %order_id, "Failed to update order and match: {e}"); + tracing::error!(%trader_pubkey, %order_id, "Failed to update order and match: {e}"); }; let message = TraderMessage { - trader_id, + trader_id: trader_pubkey, message: Message::TradeError { order_id, error: e.into(), @@ -168,13 +238,32 @@ impl TradeExecutor { /// 2. If no position is found, we open a position. /// /// 3. If a position of differing quantity is found, we resize the position. - async fn execute_internal(&self, params: &TradeAndChannelParams) -> Result<()> { - let mut connection = self.node.pool.get()?; + async fn execute_internal(&self, executable_match: ExecutableMatch) -> Result<()> { + let trader_pubkey = executable_match.order.trader_id; + let order_id = executable_match.order.id; + + let expiry_timestamp = + commons::calculate_next_expiry(OffsetDateTime::now_utc(), self.node.inner.network); + + let order = executable_match.order; + + let filled_with = FilledWith { + order_id, + expiry_timestamp, + oracle_pk: self.node.inner.oracle_pubkey, + matches: executable_match.build_matches(), + }; + + let trade_params = TradeParams { + pubkey: trader_pubkey, + contract_symbol: ContractSymbol::BtcUsd, + leverage: order.leverage, + quantity: executable_match.sum_quantity().to_f32().expect("to fit"), + direction: order.direction, + filled_with, + }; - let order_id = params.trade_params.filled_with.order_id; - let trader_id = params.trade_params.pubkey; - let order = - orders::get_with_id(&mut connection, order_id)?.context("Could not find order")?; + let trader_pubkey = trade_params.pubkey; let is_stable_order = order.stable; ensure!( @@ -183,13 +272,13 @@ impl TradeExecutor { ); ensure!( order.order_state == OrderState::Matched, - "Can't execute trade with in invalid state {:?}", + "Can't execute trade with an invalid state {:?}", order.order_state ); - tracing::info!(%trader_id, %order_id, "Executing match"); + tracing::info!(%trader_pubkey, %order_id, "Executing match"); - let trade_action = self.determine_trade_action(&mut connection, params).await?; + let trade_action = self.determine_trade_action(&trade_params).await?; ensure!( matches!(trade_action, TradeAction::ClosePosition { .. }) @@ -199,16 +288,23 @@ impl TradeExecutor { match trade_action { TradeAction::OpenDlcChannel => { - let collateral_reserve_coordinator = params - .coordinator_reserve - .context("Missing coordinator collateral reserve")?; - let collateral_reserve_trader = params - .trader_reserve - .context("Missing trader collateral reserve")?; + let channel_opening_params = spawn_blocking({ + let pool = self.node.pool.clone(); + move || { + let mut conn = pool.get()?; + let params = + db::channel_opening_params::get_by_order_id(&mut conn, order_id)?; + anyhow::Ok(params) + } + }) + .await?? + .context("Missing channel opening params")?; + + let collateral_reserve_coordinator = channel_opening_params.coordinator_reserve; + let collateral_reserve_trader = channel_opening_params.trader_reserve; self.open_dlc_channel( - &mut connection, - ¶ms.trade_params, + &trade_params, collateral_reserve_coordinator, collateral_reserve_trader, is_stable_order, @@ -222,9 +318,8 @@ impl TradeExecutor { counter_payout, } => self .open_position( - &mut connection, channel_id, - ¶ms.trade_params, + &trade_params, own_payout, counter_payout, is_stable_order, @@ -235,13 +330,7 @@ impl TradeExecutor { channel_id, position, } => self - .start_closing_position( - &mut connection, - order, - &position, - ¶ms.trade_params, - channel_id, - ) + .start_closing_position(order, &position, &trade_params, channel_id) .await .with_context(|| format!("Failed to close position {}", position.id))?, TradeAction::ResizePosition { @@ -249,13 +338,7 @@ impl TradeExecutor { position, resize_action, } => self - .resize_position( - &mut connection, - channel_id, - &position, - ¶ms.trade_params, - resize_action, - ) + .resize_position(channel_id, &position, &trade_params, resize_action) .await .with_context(|| format!("Failed to resize position {}", position.id))?, }; @@ -265,7 +348,6 @@ impl TradeExecutor { async fn open_dlc_channel( &self, - conn: &mut PgConnection, trade_params: &TradeParams, collateral_reserve_coordinator: Amount, collateral_reserve_trader: Amount, @@ -397,7 +479,6 @@ impl TradeExecutor { // TODO(holzeis): The position should only get created after the dlc protocol has finished // successfully. self.persist_position( - conn, trade_params, temporary_contract_id, leverage_coordinator, @@ -409,7 +490,6 @@ impl TradeExecutor { async fn open_position( &self, - conn: &mut PgConnection, dlc_channel_id: DlcChannelId, trade_params: &TradeParams, coordinator_dlc_channel_collateral: u64, @@ -565,7 +645,6 @@ impl TradeExecutor { // TODO(holzeis): The position should only get created after the dlc protocol has finished // successfully. self.persist_position( - conn, trade_params, temporary_contract_id, leverage_coordinator, @@ -577,7 +656,6 @@ impl TradeExecutor { async fn resize_position( &self, - conn: &mut PgConnection, dlc_channel_id: DlcChannelId, position: &Position, trade_params: &TradeParams, @@ -735,28 +813,36 @@ impl TradeExecutor { DlcProtocolType::resize_position(trade_params, protocol_id, realized_pnl), )?; - db::positions::Position::set_position_to_resizing( - conn, - peer_id, - temporary_contract_id, - contracts, - coordinator_direction.opposite(), - margin_trader, - margin_coordinator, - average_execution_price, - expiry_timestamp, - coordinator_liquidation_price, - trader_liquidation_price, - realized_pnl, - order_matching_fee, - )?; + spawn_blocking({ + let pool = self.node.pool.clone(); + move || { + let mut conn = pool.get()?; + db::positions::Position::set_position_to_resizing( + &mut conn, + peer_id, + temporary_contract_id, + contracts, + coordinator_direction.opposite(), + margin_trader, + margin_coordinator, + average_execution_price, + expiry_timestamp, + coordinator_liquidation_price, + trader_liquidation_price, + realized_pnl, + order_matching_fee, + )?; + + anyhow::Ok(()) + } + }) + .await??; Ok(()) } async fn persist_position( &self, - connection: &mut PgConnection, trade_params: &TradeParams, temporary_contract_id: ContractId, coordinator_leverage: f32, @@ -811,15 +897,20 @@ impl TradeExecutor { // TODO(holzeis): We should only create the position once the dlc protocol finished // successfully. - db::positions::Position::insert(connection, new_position.clone())?; - - Ok(()) + spawn_blocking({ + let pool = self.node.pool.clone(); + move || { + let mut conn = pool.get()?; + db::positions::Position::insert(&mut conn, new_position.clone())?; + anyhow::Ok(()) + } + }) + .await? } pub async fn start_closing_position( &self, - conn: &mut PgConnection, - order: commons::Order, + order: Order, position: &Position, trade_params: &TradeParams, channel_id: DlcChannelId, @@ -891,11 +982,20 @@ impl TradeExecutor { DlcProtocolType::settle(trade_params, protocol_id), )?; - db::positions::Position::set_open_position_to_closing( - conn, - &position.trader, - Some(closing_price), - )?; + spawn_blocking({ + let pool = self.node.pool.clone(); + let trader_pubkey = position.trader; + move || { + let mut conn = pool.get()?; + db::positions::Position::set_open_position_to_closing( + &mut conn, + &trader_pubkey, + Some(closing_price), + )?; + anyhow::Ok(()) + } + }) + .await??; Ok(()) } @@ -918,17 +1018,13 @@ impl TradeExecutor { .map_err(|e| anyhow!("Failed to update order and match. Error: {e:#}")) } - async fn determine_trade_action( - &self, - connection: &mut PgConnection, - params: &TradeAndChannelParams, - ) -> Result { - let trader_id = params.trade_params.pubkey; + async fn determine_trade_action(&self, trade_params: &TradeParams) -> Result { + let trader_pubkey = trade_params.pubkey; let trade_action = match self .node .inner - .get_signed_dlc_channel_by_counterparty(&trader_id)? + .get_signed_dlc_channel_by_counterparty(&trader_pubkey)? { None => { ensure!( @@ -937,7 +1033,7 @@ impl TradeExecutor { .inner .list_dlc_channels()? .iter() - .filter(|c| c.get_counter_party_id() == to_secp_pk_29(trader_id)) + .filter(|c| c.get_counter_party_id() == to_secp_pk_29(trader_pubkey)) .any(|c| matches!(c, Channel::Offered(_) | Channel::Accepted(_))), "Previous DLC Channel offer still pending." ); @@ -963,14 +1059,21 @@ impl TradeExecutor { channel_id, .. }) => { - let trade_params = ¶ms.trade_params; - - let position = db::positions::Position::get_position_by_trader( - connection, - trader_id, - vec![PositionState::Open], - )? - .context("Failed to find open position")?; + let position = spawn_blocking({ + let pool = self.node.pool.clone(); + move || { + let mut conn = pool.get()?; + let position = db::positions::Position::get_position_by_trader( + &mut conn, + trader_pubkey, + vec![PositionState::Open], + )? + .context("Failed to find open position")?; + + anyhow::Ok(position) + } + }) + .await??; let position_contracts = { let contracts = decimal_from_f32(position.quantity); diff --git a/crates/dev-maker/src/main.rs b/crates/dev-maker/src/main.rs index 321b2674e..f62ec9f68 100644 --- a/crates/dev-maker/src/main.rs +++ b/crates/dev-maker/src/main.rs @@ -113,7 +113,7 @@ async fn post_order( id: uuid, contract_symbol: ContractSymbol::BtcUsd, price, - quantity: Decimal::from(5000), + quantity: Decimal::from(1000), trader_id: public_key, direction, leverage: Decimal::from(2), diff --git a/crates/xxi-node/src/commons/message.rs b/crates/xxi-node/src/commons/message.rs index 05e285b1d..2002ca2d8 100644 --- a/crates/xxi-node/src/commons/message.rs +++ b/crates/xxi-node/src/commons/message.rs @@ -80,7 +80,11 @@ pub enum OrderbookRequest { os: Option, signature: Signature, }, + // TODO(holzeis): This orderbook request seems to be unused and a duplicate to our http + // endpoint. I guess we can remove it. InsertOrder(NewLimitOrder), + // TODO(holzeis): This orderbook request seems to be unused and a duplicate to our http + // endpoint. I guess we can remove it. DeleteOrder(Uuid), } diff --git a/crates/xxi-node/src/commons/order.rs b/crates/xxi-node/src/commons/order.rs index 430fcfa68..66f244cf3 100644 --- a/crates/xxi-node/src/commons/order.rs +++ b/crates/xxi-node/src/commons/order.rs @@ -227,6 +227,13 @@ pub struct Order { pub stable: bool, } +impl Order { + /// Returns true if the order is expired. + pub fn is_expired(&self) -> bool { + OffsetDateTime::now_utc() > self.expiry + } +} + /// Extra information required to open a DLC channel, independent of the [`TradeParams`] associated /// with the filled order. /// diff --git a/crates/xxi-node/src/commons/trade.rs b/crates/xxi-node/src/commons/trade.rs index fbcaf09b3..7a28b6dce 100644 --- a/crates/xxi-node/src/commons/trade.rs +++ b/crates/xxi-node/src/commons/trade.rs @@ -9,15 +9,6 @@ use serde::Serialize; use time::OffsetDateTime; use uuid::Uuid; -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub struct TradeAndChannelParams { - pub trade_params: TradeParams, - #[serde(with = "bitcoin::amount::serde::as_sat::opt")] - pub trader_reserve: Option, - #[serde(with = "bitcoin::amount::serde::as_sat::opt")] - pub coordinator_reserve: Option, -} - /// The trade parameters defining the trade execution. /// /// Emitted by the orderbook when a match is found. @@ -185,12 +176,14 @@ pub fn average_execution_price(matches: Vec) -> Decimal { sum_quantity / nominal_prices } +#[derive(Clone, Copy)] pub enum MatchState { Pending, Filled, Failed, } +#[derive(Clone, Copy)] pub struct Matches { pub id: Uuid, pub match_state: MatchState,