Skip to content

Commit

Permalink
feat: Rework order matching strategy
Browse files Browse the repository at this point in the history
Refactors the current trading component into a clearly separated orderbook component and a trade execution component. The linking part is the `ExecutableMatch` which can be derived from the matches stored into the database.

At the moment we assume optimistically that the trade execution will succeed. However, we should consider that a pending match may never get filled or it fails at execution in such a scenario we would need to rollback the matched orders.
  • Loading branch information
holzeis committed May 15, 2024
1 parent 60ceea9 commit 77d9bcc
Show file tree
Hide file tree
Showing 16 changed files with 944 additions and 1,285 deletions.
44 changes: 22 additions & 22 deletions coordinator/src/bin/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use coordinator::node::storage::NodeStorage;
use coordinator::node::unrealized_pnl;
use coordinator::node::Node;
use coordinator::notifications::NotificationService;
use coordinator::orderbook;
use coordinator::orderbook::async_match;
use coordinator::orderbook::collaborative_revert;
use coordinator::orderbook::trading;
Expand All @@ -26,12 +25,15 @@ use coordinator::run_migration;
use coordinator::scheduler::NotificationScheduler;
use coordinator::settings::Settings;
use coordinator::storage::CoordinatorTenTenOneStorage;
use coordinator::trade;
use coordinator::trade::websocket::InternalPositionUpdateMessage;
use diesel::r2d2;
use diesel::r2d2::ConnectionManager;
use diesel::PgConnection;
use rand::thread_rng;
use rand::RngCore;
use rust_decimal::prelude::FromPrimitive;
use rust_decimal::Decimal;
use std::backtrace::Backtrace;
use std::net::IpAddr;
use std::net::Ipv4Addr;
Expand Down Expand Up @@ -257,24 +259,22 @@ async fn main() -> Result<()> {

let (tx_orderbook_feed, _rx) = broadcast::channel(100);

let orderbook_sender =
orderbook::spawn_orderbook(node.pool.clone(), tx_orderbook_feed.clone())?;
let trade_executor = trade::spawn_trade_executor(node.clone(), auth_users_notifier.clone())?;

let (_handle, trading_sender) = trading::start(
node.clone(),
orderbook_sender.clone(),
auth_users_notifier.clone(),
let order_matching_fee_rate =
Decimal::from_f32(node.settings.read().await.order_matching_fee_rate).expect("to fit");

let orderbook_sender = trading::spawn_orderbook(
node.pool.clone(),
notification_service.get_sender(),
network,
node.inner.oracle_pubkey,
);
let _handle = async_match::monitor(
node.clone(),
node_event_handler.subscribe(),
auth_users_notifier.clone(),
network,
node.inner.oracle_pubkey,
);
trade_executor.clone(),
tx_orderbook_feed.clone(),
order_matching_fee_rate,
)?;

let _handle =
async_match::monitor(node.clone(), node_event_handler.subscribe(), trade_executor);

let _handle = rollover::monitor(
pool.clone(),
node_event_handler.subscribe(),
Expand All @@ -293,11 +293,12 @@ async fn main() -> Result<()> {

tokio::spawn({
let node = node.clone();
let trading_sender = trading_sender.clone();
let orderbook_sender = orderbook_sender.clone();
async move {
loop {
tokio::time::sleep(EXPIRED_POSITION_SYNC_INTERVAL).await;
if let Err(e) = expired_positions::close(node.clone(), trading_sender.clone()).await
if let Err(e) =
expired_positions::close(node.clone(), orderbook_sender.clone()).await
{
tracing::error!("Failed to close expired positions! Error: {e:#}");
}
Expand All @@ -307,11 +308,11 @@ async fn main() -> Result<()> {

tokio::spawn({
let node = node.clone();
let trading_sender = trading_sender.clone();
let orderbook_sender = orderbook_sender.clone();
async move {
loop {
tokio::time::sleep(LIQUIDATED_POSITION_SYNC_INTERVAL).await;
liquidated_positions::monitor(node.clone(), trading_sender.clone()).await
liquidated_positions::monitor(node.clone(), orderbook_sender.clone()).await
}
}
});
Expand All @@ -326,7 +327,6 @@ async fn main() -> Result<()> {
opts.p2p_announcement_addresses(),
NODE_ALIAS,
orderbook_sender,
trading_sender,
tx_orderbook_feed,
tx_position_feed,
tx_user_feed,
Expand Down
12 changes: 6 additions & 6 deletions coordinator/src/node/expired_positions.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::db;
use crate::node::Node;
use crate::orderbook;
use crate::orderbook::trading::NewOrderMessage;
use crate::orderbook::trading::OrderbookMessage;
use crate::position::models::Position;
use crate::position::models::PositionState;
use anyhow::Context;
Expand All @@ -24,7 +24,7 @@ use xxi_node::commons::OrderState;
/// not be larger than our refund transaction time lock.
pub const EXPIRED_POSITION_TIMEOUT: Duration = Duration::days(7);

pub async fn close(node: Node, trading_sender: mpsc::Sender<NewOrderMessage>) -> Result<()> {
pub async fn close(node: Node, orderbook_sender: mpsc::Sender<OrderbookMessage>) -> Result<()> {
let mut conn = node.pool.get()?;

let positions = db::positions::Position::get_all_open_positions(&mut conn)
Expand All @@ -49,8 +49,9 @@ pub async fn close(node: Node, trading_sender: mpsc::Sender<NewOrderMessage>) ->

if order.expiry < OffsetDateTime::now_utc() {
tracing::warn!(trader_id, order_id, "Matched order expired! Giving up on that position, looks like the corresponding dlc channel has to get force closed.");
// TODO(holzeis): It's not ideal that the order and match are updated by the trade
// executor. This should rather get updated by the orderbook.
orderbook::db::orders::set_order_state(&mut conn, order.id, OrderState::Expired)?;

orderbook::db::matches::set_match_state_by_order_id(
&mut conn,
order.id,
Expand Down Expand Up @@ -90,13 +91,12 @@ pub async fn close(node: Node, trading_sender: mpsc::Sender<NewOrderMessage>) ->
stable: position.stable,
};

let message = NewOrderMessage {
let message = OrderbookMessage::NewOrder {
new_order: NewOrder::Market(new_order),
channel_opening_params: None,
order_reason: OrderReason::Expired,
};

if let Err(e) = trading_sender.send(message).await {
if let Err(e) = orderbook_sender.send(message).await {
tracing::error!(%trader_pubkey, %order_id, "Failed to submit new order for closing expired position. Error: {e:#}");
continue;
}
Expand Down
13 changes: 6 additions & 7 deletions coordinator/src/node/liquidated_positions.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::db;
use crate::node::Node;
use crate::orderbook;
use crate::orderbook::trading::NewOrderMessage;
use crate::orderbook::trading::OrderbookMessage;
use anyhow::Result;
use rust_decimal::prelude::FromPrimitive;
use rust_decimal::Decimal;
Expand All @@ -24,9 +24,9 @@ use xxi_node::commons::OrderState;
/// should not be larger than our refund transaction time lock.
pub const LIQUIDATION_POSITION_TIMEOUT: Duration = Duration::days(7);

pub async fn monitor(node: Node, trading_sender: mpsc::Sender<NewOrderMessage>) {
pub async fn monitor(node: Node, orderbook_sender: mpsc::Sender<OrderbookMessage>) {
if let Err(e) =
check_if_positions_need_to_get_liquidated(trading_sender.clone(), node.clone()).await
check_if_positions_need_to_get_liquidated(orderbook_sender.clone(), node.clone()).await
{
tracing::error!("Failed to check if positions need to get liquidated. Error: {e:#}");
}
Expand All @@ -35,7 +35,7 @@ pub async fn monitor(node: Node, trading_sender: mpsc::Sender<NewOrderMessage>)
/// For all open positions, check if the maintenance margin has been reached. Send a liquidation
/// async match to the traders whose positions have been liquidated.
async fn check_if_positions_need_to_get_liquidated(
trading_sender: mpsc::Sender<NewOrderMessage>,
orderbook_sender: mpsc::Sender<OrderbookMessage>,
node: Node,
) -> Result<()> {
let mut conn = node.pool.get()?;
Expand Down Expand Up @@ -142,13 +142,12 @@ async fn check_if_positions_need_to_get_liquidated(
false => OrderReason::CoordinatorLiquidated,
};

let message = NewOrderMessage {
let message = OrderbookMessage::NewOrder {
new_order: NewOrder::Market(new_order),
channel_opening_params: None,
order_reason,
};

if let Err(e) = trading_sender.send(message).await {
if let Err(e) = orderbook_sender.send(message).await {
tracing::error!(%trader_pubkey, %order_id, "Failed to submit new order for closing liquidated position. Error: {e:#}");
continue;
}
Expand Down
101 changes: 17 additions & 84 deletions coordinator/src/orderbook/async_match.rs
Original file line number Diff line number Diff line change
@@ -1,55 +1,38 @@
use crate::check_version::check_version;
use crate::db;
use crate::message::TraderMessage;
use crate::node::Node;
use crate::orderbook::db::matches;
use crate::orderbook::db::orders;
use crate::trade::TradeExecutor;
use anyhow::ensure;
use crate::trade::ExecutableMatch;
use anyhow::Result;
use bitcoin::secp256k1::PublicKey;
use bitcoin::secp256k1::XOnlyPublicKey;
use bitcoin::Network;
use futures::future::RemoteHandle;
use futures::FutureExt;
use rust_decimal::prelude::ToPrimitive;
use time::OffsetDateTime;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::mpsc;
use tokio::task::spawn_blocking;
use xxi_node::commons;
use xxi_node::commons::ContractSymbol;
use xxi_node::commons::FilledWith;
use xxi_node::commons::Match;
use xxi_node::commons::Matches;
use xxi_node::commons::OrderState;
use xxi_node::commons::TradeAndChannelParams;
use xxi_node::commons::TradeParams;
use xxi_node::node::event::NodeEvent;

pub fn monitor(
node: Node,
mut receiver: broadcast::Receiver<NodeEvent>,
notifier: mpsc::Sender<TraderMessage>,
network: Network,
oracle_pk: XOnlyPublicKey,
trade_executor: mpsc::Sender<ExecutableMatch>,
) -> RemoteHandle<()> {
let (fut, remote_handle) = async move {
loop {
match receiver.recv().await {
Ok(NodeEvent::Connected { peer: trader_id }) => {
tokio::spawn({
let notifier = notifier.clone();
let node = node.clone();
let trade_executor = trade_executor.clone();
async move {
tracing::debug!(
%trader_id,
"Checking if the user needs to be notified about pending matches"
);
if let Err(e) =
process_pending_match(node, notifier, trader_id, network, oracle_pk)
.await
process_pending_match(node, trade_executor, trader_id).await
{
tracing::error!("Failed to process pending match. Error: {e:#}");
}
Expand Down Expand Up @@ -77,10 +60,8 @@ pub fn monitor(
/// Checks if there are any pending matches
async fn process_pending_match(
node: Node,
notifier: mpsc::Sender<TraderMessage>,
trader_id: PublicKey,
network: Network,
oracle_pk: XOnlyPublicKey,
trade_executor: mpsc::Sender<ExecutableMatch>,
trader_pubkey: PublicKey,
) -> Result<()> {
let mut conn = spawn_blocking({
let node = node.clone();
Expand All @@ -89,74 +70,26 @@ async fn process_pending_match(
.await
.expect("task to complete")?;

if check_version(&mut conn, &trader_id).is_err() {
tracing::info!(%trader_id, "User is not on the latest version. Skipping check if user needs to be informed about pending matches.");
if check_version(&mut conn, &trader_pubkey).is_err() {
tracing::info!(%trader_pubkey, "User is not on the latest version. Skipping check if user needs to be informed about pending matches.");
return Ok(());
}

if let Some(order) =
orders::get_by_trader_id_and_state(&mut conn, trader_id, OrderState::Matched)?
orders::get_by_trader_id_and_state(&mut conn, trader_pubkey, OrderState::Matched)?
{
tracing::debug!(%trader_id, order_id=%order.id, "Executing pending match");
tracing::debug!(%trader_pubkey, order_id=%order.id, "Executing pending match");

let matches = matches::get_matches_by_order_id(&mut conn, order.id)?;
let filled_with = get_filled_with_from_matches(matches, network, oracle_pk)?;

let channel_opening_params =
db::channel_opening_params::get_by_order_id(&mut conn, order.id)?;

tracing::info!(trader_id = %order.trader_id, order_id = %order.id, order_reason = ?order.order_reason, "Executing trade for match");
let trade_executor = TradeExecutor::new(node, notifier);
trade_executor
.execute(&TradeAndChannelParams {
trade_params: TradeParams {
pubkey: trader_id,
contract_symbol: ContractSymbol::BtcUsd,
leverage: order.leverage,
quantity: order.quantity.to_f32().expect("to fit into f32"),
direction: order.direction,
filled_with,
},
trader_reserve: channel_opening_params.map(|c| c.trader_reserve),
coordinator_reserve: channel_opening_params.map(|c| c.coordinator_reserve),
})
.await;
tracing::info!(%trader_pubkey, order_id = %order.id, order_reason = ?order.order_reason, "Executing trade for match");
if let Err(e) = trade_executor
.send(ExecutableMatch { order, matches })
.await
{
tracing::error!(%trader_pubkey, order_id= %order.id, "Failed to execute trade. Error: {e:#}");
}
}

Ok(())
}

fn get_filled_with_from_matches(
matches: Vec<Matches>,
network: Network,
oracle_pk: XOnlyPublicKey,
) -> Result<FilledWith> {
ensure!(
!matches.is_empty(),
"Need at least one matches record to construct a FilledWith"
);

let order_id = matches
.first()
.expect("to have at least one match")
.order_id;

let expiry_timestamp = commons::calculate_next_expiry(OffsetDateTime::now_utc(), network);

Ok(FilledWith {
order_id,
expiry_timestamp,
oracle_pk,
matches: matches
.iter()
.map(|m| Match {
id: m.id,
order_id: m.order_id,
quantity: m.quantity,
pubkey: m.match_trader_id,
execution_price: m.execution_price,
matching_fee: m.matching_fee,
})
.collect(),
})
}
Loading

0 comments on commit 77d9bcc

Please sign in to comment.