From f3fa8db5d7b31cc2466867e8845ca63ca8596f66 Mon Sep 17 00:00:00 2001 From: Richard Holzeis Date: Sun, 17 Sep 2023 12:03:46 +0200 Subject: [PATCH] chore: Separate components into separate modules Before everything was handled by the trading module. Now the following responsibilities have been split up. - Rollover: Moved to the coordinator component and responsible for proposing a rollover. - Async Match: Moved into a dedicated component to check if an async match needs to be executed by the app. - Notification: Responsible for the users and sending messages to them. Note 1, the websocket communication is still in one component and not separated between coordinator and orderbook. It would have been correct to split that up as well, but the effort and additional complexity was IMHO not worth it. Note 2, we have multiple commons crates, which are very much related to each ohter. I think we should combine all of those into a single one, to simplify things. --- coordinator/src/bin/coordinator.rs | 15 +- coordinator/src/lib.rs | 3 +- coordinator/src/node.rs | 1 + coordinator/src/node/expired_positions.rs | 7 +- coordinator/src/{ => node}/rollover.rs | 74 +++++- coordinator/src/notification/mod.rs | 84 +++++++ coordinator/src/orderbook/async_match.rs | 110 +++++++++ coordinator/src/orderbook/mod.rs | 1 + coordinator/src/orderbook/routes.rs | 13 +- coordinator/src/orderbook/trading.rs | 259 +++----------------- coordinator/src/orderbook/websocket.rs | 31 ++- coordinator/src/routes.rs | 16 +- crates/coordinator-commons/src/lib.rs | 142 +++++++++++ crates/orderbook-commons/src/lib.rs | 183 +------------- crates/tests-e2e/tests/rollover_position.rs | 2 +- mobile/native/src/api.rs | 4 +- mobile/native/src/orderbook.rs | 18 +- 17 files changed, 517 insertions(+), 446 deletions(-) rename coordinator/src/{ => node}/rollover.rs (83%) create mode 100644 coordinator/src/notification/mod.rs create mode 100644 coordinator/src/orderbook/async_match.rs diff --git a/coordinator/src/bin/coordinator.rs b/coordinator/src/bin/coordinator.rs index 68cce2c7f..a2e290ad5 100644 --- a/coordinator/src/bin/coordinator.rs +++ b/coordinator/src/bin/coordinator.rs @@ -8,10 +8,14 @@ use coordinator::node; use coordinator::node::closed_positions; use coordinator::node::connection; use coordinator::node::expired_positions; +use coordinator::node::rollover; use coordinator::node::storage::NodeStorage; use coordinator::node::unrealized_pnl; use coordinator::node::Node; +use coordinator::notification; +use coordinator::notification::NewUserMessage; use coordinator::notification_service::NotificationService; +use coordinator::orderbook::async_match; use coordinator::orderbook::trading; use coordinator::routes::router; use coordinator::run_migration; @@ -192,8 +196,16 @@ async fn main() -> Result<()> { } }); + let (tx_user_feed, _rx) = broadcast::channel::(100); let (tx_price_feed, _rx) = broadcast::channel(100); - let (_handle, trading_sender) = trading::start(pool.clone(), tx_price_feed.clone()); + + let (_handle, notifier) = notification::start(tx_user_feed.clone()); + + let (_handle, trading_sender) = + trading::start(pool.clone(), tx_price_feed.clone(), notifier.clone()); + + let _handle = async_match::monitor(pool.clone(), tx_user_feed.clone(), notifier.clone()); + let _handle = rollover::monitor(pool.clone(), tx_user_feed.clone(), notifier); tokio::spawn({ let node = node.clone(); @@ -235,6 +247,7 @@ async fn main() -> Result<()> { NODE_ALIAS, trading_sender, tx_price_feed, + tx_user_feed, ); let notification_service = NotificationService::new(opts.fcm_api_key); diff --git a/coordinator/src/lib.rs b/coordinator/src/lib.rs index 7e652db59..1fad8a83d 100644 --- a/coordinator/src/lib.rs +++ b/coordinator/src/lib.rs @@ -8,14 +8,13 @@ use diesel_migrations::EmbeddedMigrations; use diesel_migrations::MigrationHarness; use serde_json::json; -mod rollover; - pub mod admin; pub mod cli; pub mod db; pub mod logger; pub mod metrics; pub mod node; +pub mod notification; pub mod notification_service; pub mod orderbook; pub mod position; diff --git a/coordinator/src/node.rs b/coordinator/src/node.rs index bff81487d..6fd82c905 100644 --- a/coordinator/src/node.rs +++ b/coordinator/src/node.rs @@ -62,6 +62,7 @@ pub mod closed_positions; pub mod connection; pub mod expired_positions; pub mod order_matching_fee; +pub mod rollover; pub mod routing_fees; pub mod storage; pub mod unrealized_pnl; diff --git a/coordinator/src/node/expired_positions.rs b/coordinator/src/node/expired_positions.rs index 56762eb0d..e669e1fbc 100644 --- a/coordinator/src/node/expired_positions.rs +++ b/coordinator/src/node/expired_positions.rs @@ -2,7 +2,6 @@ use crate::db; use crate::node::Node; use crate::orderbook; use crate::orderbook::trading::NewOrderMessage; -use crate::orderbook::trading::TradingMessage; use crate::position::models::Position; use crate::position::models::PositionState; use anyhow::Context; @@ -22,7 +21,7 @@ use time::Duration; use time::OffsetDateTime; use tokio::sync::mpsc; -pub async fn close(node: Node, trading_sender: mpsc::Sender) -> Result<()> { +pub async fn close(node: Node, trading_sender: mpsc::Sender) -> Result<()> { let mut conn = node.pool.get()?; let positions = db::positions::Position::get_all_open_positions(&mut conn) @@ -93,11 +92,11 @@ pub async fn close(node: Node, trading_sender: mpsc::Sender) -> }; let (sender, mut receiver) = mpsc::channel::>(1); - let message = TradingMessage::NewOrder(NewOrderMessage { + let message = NewOrderMessage { new_order: new_order.clone(), order_reason: OrderReason::Expired, sender, - }); + }; if let Err(e) = trading_sender.send(message).await { tracing::error!(order_id=%new_order.id, trader_id=%new_order.trader_id, "Failed to submit new order for closing expired position. Error: {e:#}"); diff --git a/coordinator/src/rollover.rs b/coordinator/src/node/rollover.rs similarity index 83% rename from coordinator/src/rollover.rs rename to coordinator/src/node/rollover.rs index a95e6322b..42d1b2415 100644 --- a/coordinator/src/rollover.rs +++ b/coordinator/src/node/rollover.rs @@ -1,19 +1,30 @@ use crate::db; +use crate::db::positions; use crate::node::Node; +use crate::notification::NewUserMessage; +use crate::notification::Notification; use anyhow::bail; use anyhow::Context; use anyhow::Result; use bitcoin::hashes::hex::ToHex; use bitcoin::secp256k1::PublicKey; use bitcoin::XOnlyPublicKey; +use diesel::r2d2::ConnectionManager; +use diesel::r2d2::Pool; +use diesel::PgConnection; use dlc_manager::contract::contract_input::ContractInput; use dlc_manager::contract::contract_input::ContractInputInfo; use dlc_manager::contract::contract_input::OracleInput; use dlc_manager::contract::Contract; use dlc_manager::contract::ContractDescriptor; use dlc_manager::ChannelId; +use futures::future::RemoteHandle; +use futures::FutureExt; +use orderbook_commons::Message; use std::str::FromStr; use time::OffsetDateTime; +use tokio::sync::broadcast; +use tokio::sync::mpsc; use trade::ContractSymbol; #[derive(Debug, Clone)] @@ -28,6 +39,67 @@ struct Rollover { contract_tx_fee_rate: u64, } +pub fn monitor( + pool: Pool>, + tx_user_feed: broadcast::Sender, + notifier: mpsc::Sender, +) -> RemoteHandle> { + let mut user_feed = tx_user_feed.subscribe(); + let (fut, remote_handle) = async move { + while let Ok(new_user_msg) = user_feed.recv().await { + tokio::spawn({ + let mut conn = pool.get()?; + let notifier = notifier.clone(); + async move { + if let Err(e) = + check_if_eligible_for_rollover(&mut conn, notifier, new_user_msg.new_user) + .await + { + tracing::error!("Failed to check if eligible for rollover. Error: {e:#}"); + } + } + }); + } + Ok(()) + } + .remote_handle(); + + tokio::spawn(fut); + + remote_handle +} + +async fn check_if_eligible_for_rollover( + conn: &mut PgConnection, + notifier: mpsc::Sender, + trader_id: PublicKey, +) -> Result<()> { + tracing::debug!(%trader_id, "Checking if the users positions is eligible for rollover"); + if let Some(position) = + positions::Position::get_open_position_by_trader(conn, trader_id.to_string())? + { + if coordinator_commons::is_in_rollover_weekend(position.expiry_timestamp) { + let next_expiry = coordinator_commons::calculate_next_expiry(OffsetDateTime::now_utc()); + if position.expiry_timestamp == next_expiry { + tracing::trace!(%trader_id, position_id=position.id, "Position has already been rolled over"); + return Ok(()); + } + + tracing::debug!(%trader_id, position_id=position.id, "Proposing to rollover users position"); + + let message = Notification::Message { + trader_id, + message: Message::Rollover, + }; + if let Err(e) = notifier.send(message).await { + tracing::debug!("Failed to notify trader. Error: {e:#}"); + } + } + } + + Ok(()) +} + impl Rollover { pub fn new(contract: Contract) -> Result { let contract = match contract { @@ -81,7 +153,7 @@ impl Rollover { /// Calculates the maturity time based on the current expiry timestamp. pub fn maturity_time(&self) -> OffsetDateTime { - orderbook_commons::get_expiry_timestamp(self.expiry_timestamp) + coordinator_commons::calculate_next_expiry(self.expiry_timestamp) } } diff --git a/coordinator/src/notification/mod.rs b/coordinator/src/notification/mod.rs new file mode 100644 index 000000000..2e5092b4d --- /dev/null +++ b/coordinator/src/notification/mod.rs @@ -0,0 +1,84 @@ +use anyhow::Result; +use bitcoin::secp256k1::PublicKey; +use futures::future::RemoteHandle; +use futures::FutureExt; +use orderbook_commons::Message; +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::RwLock; +use tokio::sync::broadcast; +use tokio::sync::mpsc; + +/// This value is arbitrarily set to 100 and defines the message accepted in the notification +/// channel buffer. +const NOTIFICATION_BUFFER_SIZE: usize = 100; + +// TODO(holzeis): This enum should be extended to allow for sending push notifications. +pub enum Notification { + Message { + trader_id: PublicKey, + message: Message, + }, +} + +#[derive(Clone)] +pub struct NewUserMessage { + pub new_user: PublicKey, + pub sender: mpsc::Sender, +} + +pub fn start( + tx_user_feed: broadcast::Sender, +) -> (RemoteHandle>, mpsc::Sender) { + let (sender, mut receiver) = mpsc::channel::(NOTIFICATION_BUFFER_SIZE); + + let authenticated_users = Arc::new(RwLock::new(HashMap::new())); + + tokio::task::spawn({ + let traders = authenticated_users.clone(); + async move { + let mut user_feed = tx_user_feed.subscribe(); + while let Ok(new_user_msg) = user_feed.recv().await { + traders + .write() + .expect("RwLock to not be poisoned") + .insert(new_user_msg.new_user, new_user_msg.sender); + } + } + }); + + let (fut, remote_handle) = { + async move { + while let Some(notification) = receiver.recv().await { + match notification { + Notification::Message { trader_id, message } => { + tracing::info!(%trader_id, "Sending message: {message:?}"); + + let trader = { + let traders = authenticated_users + .read() + .expect("RwLock to not be poisoned"); + traders.get(&trader_id).cloned() + }; + + match trader { + Some(sender) => { + if let Err(e) = sender.send(message).await { + tracing::warn!("Connection lost to trader {e:#}"); + } + } + None => tracing::warn!(%trader_id, "Trader is not connected"), + } + } + } + } + + Ok(()) + } + .remote_handle() + }; + + tokio::spawn(fut); + + (remote_handle, sender) +} diff --git a/coordinator/src/orderbook/async_match.rs b/coordinator/src/orderbook/async_match.rs new file mode 100644 index 000000000..9a04e1d54 --- /dev/null +++ b/coordinator/src/orderbook/async_match.rs @@ -0,0 +1,110 @@ +use crate::notification::NewUserMessage; +use crate::notification::Notification; +use crate::orderbook::db::matches; +use crate::orderbook::db::orders; +use anyhow::ensure; +use anyhow::Result; +use bitcoin::secp256k1::PublicKey; +use bitcoin::XOnlyPublicKey; +use diesel::r2d2::ConnectionManager; +use diesel::r2d2::Pool; +use diesel::PgConnection; +use futures::future::RemoteHandle; +use futures::FutureExt; +use orderbook_commons::FilledWith; +use orderbook_commons::Match; +use orderbook_commons::Matches; +use orderbook_commons::Message; +use orderbook_commons::OrderReason; +use orderbook_commons::OrderState; +use std::str::FromStr; +use time::OffsetDateTime; +use tokio::sync::broadcast; +use tokio::sync::mpsc; + +pub fn monitor( + pool: Pool>, + tx_user_feed: broadcast::Sender, + notifier: mpsc::Sender, +) -> RemoteHandle> { + let mut user_feed = tx_user_feed.subscribe(); + let (fut, remote_handle) = async move { + while let Ok(new_user_msg) = user_feed.recv().await { + tokio::spawn({ + let mut conn = pool.get()?; + let notifier = notifier.clone(); + async move { + tracing::debug!(trader_id=%new_user_msg.new_user, "Checking if the user needs to be notified about pending matches"); + if let Err(e) = process_pending_match(&mut conn, notifier, new_user_msg.new_user).await { + tracing::error!("Failed to process pending match. Error: {e:#}"); + } + } + }); + } + Ok(()) + }.remote_handle(); + + tokio::spawn(fut); + + remote_handle +} + +/// Checks if there are any pending matches +async fn process_pending_match( + conn: &mut PgConnection, + notifier: mpsc::Sender, + trader_id: PublicKey, +) -> Result<()> { + if let Some(order) = orders::get_by_trader_id_and_state(conn, trader_id, OrderState::Matched)? { + tracing::debug!(%trader_id, order_id=%order.id, "Notifying trader about pending match"); + + let matches = matches::get_matches_by_order_id(conn, order.id)?; + let filled_with = get_filled_with_from_matches(matches)?; + + let message = match order.order_reason { + OrderReason::Manual => Message::Match(filled_with), + OrderReason::Expired => Message::AsyncMatch { order, filled_with }, + }; + + let msg = Notification::Message { trader_id, message }; + if let Err(e) = notifier.send(msg).await { + tracing::error!("Failed to send notification. Error: {e:#}"); + } + } + + Ok(()) +} + +fn get_filled_with_from_matches(matches: Vec) -> Result { + ensure!( + !matches.is_empty(), + "Need at least one matches record to construct a FilledWith" + ); + + let order_id = matches + .first() + .expect("to have at least one match") + .order_id; + let oracle_pk = XOnlyPublicKey::from_str( + "16f88cf7d21e6c0f46bcbc983a4e3b19726c6c98858cc31c83551a88fde171c0", + ) + .expect("To be a valid pubkey"); + + let expiry_timestamp = coordinator_commons::calculate_next_expiry(OffsetDateTime::now_utc()); + + Ok(FilledWith { + order_id, + expiry_timestamp, + oracle_pk, + matches: matches + .iter() + .map(|m| Match { + id: m.id, + order_id: m.order_id, + quantity: m.quantity, + pubkey: m.match_trader_id, + execution_price: m.execution_price, + }) + .collect(), + }) +} diff --git a/coordinator/src/orderbook/mod.rs b/coordinator/src/orderbook/mod.rs index 7ab6837db..79026b3e6 100644 --- a/coordinator/src/orderbook/mod.rs +++ b/coordinator/src/orderbook/mod.rs @@ -1,3 +1,4 @@ +pub mod async_match; pub mod db; pub mod routes; pub mod trading; diff --git a/coordinator/src/orderbook/routes.rs b/coordinator/src/orderbook/routes.rs index 3e90247a6..83f636af5 100644 --- a/coordinator/src/orderbook/routes.rs +++ b/coordinator/src/orderbook/routes.rs @@ -1,7 +1,6 @@ use crate::orderbook; use crate::orderbook::trading::NewOrderMessage; use crate::orderbook::trading::TradingError; -use crate::orderbook::trading::TradingMessage; use crate::orderbook::websocket::websocket_connection; use crate::routes::AppState; use crate::AppError; @@ -16,10 +15,10 @@ use axum::Json; use diesel::r2d2::ConnectionManager; use diesel::r2d2::PooledConnection; use diesel::PgConnection; +use orderbook_commons::Message; use orderbook_commons::NewOrder; use orderbook_commons::Order; use orderbook_commons::OrderReason; -use orderbook_commons::OrderbookMsg; use serde::de; use serde::Deserialize; use serde::Deserializer; @@ -94,11 +93,11 @@ pub async fn post_order( ) -> Result, AppError> { let (sender, mut receiver) = mpsc::channel::>(1); - let message = TradingMessage::NewOrder(NewOrderMessage { + let message = NewOrderMessage { new_order, order_reason: OrderReason::Manual, sender, - }); + }; state.trading_sender.send(message).await.map_err(|e| { AppError::InternalServerError(format!("Failed to send new order message: {e:#}")) })?; @@ -118,7 +117,7 @@ pub async fn post_order( Ok(Json(order)) } -fn update_pricefeed(pricefeed_msg: OrderbookMsg, sender: Sender) { +fn update_pricefeed(pricefeed_msg: Message, sender: Sender) { match sender.send(pricefeed_msg) { Ok(_) => { tracing::trace!("Pricefeed updated") @@ -143,7 +142,7 @@ pub async fn put_order( let order = orderbook::db::orders::set_is_taken(&mut conn, order_id, updated_order.taken) .map_err(|e| AppError::InternalServerError(format!("Failed to update order: {e:#}")))?; let sender = state.tx_price_feed.clone(); - update_pricefeed(OrderbookMsg::Update(order.clone()), sender); + update_pricefeed(Message::Update(order.clone()), sender); Ok(Json(order)) } @@ -157,7 +156,7 @@ pub async fn delete_order( .map_err(|e| AppError::InternalServerError(format!("Failed to delete order: {e:#}")))?; if deleted > 0 { let sender = state.tx_price_feed.clone(); - update_pricefeed(OrderbookMsg::DeleteOrder(order_id), sender); + update_pricefeed(Message::DeleteOrder(order_id), sender); } Ok(Json(deleted)) diff --git a/coordinator/src/orderbook/trading.rs b/coordinator/src/orderbook/trading.rs index 5a573a35c..a54ce06a2 100644 --- a/coordinator/src/orderbook/trading.rs +++ b/coordinator/src/orderbook/trading.rs @@ -1,4 +1,4 @@ -use crate::db::positions; +use crate::notification::Notification; use crate::orderbook::db::matches; use crate::orderbook::db::orders; use anyhow::anyhow; @@ -15,15 +15,14 @@ use futures::future::RemoteHandle; use futures::FutureExt; use orderbook_commons::FilledWith; use orderbook_commons::Match; +use orderbook_commons::Message; use orderbook_commons::NewOrder; use orderbook_commons::Order; use orderbook_commons::OrderReason; use orderbook_commons::OrderState; use orderbook_commons::OrderType; -use orderbook_commons::OrderbookMsg; use rust_decimal::Decimal; use std::cmp::Ordering; -use std::collections::HashMap; use std::str::FromStr; use thiserror::Error; use time::OffsetDateTime; @@ -32,26 +31,15 @@ use tokio::sync::mpsc; use trade::Direction; use uuid::Uuid; -/// This value is arbitrarily set to 100 and defines the message accepted in the trading messages -/// channel buffer. -const TRADING_MESSAGES_BUFFER_SIZE: usize = 100; - -pub enum TradingMessage { - NewOrder(NewOrderMessage), - NewUser(NewUserMessage), -} - +/// This value is arbitrarily set to 100 and defines the number of new order messages buffered +/// in the channel. +const NEW_ORDERS_BUFFER_SIZE: usize = 100; pub struct NewOrderMessage { pub new_order: NewOrder, pub order_reason: OrderReason, pub sender: mpsc::Sender>, } -pub struct NewUserMessage { - pub new_user: PublicKey, - pub sender: mpsc::Sender, -} - #[derive(Error, Debug, PartialEq)] pub enum TradingError { #[error("Invalid order: {0}")] @@ -95,52 +83,37 @@ impl From<&TradeParams> for TraderMatchParams { /// the trading task by spawning a new tokio task that is handling messages pub fn start( pool: Pool>, - tx_price_feed: broadcast::Sender, -) -> (RemoteHandle>, mpsc::Sender) { - let (sender, mut receiver) = mpsc::channel::(TRADING_MESSAGES_BUFFER_SIZE); - - let mut authenticated_users = HashMap::new(); + tx_price_feed: broadcast::Sender, + notifier: mpsc::Sender, +) -> (RemoteHandle>, mpsc::Sender) { + let (sender, mut receiver) = mpsc::channel::(NEW_ORDERS_BUFFER_SIZE); let (fut, remote_handle) = async move { - - while let Some(trading_message) = receiver.recv().await { - match trading_message { - TradingMessage::NewOrder(new_order_msg) => { - tokio::spawn({ - let mut conn = pool.get()?; - let authenticated_users = authenticated_users.clone(); - let tx_price_feed = tx_price_feed.clone(); - async move { - let new_order = new_order_msg.new_order; - let result = process_new_order(&mut conn, tx_price_feed, new_order, new_order_msg.order_reason, &authenticated_users) - .await; - if let Err(e) = new_order_msg.sender.send(result).await { - tracing::error!("Failed to send new order message! Error: {e:#}"); - } - } - }); - } - TradingMessage::NewUser(new_user_msg) => { - tracing::info!(trader_id=%new_user_msg.new_user, "User logged in to 10101"); - - authenticated_users.insert(new_user_msg.new_user, new_user_msg.sender); - - tokio::spawn({ - let mut conn = pool.get()?; - let authenticated_users = authenticated_users.clone(); - async move { - tracing::debug!(trader_id=%new_user_msg.new_user, "Checking if the user needs to be notified about pending matches"); - if let Err(e) = process_pending_actions(&mut conn, &authenticated_users, new_user_msg.new_user).await { - tracing::error!("Failed to process pending match. Error: {e:#}"); - } - } - }); + while let Some(new_order_msg) = receiver.recv().await { + tokio::spawn({ + let mut conn = pool.get()?; + let tx_price_feed = tx_price_feed.clone(); + let notifier = notifier.clone(); + async move { + let new_order = new_order_msg.new_order; + let result = process_new_order( + &mut conn, + notifier, + tx_price_feed, + new_order, + new_order_msg.order_reason, + ) + .await; + if let Err(e) = new_order_msg.sender.send(result).await { + tracing::error!("Failed to send new order message! Error: {e:#}"); + } } - } + }); } Ok(()) - }.remote_handle(); + } + .remote_handle(); tokio::spawn(fut); @@ -156,10 +129,10 @@ pub fn start( /// Market order: find match and notify traders async fn process_new_order( conn: &mut PgConnection, - tx_price_feed: broadcast::Sender, + notifier: mpsc::Sender, + tx_price_feed: broadcast::Sender, new_order: NewOrder, order_reason: OrderReason, - authenticated_users: &HashMap>, ) -> Result { tracing::info!(trader_id=%new_order.trader_id, "Received a new {:?} order", new_order.order_type); @@ -181,7 +154,7 @@ async fn process_new_order( if new_order.order_type == OrderType::Limit { // we only tell everyone about new limit orders tx_price_feed - .send(OrderbookMsg::NewOrder(order.clone())) + .send(Message::NewOrder(order.clone())) .map_err(|error| anyhow!("Could not update price feed due to '{error}'"))?; } else { // reject new order if there is already a matched order waiting for execution. @@ -232,14 +205,15 @@ async fn process_new_order( tracing::info!(%trader_id, order_id, "Notifying trader about match"); let message = match &order.order_reason { - OrderReason::Manual => OrderbookMsg::Match(match_param.filled_with.clone()), - OrderReason::Expired => OrderbookMsg::AsyncMatch { + OrderReason::Manual => Message::Match(match_param.filled_with.clone()), + OrderReason::Expired => Message::AsyncMatch { order: order.clone(), filled_with: match_param.filled_with.clone(), }, }; - let order_state = match notify_trader(trader_id, message, authenticated_users).await { + let msg = Notification::Message { trader_id, message }; + let order_state = match notifier.send(msg).await { Ok(()) => { tracing::debug!(%trader_id, order_id, "Successfully notified trader"); OrderState::Matched @@ -274,54 +248,6 @@ async fn process_new_order( Ok(order) } -/// Checks if there are any immediate actions to be processed by the app -/// - Pending Match -/// - Rollover -async fn process_pending_actions( - conn: &mut PgConnection, - authenticated_users: &HashMap>, - trader_id: PublicKey, -) -> Result<()> { - if let Some(order) = orders::get_by_trader_id_and_state(conn, trader_id, OrderState::Matched)? { - tracing::debug!(%trader_id, order_id=%order.id, "Notifying trader about pending match"); - - let matches = matches::get_matches_by_order_id(conn, order.id)?; - let filled_with = orderbook_commons::get_filled_with_from_matches(matches)?; - - let message = match order.order_reason { - OrderReason::Manual => OrderbookMsg::Match(filled_with), - OrderReason::Expired => OrderbookMsg::AsyncMatch { order, filled_with }, - }; - - if let Err(e) = notify_trader(trader_id, message, authenticated_users).await { - tracing::warn!("Failed to notify trader. Error: {e:#}"); - } - } else if let Some(position) = - positions::Position::get_open_position_by_trader(conn, trader_id.to_string())? - { - tracing::debug!(%trader_id, position_id=position.id, "Checking if the users positions is eligible for rollover"); - - if orderbook_commons::is_in_rollover_weekend(position.expiry_timestamp) { - let next_expiry = orderbook_commons::get_expiry_timestamp(OffsetDateTime::now_utc()); - if position.expiry_timestamp == next_expiry { - tracing::trace!(%trader_id, position_id=position.id, "Position has already been rolled over"); - return Ok(()); - } - - tracing::debug!(%trader_id, position_id=position.id, "Proposing to rollover users position"); - - let message = OrderbookMsg::Rollover; - if let Err(e) = notify_trader(trader_id, message, authenticated_users).await { - // if that happens, it's most likely that the user already closed its app again - // and we can simply wait for the user to come online again to retry. - tracing::debug!("Failed to notify trader. Error: {e:#}"); - } - } - } - - Ok(()) -} - /// Matches a provided market order with limit orders from the DB /// /// If the order is a long order, we return the short orders sorted by price (highest first) @@ -368,7 +294,7 @@ fn match_order( return Ok(None); } - let expiry_timestamp = orderbook_commons::get_expiry_timestamp(OffsetDateTime::now_utc()); + let expiry_timestamp = coordinator_commons::calculate_next_expiry(OffsetDateTime::now_utc()); // For now we hardcode the oracle pubkey here let oracle_pk = XOnlyPublicKey::from_str( @@ -453,45 +379,20 @@ fn sort_orders(mut orders: Vec, is_long: bool) -> Vec { orders } -async fn notify_trader( - trader_id: PublicKey, - message: OrderbookMsg, - traders: &HashMap>, -) -> Result<()> { - match traders.get(&trader_id) { - None => bail!("Trader is not connected"), - Some(sender) => sender - .send(message) - .await - .map_err(|err| anyhow!("Connection lost to trader {err:#}")), - } -} - #[cfg(test)] pub mod tests { use crate::orderbook::trading::match_order; - use crate::orderbook::trading::notify_trader; use crate::orderbook::trading::sort_orders; - use crate::orderbook::trading::MatchParams; - use crate::orderbook::trading::TraderMatchParams; use bitcoin::secp256k1::PublicKey; - use bitcoin::secp256k1::SecretKey; - use bitcoin::secp256k1::SECP256K1; - use bitcoin::XOnlyPublicKey; - use orderbook_commons::FilledWith; - use orderbook_commons::Match; use orderbook_commons::Order; use orderbook_commons::OrderReason; use orderbook_commons::OrderState; use orderbook_commons::OrderType; - use orderbook_commons::OrderbookMsg; use rust_decimal::Decimal; use rust_decimal_macros::dec; - use std::collections::HashMap; use std::str::FromStr; use time::Duration; use time::OffsetDateTime; - use tokio::sync::mpsc; use trade::ContractSymbol; use trade::Direction; use uuid::Uuid; @@ -788,88 +689,4 @@ pub mod tests { assert!(matched_orders.is_none()); } - - #[tokio::test] - async fn given_matches_will_notify_all_traders() { - let trader_key = SecretKey::from_slice(&b"Me noob, don't lose money pleazz"[..]).unwrap(); - let trader_pub_key = trader_key.public_key(SECP256K1); - let maker_key = SecretKey::from_slice(&b"I am a king trader mate, right!?"[..]).unwrap(); - let maker_pub_key = maker_key.public_key(SECP256K1); - let trader_order_id = Uuid::new_v4(); - let maker_order_id = Uuid::new_v4(); - let oracle_pk = XOnlyPublicKey::from_str( - "16f88cf7d21e6c0f46bcbc983a4e3b19726c6c98858cc31c83551a88fde171c0", - ) - .unwrap(); - let maker_order_price = dec!(20_000); - let expiry_timestamp = OffsetDateTime::now_utc(); - let matched_orders = MatchParams { - taker_match: TraderMatchParams { - trader_id: trader_pub_key, - filled_with: FilledWith { - order_id: trader_order_id, - expiry_timestamp, - oracle_pk, - matches: vec![Match { - id: Uuid::new_v4(), - order_id: maker_order_id, - quantity: dec!(100), - pubkey: maker_pub_key, - execution_price: maker_order_price, - }], - }, - }, - makers_matches: vec![TraderMatchParams { - trader_id: maker_pub_key, - filled_with: FilledWith { - order_id: maker_order_id, - expiry_timestamp, - oracle_pk, - matches: vec![Match { - id: Uuid::new_v4(), - order_id: trader_order_id, - quantity: dec!(100), - pubkey: trader_pub_key, - execution_price: maker_order_price, - }], - }, - }], - }; - let mut traders = HashMap::new(); - let (maker_sender, mut maker_receiver) = mpsc::channel::(1); - let (trader_sender, mut trader_receiver) = mpsc::channel::(1); - traders.insert(maker_pub_key, maker_sender); - traders.insert(trader_pub_key, trader_sender); - - for match_param in matched_orders.matches() { - notify_trader( - match_param.trader_id, - OrderbookMsg::Match(match_param.filled_with.clone()), - &traders, - ) - .await - .unwrap(); - } - - let maker_msg = maker_receiver.recv().await.unwrap(); - let trader_msg = trader_receiver.recv().await.unwrap(); - - match maker_msg { - OrderbookMsg::Match(msg) => { - assert_eq!(msg.order_id, maker_order_id) - } - _ => { - panic!("Invalid message received") - } - } - - match trader_msg { - OrderbookMsg::Match(msg) => { - assert_eq!(msg.order_id, trader_order_id) - } - _ => { - panic!("Invalid message received") - } - } - } } diff --git a/coordinator/src/orderbook/websocket.rs b/coordinator/src/orderbook/websocket.rs index 1147b1221..52fe4efbd 100644 --- a/coordinator/src/orderbook/websocket.rs +++ b/coordinator/src/orderbook/websocket.rs @@ -1,13 +1,12 @@ +use crate::notification::NewUserMessage; use crate::orderbook; -use crate::orderbook::trading::NewUserMessage; -use crate::orderbook::trading::TradingMessage; use crate::routes::AppState; -use axum::extract::ws::Message; +use axum::extract::ws::Message as WebsocketMessage; use axum::extract::ws::WebSocket; use futures::SinkExt; use futures::StreamExt; use orderbook_commons::create_sign_message; -use orderbook_commons::OrderbookMsg; +use orderbook_commons::Message; use orderbook_commons::OrderbookRequest; use orderbook_commons::Signature; use std::sync::Arc; @@ -25,7 +24,7 @@ pub async fn websocket_connection(stream: WebSocket, state: Arc) { // We subscribe *before* sending the "joined" message, so that we will also // display it to our client. - let mut rx = state.tx_price_feed.subscribe(); + let mut price_feed = state.tx_price_feed.subscribe(); let mut conn = match state.pool.clone().get() { Ok(conn) => conn, @@ -44,11 +43,11 @@ pub async fn websocket_connection(stream: WebSocket, state: Arc) { }; // Now send the "all orders" to the new client. - if let Ok(msg) = serde_json::to_string(&OrderbookMsg::AllOrders(orders)) { - let _ = sender.send(Message::Text(msg)).await; + if let Ok(msg) = serde_json::to_string(&Message::AllOrders(orders)) { + let _ = sender.send(WebsocketMessage::Text(msg)).await; } - let (local_sender, mut local_receiver) = mpsc::channel::(100); + let (local_sender, mut local_receiver) = mpsc::channel::(100); let mut local_recv_task = tokio::spawn(async move { while let Some(local_msg) = local_receiver.recv().await { @@ -56,7 +55,7 @@ pub async fn websocket_connection(stream: WebSocket, state: Arc) { Ok(msg) => { if let Err(err) = tokio::time::timeout( WEBSOCKET_SEND_TIMEOUT, - sender.send(Message::Text(msg.clone())), + sender.send(WebsocketMessage::Text(msg.clone())), ) .await { @@ -76,7 +75,7 @@ pub async fn websocket_connection(stream: WebSocket, state: Arc) { let mut send_task = { let local_sender = local_sender.clone(); tokio::spawn(async move { - while let Ok(st) = rx.recv().await { + while let Ok(st) = price_feed.recv().await { if let Err(error) = local_sender.send(st).await { tracing::error!("Could not send message {error:#}"); return; @@ -88,28 +87,28 @@ pub async fn websocket_connection(stream: WebSocket, state: Arc) { // Spawn a task that takes messages from the websocket let local_sender = local_sender.clone(); let mut recv_task = tokio::spawn(async move { - while let Some(Ok(Message::Text(text))) = receiver.next().await { + while let Some(Ok(WebsocketMessage::Text(text))) = receiver.next().await { match serde_json::from_str(text.as_str()) { Ok(OrderbookRequest::Authenticate(Signature { signature, pubkey })) => { let msg = create_sign_message(); match signature.verify(&msg, &pubkey) { Ok(_) => { - if let Err(e) = local_sender.send(OrderbookMsg::Authenticated).await { + if let Err(e) = local_sender.send(Message::Authenticated).await { tracing::error!("Could not respond to user {e:#}"); return; } - let message = TradingMessage::NewUser(NewUserMessage { + let message = NewUserMessage { new_user: pubkey, sender: local_sender.clone(), - }); - if let Err(e) = state.trading_sender.send(message).await { + }; + if let Err(e) = state.tx_user_feed.send(message) { tracing::error!("Could not send trading message. Error: {e:#}"); } } Err(err) => { if let Err(er) = local_sender - .send(OrderbookMsg::InvalidAuthentication(format!( + .send(Message::InvalidAuthentication(format!( "Could not authenticate {err:#}" ))) .await diff --git a/coordinator/src/routes.rs b/coordinator/src/routes.rs index 1e9a3d572..5425c5537 100644 --- a/coordinator/src/routes.rs +++ b/coordinator/src/routes.rs @@ -11,13 +11,14 @@ use crate::admin::send_payment; use crate::admin::sign_message; use crate::db::user; use crate::node::Node; +use crate::notification::NewUserMessage; use crate::orderbook::routes::delete_order; use crate::orderbook::routes::get_order; use crate::orderbook::routes::get_orders; use crate::orderbook::routes::post_order; use crate::orderbook::routes::put_order; use crate::orderbook::routes::websocket_handler; -use crate::orderbook::trading::TradingMessage; +use crate::orderbook::trading::NewOrderMessage; use crate::settings::Settings; use crate::AppError; use autometrics::autometrics; @@ -48,7 +49,7 @@ use ln_dlc_node::node::peer_manager::alias_as_bytes; use ln_dlc_node::node::peer_manager::broadcast_node_announcement; use ln_dlc_node::node::NodeInfo; use opentelemetry_prometheus::PrometheusExporter; -use orderbook_commons::OrderbookMsg; +use orderbook_commons::Message; use orderbook_commons::RouteHintHop; use prometheus::Encoder; use prometheus::TextEncoder; @@ -65,8 +66,9 @@ use tracing::instrument; pub struct AppState { pub node: Node, // Channel used to send messages to all connected clients. - pub tx_price_feed: broadcast::Sender, - pub trading_sender: mpsc::Sender, + pub tx_price_feed: broadcast::Sender, + pub tx_user_feed: broadcast::Sender, + pub trading_sender: mpsc::Sender, pub pool: Pool>, pub settings: RwLock, pub exporter: PrometheusExporter, @@ -82,14 +84,16 @@ pub fn router( exporter: PrometheusExporter, announcement_addresses: Vec, node_alias: &str, - trading_sender: mpsc::Sender, - tx_price_feed: broadcast::Sender, + trading_sender: mpsc::Sender, + tx_price_feed: broadcast::Sender, + tx_user_feed: broadcast::Sender, ) -> Router { let app_state = Arc::new(AppState { node, pool, settings: RwLock::new(settings), tx_price_feed, + tx_user_feed, trading_sender, exporter, announcement_addresses, diff --git a/crates/coordinator-commons/src/lib.rs b/crates/coordinator-commons/src/lib.rs index e239f7b5a..dd23d4e2b 100644 --- a/crates/coordinator-commons/src/lib.rs +++ b/crates/coordinator-commons/src/lib.rs @@ -3,6 +3,10 @@ use orderbook_commons::FilledWith; use rust_decimal::Decimal; use serde::Deserialize; use serde::Serialize; +use time::macros::time; +use time::Duration; +use time::OffsetDateTime; +use time::Weekday; use trade::ContractSymbol; use trade::Direction; @@ -78,3 +82,141 @@ pub struct TokenUpdateParams { pub pubkey: String, pub fcm_token: String, } + +/// Calculates the expiry timestamp at the next Sunday at 3 pm UTC from a given offset date time. +/// If the argument falls in between Friday, 3 pm UTC and Sunday, 3pm UTC, the expiry will be +/// calculated to next weeks Sunday at 3 pm +pub fn calculate_next_expiry(time: OffsetDateTime) -> OffsetDateTime { + let days = if is_in_rollover_weekend(time) || time.weekday() == Weekday::Sunday { + // if the provided time is in the rollover weekend or on a sunday, we expire the sunday the + // week after. + 7 - time.weekday().number_from_monday() + 7 + } else { + 7 - time.weekday().number_from_monday() + }; + let time = time.date().with_hms(15, 0, 0).expect("to fit into time"); + + (time + Duration::days(days as i64)).assume_utc() +} + +/// Checks whether the provided expiry date is eligible for a rollover +/// +/// Returns true if the given date falls in between friday 15 pm UTC and sunday 15 pm UTC +pub fn is_in_rollover_weekend(timestamp: OffsetDateTime) -> bool { + match timestamp.weekday() { + Weekday::Friday => timestamp.time() >= time!(15:00), + Weekday::Saturday => true, + Weekday::Sunday => timestamp.time() < time!(15:00), + _ => false, + } +} + +#[cfg(test)] +mod test { + use crate::calculate_next_expiry; + use crate::is_in_rollover_weekend; + use time::OffsetDateTime; + + #[test] + fn test_is_not_in_rollover_weekend() { + // Wed Aug 09 2023 09:30:23 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691573423).unwrap(); + assert!(!is_in_rollover_weekend(expiry)); + } + + #[test] + fn test_is_just_in_rollover_weekend_friday() { + // Fri Aug 11 2023 15:00:00 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691766000).unwrap(); + assert!(is_in_rollover_weekend(expiry)); + + // Fri Aug 11 2023 15:00:01 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691766001).unwrap(); + assert!(is_in_rollover_weekend(expiry)); + } + + #[test] + fn test_is_in_rollover_weekend_saturday() { + // Sat Aug 12 2023 16:00:00 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691856000).unwrap(); + assert!(is_in_rollover_weekend(expiry)); + } + + #[test] + fn test_is_just_in_rollover_weekend_sunday() { + // Sun Aug 13 2023 14:59:59 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691938799).unwrap(); + assert!(is_in_rollover_weekend(expiry)); + } + + #[test] + fn test_is_just_not_in_rollover_weekend_sunday() { + // Sun Aug 13 2023 15:00:00 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691938800).unwrap(); + assert!(!is_in_rollover_weekend(expiry)); + + // Sun Aug 13 2023 15:00:01 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691938801).unwrap(); + assert!(!is_in_rollover_weekend(expiry)); + } + + #[test] + fn test_expiry_timestamp_before_friday_15pm() { + // Wed Aug 09 2023 09:30:23 GMT+0000 + let from = OffsetDateTime::from_unix_timestamp(1691573423).unwrap(); + let expiry = calculate_next_expiry(from); + + // Sun Aug 13 2023 15:00:00 GMT+0000 + assert_eq!(1691938800, expiry.unix_timestamp()); + } + + #[test] + fn test_expiry_timestamp_just_before_friday_15pm() { + // Fri Aug 11 2023 14:59:59 GMT+0000 + let from = OffsetDateTime::from_unix_timestamp(1691765999).unwrap(); + let expiry = calculate_next_expiry(from); + + // Sun Aug 13 2023 15:00:00 GMT+0000 + assert_eq!(1691938800, expiry.unix_timestamp()); + } + + #[test] + fn test_expiry_timestamp_just_after_friday_15pm() { + // Fri Aug 11 2023 15:00:01 GMT+0000 + let from = OffsetDateTime::from_unix_timestamp(1691766001).unwrap(); + let expiry = calculate_next_expiry(from); + + // Sun Aug 20 2023 15:00:00 GMT+0000 + assert_eq!(1692543600, expiry.unix_timestamp()); + } + + #[test] + fn test_expiry_timestamp_at_friday_15pm() { + // Fri Aug 11 2023 15:00:00 GMT+0000 + let from = OffsetDateTime::from_unix_timestamp(1691766000).unwrap(); + let expiry = calculate_next_expiry(from); + + // Sun Aug 20 2023 15:00:00 GMT+0000 + assert_eq!(1692543600, expiry.unix_timestamp()); + } + + #[test] + fn test_expiry_timestamp_after_sunday_15pm() { + // Sun Aug 06 2023 16:00:00 GMT+0000 + let from = OffsetDateTime::from_unix_timestamp(1691337600).unwrap(); + let expiry = calculate_next_expiry(from); + + // Sun Aug 13 2023 15:00:00 GMT+0000 + assert_eq!(1691938800, expiry.unix_timestamp()); + } + + #[test] + fn test_expiry_timestamp_on_saturday() { + // // Sat Aug 12 2023 16:00:00 GMT+0000 + let from = OffsetDateTime::from_unix_timestamp(1691856000).unwrap(); + let expiry = calculate_next_expiry(from); + + // Sun Aug 20 2023 15:00:00 GMT+0000 + assert_eq!(1692543600, expiry.unix_timestamp()); + } +} diff --git a/crates/orderbook-commons/src/lib.rs b/crates/orderbook-commons/src/lib.rs index 9e1ef7cd5..f571f6904 100644 --- a/crates/orderbook-commons/src/lib.rs +++ b/crates/orderbook-commons/src/lib.rs @@ -2,11 +2,9 @@ pub use crate::order_matching_fee::order_matching_fee_taker; pub use crate::price::best_current_price; pub use crate::price::Price; pub use crate::price::Prices; -use anyhow::ensure; -use anyhow::Result; use rust_decimal::prelude::ToPrimitive; use rust_decimal::Decimal; -use secp256k1::Message; +use secp256k1::Message as SecpMessage; use secp256k1::PublicKey; use secp256k1::XOnlyPublicKey; use serde::Deserialize; @@ -14,11 +12,7 @@ use serde::Serialize; use sha2::digest::FixedOutput; use sha2::Digest; use sha2::Sha256; -use std::str::FromStr; -use time::macros::time; -use time::Duration; use time::OffsetDateTime; -use time::Weekday; use trade::ContractSymbol; use trade::Direction; use uuid::Uuid; @@ -69,11 +63,11 @@ pub struct Signature { pub signature: secp256k1::ecdsa::Signature, } -pub fn create_sign_message() -> Message { +pub fn create_sign_message() -> SecpMessage { let sign_message = "Hello it's me Mario".to_string(); let hashed_message = Sha256::new().chain_update(sign_message).finalize_fixed(); - let msg = Message::from_slice(hashed_message.as_slice()) + let msg = SecpMessage::from_slice(hashed_message.as_slice()) .expect("The message is static, hence this should never happen"); msg } @@ -117,8 +111,10 @@ pub enum OrderbookRequest { Authenticate(Signature), } +// TODO(holzeis): The message enum should not be in the orderbook-commons crate as it also contains +// coordinator messages. We should move all common crates into a single one. #[derive(Serialize, Clone, Deserialize, Debug)] -pub enum OrderbookMsg { +pub enum Message { AllOrders(Vec), NewOrder(Order), DeleteOrder(Uuid), @@ -320,72 +316,8 @@ pub struct Matches { pub updated_at: OffsetDateTime, } -pub fn get_filled_with_from_matches(matches: Vec) -> Result { - ensure!( - !matches.is_empty(), - "Need at least one matches record to construct a FilledWith" - ); - - let order_id = matches - .first() - .expect("to have at least one match") - .order_id; - let oracle_pk = XOnlyPublicKey::from_str( - "16f88cf7d21e6c0f46bcbc983a4e3b19726c6c98858cc31c83551a88fde171c0", - ) - .expect("To be a valid pubkey"); - - let expiry_timestamp = get_expiry_timestamp(OffsetDateTime::now_utc()); - - Ok(FilledWith { - order_id, - expiry_timestamp, - oracle_pk, - matches: matches - .iter() - .map(|m| Match { - id: m.id, - order_id: m.order_id, - quantity: m.quantity, - pubkey: m.match_trader_id, - execution_price: m.execution_price, - }) - .collect(), - }) -} - -/// Calculates the expiry timestamp at the next Sunday at 3 pm UTC from a given offset date time. -/// If the argument falls in between Friday, 3 pm UTC and Sunday, 3pm UTC, the expiry will be -/// calculated to next weeks Sunday at 3 pm -pub fn get_expiry_timestamp(time: OffsetDateTime) -> OffsetDateTime { - let days = if is_in_rollover_weekend(time) || time.weekday() == Weekday::Sunday { - // if the provided time is in the rollover weekend or on a sunday, we expire the sunday the - // week after. - 7 - time.weekday().number_from_monday() + 7 - } else { - 7 - time.weekday().number_from_monday() - }; - let time = time.date().with_hms(15, 0, 0).expect("to fit into time"); - - (time + Duration::days(days as i64)).assume_utc() -} - -/// Checks whether the provided expiry date is eligible for a rollover -/// -/// Returns true if the given date falls in between friday 15 pm UTC and sunday 15 pm UTC -pub fn is_in_rollover_weekend(timestamp: OffsetDateTime) -> bool { - match timestamp.weekday() { - Weekday::Friday => timestamp.time() >= time!(15:00), - Weekday::Saturday => true, - Weekday::Sunday => timestamp.time() < time!(15:00), - _ => false, - } -} - #[cfg(test)] mod test { - use crate::get_expiry_timestamp; - use crate::is_in_rollover_weekend; use crate::FilledWith; use crate::Match; use crate::Signature; @@ -402,109 +334,6 @@ mod test { .unwrap() } - #[test] - fn test_is_not_in_rollover_weekend() { - // Wed Aug 09 2023 09:30:23 GMT+0000 - let expiry = OffsetDateTime::from_unix_timestamp(1691573423).unwrap(); - assert!(!is_in_rollover_weekend(expiry)); - } - - #[test] - fn test_is_just_in_rollover_weekend_friday() { - // Fri Aug 11 2023 15:00:00 GMT+0000 - let expiry = OffsetDateTime::from_unix_timestamp(1691766000).unwrap(); - assert!(is_in_rollover_weekend(expiry)); - - // Fri Aug 11 2023 15:00:01 GMT+0000 - let expiry = OffsetDateTime::from_unix_timestamp(1691766001).unwrap(); - assert!(is_in_rollover_weekend(expiry)); - } - - #[test] - fn test_is_in_rollover_weekend_saturday() { - // Sat Aug 12 2023 16:00:00 GMT+0000 - let expiry = OffsetDateTime::from_unix_timestamp(1691856000).unwrap(); - assert!(is_in_rollover_weekend(expiry)); - } - - #[test] - fn test_is_just_in_rollover_weekend_sunday() { - // Sun Aug 13 2023 14:59:59 GMT+0000 - let expiry = OffsetDateTime::from_unix_timestamp(1691938799).unwrap(); - assert!(is_in_rollover_weekend(expiry)); - } - - #[test] - fn test_is_just_not_in_rollover_weekend_sunday() { - // Sun Aug 13 2023 15:00:00 GMT+0000 - let expiry = OffsetDateTime::from_unix_timestamp(1691938800).unwrap(); - assert!(!is_in_rollover_weekend(expiry)); - - // Sun Aug 13 2023 15:00:01 GMT+0000 - let expiry = OffsetDateTime::from_unix_timestamp(1691938801).unwrap(); - assert!(!is_in_rollover_weekend(expiry)); - } - - #[test] - fn test_expiry_timestamp_before_friday_15pm() { - // Wed Aug 09 2023 09:30:23 GMT+0000 - let from = OffsetDateTime::from_unix_timestamp(1691573423).unwrap(); - let expiry = get_expiry_timestamp(from); - - // Sun Aug 13 2023 15:00:00 GMT+0000 - assert_eq!(1691938800, expiry.unix_timestamp()); - } - - #[test] - fn test_expiry_timestamp_just_before_friday_15pm() { - // Fri Aug 11 2023 14:59:59 GMT+0000 - let from = OffsetDateTime::from_unix_timestamp(1691765999).unwrap(); - let expiry = get_expiry_timestamp(from); - - // Sun Aug 13 2023 15:00:00 GMT+0000 - assert_eq!(1691938800, expiry.unix_timestamp()); - } - - #[test] - fn test_expiry_timestamp_just_after_friday_15pm() { - // Fri Aug 11 2023 15:00:01 GMT+0000 - let from = OffsetDateTime::from_unix_timestamp(1691766001).unwrap(); - let expiry = get_expiry_timestamp(from); - - // Sun Aug 20 2023 15:00:00 GMT+0000 - assert_eq!(1692543600, expiry.unix_timestamp()); - } - - #[test] - fn test_expiry_timestamp_at_friday_15pm() { - // Fri Aug 11 2023 15:00:00 GMT+0000 - let from = OffsetDateTime::from_unix_timestamp(1691766000).unwrap(); - let expiry = get_expiry_timestamp(from); - - // Sun Aug 20 2023 15:00:00 GMT+0000 - assert_eq!(1692543600, expiry.unix_timestamp()); - } - - #[test] - fn test_expiry_timestamp_after_sunday_15pm() { - // Sun Aug 06 2023 16:00:00 GMT+0000 - let from = OffsetDateTime::from_unix_timestamp(1691337600).unwrap(); - let expiry = get_expiry_timestamp(from); - - // Sun Aug 13 2023 15:00:00 GMT+0000 - assert_eq!(1691938800, expiry.unix_timestamp()); - } - - #[test] - fn test_expiry_timestamp_on_saturday() { - // // Sat Aug 12 2023 16:00:00 GMT+0000 - let from = OffsetDateTime::from_unix_timestamp(1691856000).unwrap(); - let expiry = get_expiry_timestamp(from); - - // Sun Aug 20 2023 15:00:00 GMT+0000 - assert_eq!(1692543600, expiry.unix_timestamp()); - } - #[test] fn test_serialize_signature() { let secret_key = SecretKey::from_slice(&[ diff --git a/crates/tests-e2e/tests/rollover_position.rs b/crates/tests-e2e/tests/rollover_position.rs index ee2d216eb..8e6af79dc 100644 --- a/crates/tests-e2e/tests/rollover_position.rs +++ b/crates/tests-e2e/tests/rollover_position.rs @@ -22,7 +22,7 @@ async fn can_rollover_position() { .unwrap(); let position = test.app.rx.position().expect("position to exist"); - let new_expiry = orderbook_commons::get_expiry_timestamp(position.expiry); + let new_expiry = coordinator_commons::calculate_next_expiry(position.expiry); coordinator .rollover(&dlc_channel.dlc_channel_id.unwrap()) diff --git a/mobile/native/src/api.rs b/mobile/native/src/api.rs index 6bfa679dc..0d049ee68 100644 --- a/mobile/native/src/api.rs +++ b/mobile/native/src/api.rs @@ -397,5 +397,7 @@ pub fn get_channel_open_fee_estimate_sat() -> Result { } pub fn get_expiry_timestamp() -> SyncReturn { - SyncReturn(orderbook_commons::get_expiry_timestamp(OffsetDateTime::now_utc()).unix_timestamp()) + SyncReturn( + coordinator_commons::calculate_next_expiry(OffsetDateTime::now_utc()).unix_timestamp(), + ) } diff --git a/mobile/native/src/orderbook.rs b/mobile/native/src/orderbook.rs index 4637ad3c9..91bee3de0 100644 --- a/mobile/native/src/orderbook.rs +++ b/mobile/native/src/orderbook.rs @@ -10,8 +10,8 @@ use bdk::bitcoin::secp256k1::SecretKey; use bdk::bitcoin::secp256k1::SECP256K1; use futures::TryStreamExt; use orderbook_commons::best_current_price; +use orderbook_commons::Message; use orderbook_commons::Order; -use orderbook_commons::OrderbookMsg; use orderbook_commons::Prices; use orderbook_commons::Signature; use parking_lot::Mutex; @@ -94,7 +94,7 @@ pub fn subscribe( Ok(Some(msg)) => { tracing::debug!(%msg, "New message from orderbook"); - let msg = match serde_json::from_str::(&msg) { + let msg = match serde_json::from_str::(&msg) { Ok(msg) => msg, Err(e) => { tracing::error!( @@ -105,7 +105,7 @@ pub fn subscribe( }; match msg { - OrderbookMsg::Rollover => { + Message::Rollover => { tracing::info!("Received a rollover request from orderbook."); event::publish(&EventInternal::BackgroundNotification(BackgroundTask::Rollover(TaskStatus::Pending))); @@ -114,7 +114,7 @@ pub fn subscribe( event::publish(&EventInternal::BackgroundNotification(BackgroundTask::Rollover(TaskStatus::Failed))); } }, - OrderbookMsg::AsyncMatch { order, filled_with } => { + Message::AsyncMatch { order, filled_with } => { let order_reason = order.clone().order_reason.into(); tracing::info!(order_id = %order.id, "Received an async match from orderbook. Reason: {order_reason:?}"); event::publish(&EventInternal::BackgroundNotification(BackgroundTask::AsyncTrade(order_reason))); @@ -123,14 +123,14 @@ pub fn subscribe( tracing::error!(order_id = %order.id, "Failed to process async trade. Error: {e:#}"); } }, - OrderbookMsg::Match(filled) => { + Message::Match(filled) => { tracing::info!(order_id = %filled.order_id, "Received match from orderbook"); if let Err(e) = position::handler::trade(filled.clone()).await { tracing::error!(order_id = %filled.order_id, "Trade request sent to coordinator failed. Error: {e:#}"); } }, - OrderbookMsg::AllOrders(initial_orders) => { + Message::AllOrders(initial_orders) => { let mut orders = orders.lock(); if !orders.is_empty() { tracing::debug!("Received new set of initial orders from orderbook, replacing the previously stored orders"); @@ -141,12 +141,12 @@ pub fn subscribe( *orders = initial_orders; update_prices_if_needed(&mut cached_best_price, &orders); }, - OrderbookMsg::NewOrder(order) => { + Message::NewOrder(order) => { let mut orders = orders.lock(); orders.push(order); update_prices_if_needed(&mut cached_best_price, &orders); } - OrderbookMsg::DeleteOrder(order_id) => { + Message::DeleteOrder(order_id) => { let mut orders = orders.lock(); let found = remove_order(&mut orders, order_id); if !found { @@ -154,7 +154,7 @@ pub fn subscribe( } update_prices_if_needed(&mut cached_best_price, &orders); }, - OrderbookMsg::Update(updated_order) => { + Message::Update(updated_order) => { let mut orders = orders.lock(); let found = remove_order(&mut orders, updated_order.id); if !found {