Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Rewrite order matching strategy #2539

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions coordinator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ time = { version = "0.3", features = ["serde", "parsing", "std", "formatting", "
tokio = { version = "1", features = ["full", "tracing"] }
tokio-cron-scheduler = { version = "0.9.4" }
tokio-metrics = "0.2.2"
tokio-stream = { version = "0.1.14", features = ["sync"] }
tokio-util = { version = "0.7", features = ["io"] }
toml = "0.8"
tracing = "0.1.37"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
ALTER TABLE trades DROP COLUMN IF EXISTS order_id;

ALTER TABLE trade_params DROP COLUMN IF EXISTS order_id;

ALTER TABLE orders
RENAME COLUMN trader_pubkey TO trader_id;

ALTER TABLE orders
RENAME COLUMN order_id TO trader_order_id;

Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- Your SQL goes here
ALTER TABLE orders
RENAME COLUMN trader_order_id TO order_id;
holzeis marked this conversation as resolved.
Show resolved Hide resolved

ALTER TABLE orders
RENAME COLUMN trader_id TO trader_pubkey;

ALTER TABLE trade_params
ADD COLUMN order_id UUID REFERENCES orders(order_id);

ALTER TABLE trades
ADD COLUMN order_id UUID REFERENCES orders(order_id);
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
ALTER TABLE "matches"
ADD CONSTRAINT matches_order_id_fkey
FOREIGN KEY (order_id)
REFERENCES orders (order_id);

ALTER TABLE "matches"
ADD CONSTRAINT matches_match_order_id_fkey
FOREIGN KEY (match_order_id)
REFERENCES orders (order_id);
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE "matches" DROP CONSTRAINT matches_order_id_fkey;
ALTER TABLE "matches" DROP CONSTRAINT matches_match_order_id_fkey;
50 changes: 33 additions & 17 deletions coordinator/src/bin/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,18 @@ use coordinator::funding_fee::generate_funding_fee_events_periodically;
use coordinator::logger;
use coordinator::message::spawn_delivering_messages_to_authenticated_users;
use coordinator::message::NewUserMessage;
use coordinator::message::TraderSender;
use coordinator::node::expired_positions;
use coordinator::node::liquidated_positions;
use coordinator::node::rollover;
use coordinator::node::storage::NodeStorage;
use coordinator::node::unrealized_pnl;
use coordinator::node::Node;
use coordinator::notifications::NotificationService;
use coordinator::orderbook::async_match;
use coordinator::orderbook::collaborative_revert;
use coordinator::orderbook::match_order;
use coordinator::orderbook::trading;
use coordinator::orderbook::OrderMatchingFeeRate;
use coordinator::routes::router;
use coordinator::run_migration;
use coordinator::scheduler::NotificationScheduler;
Expand All @@ -32,6 +34,8 @@ use diesel::PgConnection;
use lnd_bridge::LndBridge;
use rand::thread_rng;
use rand::RngCore;
use rust_decimal::prelude::FromPrimitive;
use rust_decimal::Decimal;
use std::backtrace::Backtrace;
use std::net::IpAddr;
use std::net::Ipv4Addr;
Expand Down Expand Up @@ -247,28 +251,39 @@ async fn main() -> Result<()> {

let (tx_orderbook_feed, _rx) = broadcast::channel(100);

let (_handle, trading_sender) = trading::start(
node.clone(),
tx_orderbook_feed.clone(),
auth_users_notifier.clone(),
notification_service.get_sender(),
network,
node.inner.oracle_pubkey,
);
let _handle = async_match::monitor(
let order_matching_fee_rate = OrderMatchingFeeRate {
// TODO(holzeis): Split up order matching fee rate into taker and maker fees.
taker: Decimal::from_f32(node.settings.read().await.order_matching_fee_rate)
.expect("to fit"),
maker: Decimal::from_f32(node.settings.read().await.order_matching_fee_rate)
.expect("to fit"),
};

let match_executor = match_order::spawn_match_executor(
node.clone(),
node_event_handler.subscribe(),
order_matching_fee_rate,
TraderSender {
sender: auth_users_notifier.clone(),
},
);

let orderbook_sender = trading::spawn_orderbook(
node.pool.clone(),
notification_service.get_sender(),
match_executor.clone(),
tx_orderbook_feed.clone(),
auth_users_notifier.clone(),
network,
node.inner.oracle_pubkey,
);

let _handle = rollover::monitor(
pool.clone(),
node_event_handler.subscribe(),
notification_service.get_sender(),
network,
node.clone(),
);

let _handle = collaborative_revert::monitor(
pool.clone(),
tx_user_feed.clone(),
Expand All @@ -280,11 +295,12 @@ async fn main() -> Result<()> {

tokio::spawn({
let node = node.clone();
let trading_sender = trading_sender.clone();
let orderbook_sender = orderbook_sender.clone();
async move {
loop {
tokio::time::sleep(EXPIRED_POSITION_SYNC_INTERVAL).await;
if let Err(e) = expired_positions::close(node.clone(), trading_sender.clone()).await
if let Err(e) =
expired_positions::close(node.clone(), orderbook_sender.clone()).await
{
tracing::error!("Failed to close expired positions! Error: {e:#}");
}
Expand All @@ -294,11 +310,11 @@ async fn main() -> Result<()> {

tokio::spawn({
let node = node.clone();
let trading_sender = trading_sender.clone();
let orderbook_sender = orderbook_sender.clone();
async move {
loop {
tokio::time::sleep(LIQUIDATED_POSITION_SYNC_INTERVAL).await;
liquidated_positions::monitor(node.clone(), trading_sender.clone()).await
liquidated_positions::monitor(node.clone(), orderbook_sender.clone()).await
}
}
});
Expand All @@ -310,7 +326,7 @@ async fn main() -> Result<()> {
pool.clone(),
settings.clone(),
NODE_ALIAS,
trading_sender,
orderbook_sender,
tx_orderbook_feed,
tx_position_feed,
tx_user_feed,
Expand Down
6 changes: 3 additions & 3 deletions coordinator/src/collaborative_revert.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::db;
use crate::db::positions::Position;
use crate::message::OrderbookMessage;
use crate::message::TraderMessage;
use crate::node::storage::NodeStorage;
use crate::notifications::NotificationKind;
use crate::position;
Expand Down Expand Up @@ -56,7 +56,7 @@ pub async fn propose_collaborative_revert(
>,
>,
pool: Pool<ConnectionManager<PgConnection>>,
sender: mpsc::Sender<OrderbookMessage>,
sender: mpsc::Sender<TraderMessage>,
channel_id: DlcChannelId,
fee_rate_sats_vb: u64,
trader_amount_sats: u64,
Expand Down Expand Up @@ -130,7 +130,7 @@ pub async fn propose_collaborative_revert(
};

sender
.send(OrderbookMessage::TraderMessage {
.send(TraderMessage {
trader_id: to_secp_pk_30(peer_id),
message: Message::DlcChannelCollaborativeRevert {
channel_id,
Expand Down
3 changes: 3 additions & 0 deletions coordinator/src/db/trade_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub(crate) struct TradeParams {
pub direction: Direction,
pub matching_fee: i64,
pub trader_pnl: Option<i64>,
pub order_id: Option<Uuid>,
}

pub(crate) fn insert(
Expand All @@ -44,6 +45,7 @@ pub(crate) fn insert(
trade_params::average_price.eq(params.average_price),
trade_params::matching_fee.eq(params.matching_fee.to_sat() as i64),
trade_params::trader_pnl_sat.eq(params.trader_pnl.map(|pnl| pnl.to_sat())),
trade_params::order_id.eq(params.order_id),
))
.execute(conn)?;

Expand Down Expand Up @@ -76,6 +78,7 @@ impl From<TradeParams> for dlc_protocol::TradeParams {
direction: commons::Direction::from(value.direction),
matching_fee: Amount::from_sat(value.matching_fee as u64),
trader_pnl: value.trader_pnl.map(SignedAmount::from_sat),
order_id: value.order_id,
}
}
}
8 changes: 6 additions & 2 deletions coordinator/src/db/trades.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use bitcoin::Amount;
use diesel::prelude::*;
use std::str::FromStr;
use time::OffsetDateTime;
use uuid::Uuid;

#[derive(Queryable, Debug, Clone)]
#[diesel(table_name = trades)]
Expand All @@ -22,6 +23,7 @@ struct Trade {
timestamp: OffsetDateTime,
order_matching_fee_sat: i64,
trader_realized_pnl_sat: Option<i64>,
order_id: Option<Uuid>,
}

#[derive(Insertable, Debug, Clone)]
Expand All @@ -36,6 +38,7 @@ struct NewTrade {
average_price: f32,
order_matching_fee_sat: i64,
trader_realized_pnl_sat: Option<i64>,
order_id: Option<Uuid>,
}

pub fn insert(
Expand Down Expand Up @@ -90,6 +93,7 @@ impl From<crate::trade::models::NewTrade> for NewTrade {
average_price: value.average_price,
order_matching_fee_sat: value.order_matching_fee.to_sat() as i64,
trader_realized_pnl_sat: value.trader_realized_pnl_sat,
order_id: value.order_id,
}
}
}
Expand All @@ -100,15 +104,15 @@ impl From<Trade> for crate::trade::models::Trade {
id: value.id,
position_id: value.position_id,
contract_symbol: value.contract_symbol.into(),
trader_pubkey: PublicKey::from_str(value.trader_pubkey.as_str())
.expect("public key to decode"),
trader_pubkey: PublicKey::from_str(value.trader_pubkey.as_str()).expect("to fit"),
quantity: value.quantity,
trader_leverage: value.trader_leverage,
direction: value.direction.into(),
average_price: value.average_price,
timestamp: value.timestamp,
order_matching_fee: Amount::from_sat(value.order_matching_fee_sat as u64),
trader_realized_pnl_sat: value.trader_realized_pnl_sat,
order_id: value.order_id,
}
}
}
6 changes: 6 additions & 0 deletions coordinator/src/dlc_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use rust_decimal::prelude::ToPrimitive;
use rust_decimal::Decimal;
use time::OffsetDateTime;
use tokio::sync::broadcast::Sender;
use uuid::Uuid;
use xxi_node::cfd::calculate_pnl;
use xxi_node::commons;
use xxi_node::commons::Direction;
Expand Down Expand Up @@ -48,6 +49,7 @@ pub struct TradeParams {
pub direction: Direction,
pub matching_fee: Amount,
pub trader_pnl: Option<SignedAmount>,
pub order_id: Option<Uuid>,
}

impl TradeParams {
Expand All @@ -68,6 +70,7 @@ impl TradeParams {
direction: trade_params.direction,
matching_fee: trade_params.order_matching_fee(),
trader_pnl,
order_id: Some(trade_params.filled_with.order_id),
}
}
}
Expand Down Expand Up @@ -519,6 +522,7 @@ impl DlcProtocolExecutor {
average_price: trade_params.average_price,
order_matching_fee,
trader_realized_pnl_sat: Some(trader_realized_pnl_sat),
order_id: trade_params.order_id,
};

db::trades::insert(conn, new_trade)?;
Expand Down Expand Up @@ -573,6 +577,7 @@ impl DlcProtocolExecutor {
average_price: trade_params.average_price,
order_matching_fee,
trader_realized_pnl_sat: None,
order_id: trade_params.order_id,
};

db::trades::insert(conn, new_trade)?;
Expand Down Expand Up @@ -619,6 +624,7 @@ impl DlcProtocolExecutor {
average_price: trade_params.average_price,
order_matching_fee,
trader_realized_pnl_sat: trade_params.trader_pnl.map(|pnl| pnl.to_sat()),
order_id: trade_params.order_id,
};

db::trades::insert(conn, new_trade)?;
Expand Down
8 changes: 4 additions & 4 deletions coordinator/src/funding_fee.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::decimal_from_f32;
use crate::message::OrderbookMessage;
use crate::message::TraderMessage;
use crate::FundingFee;
use anyhow::bail;
use anyhow::Context;
Expand Down Expand Up @@ -71,7 +71,7 @@ pub enum IndexPriceSource {
pub async fn generate_funding_fee_events_periodically(
scheduler: &JobScheduler,
pool: Pool<ConnectionManager<PgConnection>>,
auth_users_notifier: tokio::sync::mpsc::Sender<OrderbookMessage>,
auth_users_notifier: tokio::sync::mpsc::Sender<TraderMessage>,
schedule: String,
index_price_source: IndexPriceSource,
) -> Result<()> {
Expand Down Expand Up @@ -124,7 +124,7 @@ pub async fn generate_funding_fee_events_periodically(
fn generate_funding_fee_events(
pool: &Pool<ConnectionManager<PgConnection>>,
index_price_source: IndexPriceSource,
auth_users_notifier: tokio::sync::mpsc::Sender<OrderbookMessage>,
auth_users_notifier: tokio::sync::mpsc::Sender<TraderMessage>,
) -> Result<()> {
let mut conn = pool.get()?;

Expand Down Expand Up @@ -189,7 +189,7 @@ fn generate_funding_fee_events(
{
block_in_place(|| {
auth_users_notifier
.blocking_send(OrderbookMessage::TraderMessage {
.blocking_send(TraderMessage {
trader_id: position.trader,
message: Message::FundingFeeEvent(xxi_node::FundingFeeEvent {
contract_symbol,
Expand Down
Loading
Loading