Skip to content

Commit

Permalink
chore: Separate components into separate modules
Browse files Browse the repository at this point in the history
Before everything was handled by the trading module. Now the following responsibilities have been split up.

- Rollover: Moved to the coordinator component and responsible for proposing a rollover.
- Async Match: Moved into a dedicated component to check if an async match needs to be executed by the app.
- Notification: Responsible for the users and sending messages to them.

Note 1, the websocket communication is still in one component and not separated between coordinator and orderbook. It would have been correct to split that up as well, but the effort and additional complexity was IMHO not worth it.

Note 2, we have multiple commons crates, which are very much related to each ohter. I think we should combine all of those into a single one, to simplify things.
  • Loading branch information
holzeis committed Sep 19, 2023
1 parent 63932dc commit f3fa8db
Show file tree
Hide file tree
Showing 17 changed files with 517 additions and 446 deletions.
15 changes: 14 additions & 1 deletion coordinator/src/bin/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ use coordinator::node;
use coordinator::node::closed_positions;
use coordinator::node::connection;
use coordinator::node::expired_positions;
use coordinator::node::rollover;
use coordinator::node::storage::NodeStorage;
use coordinator::node::unrealized_pnl;
use coordinator::node::Node;
use coordinator::notification;
use coordinator::notification::NewUserMessage;
use coordinator::notification_service::NotificationService;
use coordinator::orderbook::async_match;
use coordinator::orderbook::trading;
use coordinator::routes::router;
use coordinator::run_migration;
Expand Down Expand Up @@ -192,8 +196,16 @@ async fn main() -> Result<()> {
}
});

let (tx_user_feed, _rx) = broadcast::channel::<NewUserMessage>(100);
let (tx_price_feed, _rx) = broadcast::channel(100);
let (_handle, trading_sender) = trading::start(pool.clone(), tx_price_feed.clone());

let (_handle, notifier) = notification::start(tx_user_feed.clone());

let (_handle, trading_sender) =
trading::start(pool.clone(), tx_price_feed.clone(), notifier.clone());

let _handle = async_match::monitor(pool.clone(), tx_user_feed.clone(), notifier.clone());
let _handle = rollover::monitor(pool.clone(), tx_user_feed.clone(), notifier);

tokio::spawn({
let node = node.clone();
Expand Down Expand Up @@ -235,6 +247,7 @@ async fn main() -> Result<()> {
NODE_ALIAS,
trading_sender,
tx_price_feed,
tx_user_feed,
);

let notification_service = NotificationService::new(opts.fcm_api_key);
Expand Down
3 changes: 1 addition & 2 deletions coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@ use diesel_migrations::EmbeddedMigrations;
use diesel_migrations::MigrationHarness;
use serde_json::json;

mod rollover;

pub mod admin;
pub mod cli;
pub mod db;
pub mod logger;
pub mod metrics;
pub mod node;
pub mod notification;
pub mod notification_service;
pub mod orderbook;
pub mod position;
Expand Down
1 change: 1 addition & 0 deletions coordinator/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub mod closed_positions;
pub mod connection;
pub mod expired_positions;
pub mod order_matching_fee;
pub mod rollover;
pub mod routing_fees;
pub mod storage;
pub mod unrealized_pnl;
Expand Down
7 changes: 3 additions & 4 deletions coordinator/src/node/expired_positions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::db;
use crate::node::Node;
use crate::orderbook;
use crate::orderbook::trading::NewOrderMessage;
use crate::orderbook::trading::TradingMessage;
use crate::position::models::Position;
use crate::position::models::PositionState;
use anyhow::Context;
Expand All @@ -22,7 +21,7 @@ use time::Duration;
use time::OffsetDateTime;
use tokio::sync::mpsc;

pub async fn close(node: Node, trading_sender: mpsc::Sender<TradingMessage>) -> Result<()> {
pub async fn close(node: Node, trading_sender: mpsc::Sender<NewOrderMessage>) -> Result<()> {
let mut conn = node.pool.get()?;

let positions = db::positions::Position::get_all_open_positions(&mut conn)
Expand Down Expand Up @@ -93,11 +92,11 @@ pub async fn close(node: Node, trading_sender: mpsc::Sender<TradingMessage>) ->
};

let (sender, mut receiver) = mpsc::channel::<Result<Order>>(1);
let message = TradingMessage::NewOrder(NewOrderMessage {
let message = NewOrderMessage {
new_order: new_order.clone(),
order_reason: OrderReason::Expired,
sender,
});
};

if let Err(e) = trading_sender.send(message).await {
tracing::error!(order_id=%new_order.id, trader_id=%new_order.trader_id, "Failed to submit new order for closing expired position. Error: {e:#}");
Expand Down
74 changes: 73 additions & 1 deletion coordinator/src/rollover.rs → coordinator/src/node/rollover.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,30 @@
use crate::db;
use crate::db::positions;
use crate::node::Node;
use crate::notification::NewUserMessage;
use crate::notification::Notification;
use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use bitcoin::hashes::hex::ToHex;
use bitcoin::secp256k1::PublicKey;
use bitcoin::XOnlyPublicKey;
use diesel::r2d2::ConnectionManager;
use diesel::r2d2::Pool;
use diesel::PgConnection;
use dlc_manager::contract::contract_input::ContractInput;
use dlc_manager::contract::contract_input::ContractInputInfo;
use dlc_manager::contract::contract_input::OracleInput;
use dlc_manager::contract::Contract;
use dlc_manager::contract::ContractDescriptor;
use dlc_manager::ChannelId;
use futures::future::RemoteHandle;
use futures::FutureExt;
use orderbook_commons::Message;
use std::str::FromStr;
use time::OffsetDateTime;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use trade::ContractSymbol;

#[derive(Debug, Clone)]
Expand All @@ -28,6 +39,67 @@ struct Rollover {
contract_tx_fee_rate: u64,
}

pub fn monitor(
pool: Pool<ConnectionManager<PgConnection>>,
tx_user_feed: broadcast::Sender<NewUserMessage>,
notifier: mpsc::Sender<Notification>,
) -> RemoteHandle<Result<()>> {
let mut user_feed = tx_user_feed.subscribe();
let (fut, remote_handle) = async move {
while let Ok(new_user_msg) = user_feed.recv().await {
tokio::spawn({
let mut conn = pool.get()?;
let notifier = notifier.clone();
async move {
if let Err(e) =
check_if_eligible_for_rollover(&mut conn, notifier, new_user_msg.new_user)
.await
{
tracing::error!("Failed to check if eligible for rollover. Error: {e:#}");
}
}
});
}
Ok(())
}
.remote_handle();

tokio::spawn(fut);

remote_handle
}

async fn check_if_eligible_for_rollover(
conn: &mut PgConnection,
notifier: mpsc::Sender<Notification>,
trader_id: PublicKey,
) -> Result<()> {
tracing::debug!(%trader_id, "Checking if the users positions is eligible for rollover");
if let Some(position) =
positions::Position::get_open_position_by_trader(conn, trader_id.to_string())?
{
if coordinator_commons::is_in_rollover_weekend(position.expiry_timestamp) {
let next_expiry = coordinator_commons::calculate_next_expiry(OffsetDateTime::now_utc());
if position.expiry_timestamp == next_expiry {
tracing::trace!(%trader_id, position_id=position.id, "Position has already been rolled over");
return Ok(());
}

tracing::debug!(%trader_id, position_id=position.id, "Proposing to rollover users position");

let message = Notification::Message {
trader_id,
message: Message::Rollover,
};
if let Err(e) = notifier.send(message).await {
tracing::debug!("Failed to notify trader. Error: {e:#}");
}
}
}

Ok(())
}

impl Rollover {
pub fn new(contract: Contract) -> Result<Self> {
let contract = match contract {
Expand Down Expand Up @@ -81,7 +153,7 @@ impl Rollover {

/// Calculates the maturity time based on the current expiry timestamp.
pub fn maturity_time(&self) -> OffsetDateTime {
orderbook_commons::get_expiry_timestamp(self.expiry_timestamp)
coordinator_commons::calculate_next_expiry(self.expiry_timestamp)
}
}

Expand Down
84 changes: 84 additions & 0 deletions coordinator/src/notification/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use anyhow::Result;
use bitcoin::secp256k1::PublicKey;
use futures::future::RemoteHandle;
use futures::FutureExt;
use orderbook_commons::Message;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;
use tokio::sync::broadcast;
use tokio::sync::mpsc;

/// This value is arbitrarily set to 100 and defines the message accepted in the notification
/// channel buffer.
const NOTIFICATION_BUFFER_SIZE: usize = 100;

// TODO(holzeis): This enum should be extended to allow for sending push notifications.
pub enum Notification {
Message {
trader_id: PublicKey,
message: Message,
},
}

#[derive(Clone)]
pub struct NewUserMessage {
pub new_user: PublicKey,
pub sender: mpsc::Sender<Message>,
}

pub fn start(
tx_user_feed: broadcast::Sender<NewUserMessage>,
) -> (RemoteHandle<Result<()>>, mpsc::Sender<Notification>) {
let (sender, mut receiver) = mpsc::channel::<Notification>(NOTIFICATION_BUFFER_SIZE);

let authenticated_users = Arc::new(RwLock::new(HashMap::new()));

tokio::task::spawn({
let traders = authenticated_users.clone();
async move {
let mut user_feed = tx_user_feed.subscribe();
while let Ok(new_user_msg) = user_feed.recv().await {
traders
.write()
.expect("RwLock to not be poisoned")
.insert(new_user_msg.new_user, new_user_msg.sender);
}
}
});

let (fut, remote_handle) = {
async move {
while let Some(notification) = receiver.recv().await {
match notification {
Notification::Message { trader_id, message } => {
tracing::info!(%trader_id, "Sending message: {message:?}");

let trader = {
let traders = authenticated_users
.read()
.expect("RwLock to not be poisoned");
traders.get(&trader_id).cloned()
};

match trader {
Some(sender) => {
if let Err(e) = sender.send(message).await {
tracing::warn!("Connection lost to trader {e:#}");
}
}
None => tracing::warn!(%trader_id, "Trader is not connected"),
}
}
}
}

Ok(())
}
.remote_handle()
};

tokio::spawn(fut);

(remote_handle, sender)
}
110 changes: 110 additions & 0 deletions coordinator/src/orderbook/async_match.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use crate::notification::NewUserMessage;
use crate::notification::Notification;
use crate::orderbook::db::matches;
use crate::orderbook::db::orders;
use anyhow::ensure;
use anyhow::Result;
use bitcoin::secp256k1::PublicKey;
use bitcoin::XOnlyPublicKey;
use diesel::r2d2::ConnectionManager;
use diesel::r2d2::Pool;
use diesel::PgConnection;
use futures::future::RemoteHandle;
use futures::FutureExt;
use orderbook_commons::FilledWith;
use orderbook_commons::Match;
use orderbook_commons::Matches;
use orderbook_commons::Message;
use orderbook_commons::OrderReason;
use orderbook_commons::OrderState;
use std::str::FromStr;
use time::OffsetDateTime;
use tokio::sync::broadcast;
use tokio::sync::mpsc;

pub fn monitor(
pool: Pool<ConnectionManager<PgConnection>>,
tx_user_feed: broadcast::Sender<NewUserMessage>,
notifier: mpsc::Sender<Notification>,
) -> RemoteHandle<Result<()>> {
let mut user_feed = tx_user_feed.subscribe();
let (fut, remote_handle) = async move {
while let Ok(new_user_msg) = user_feed.recv().await {
tokio::spawn({
let mut conn = pool.get()?;
let notifier = notifier.clone();
async move {
tracing::debug!(trader_id=%new_user_msg.new_user, "Checking if the user needs to be notified about pending matches");
if let Err(e) = process_pending_match(&mut conn, notifier, new_user_msg.new_user).await {
tracing::error!("Failed to process pending match. Error: {e:#}");
}
}
});
}
Ok(())
}.remote_handle();

tokio::spawn(fut);

remote_handle
}

/// Checks if there are any pending matches
async fn process_pending_match(
conn: &mut PgConnection,
notifier: mpsc::Sender<Notification>,
trader_id: PublicKey,
) -> Result<()> {
if let Some(order) = orders::get_by_trader_id_and_state(conn, trader_id, OrderState::Matched)? {
tracing::debug!(%trader_id, order_id=%order.id, "Notifying trader about pending match");

let matches = matches::get_matches_by_order_id(conn, order.id)?;
let filled_with = get_filled_with_from_matches(matches)?;

let message = match order.order_reason {
OrderReason::Manual => Message::Match(filled_with),
OrderReason::Expired => Message::AsyncMatch { order, filled_with },
};

let msg = Notification::Message { trader_id, message };
if let Err(e) = notifier.send(msg).await {
tracing::error!("Failed to send notification. Error: {e:#}");
}
}

Ok(())
}

fn get_filled_with_from_matches(matches: Vec<Matches>) -> Result<FilledWith> {
ensure!(
!matches.is_empty(),
"Need at least one matches record to construct a FilledWith"
);

let order_id = matches
.first()
.expect("to have at least one match")
.order_id;
let oracle_pk = XOnlyPublicKey::from_str(
"16f88cf7d21e6c0f46bcbc983a4e3b19726c6c98858cc31c83551a88fde171c0",
)
.expect("To be a valid pubkey");

let expiry_timestamp = coordinator_commons::calculate_next_expiry(OffsetDateTime::now_utc());

Ok(FilledWith {
order_id,
expiry_timestamp,
oracle_pk,
matches: matches
.iter()
.map(|m| Match {
id: m.id,
order_id: m.order_id,
quantity: m.quantity,
pubkey: m.match_trader_id,
execution_price: m.execution_price,
})
.collect(),
})
}
Loading

0 comments on commit f3fa8db

Please sign in to comment.