diff --git a/CHANGELOG.md b/CHANGELOG.md index 7cc1d29dd..73cba68c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Show loading screen when app starts with an expired position - Fix: Prevent crashing the app when there's no Internet connection - feat: Allow exporting the seed phrase even if the Node is offline +- Changed expiry to next Sunday 3 pm UTC +- Automatically rollover if user opens app during rollover weekend ## [1.2.6] - 2023-09-06 diff --git a/coordinator/src/bin/coordinator.rs b/coordinator/src/bin/coordinator.rs index 68cce2c7f..a2e290ad5 100644 --- a/coordinator/src/bin/coordinator.rs +++ b/coordinator/src/bin/coordinator.rs @@ -8,10 +8,14 @@ use coordinator::node; use coordinator::node::closed_positions; use coordinator::node::connection; use coordinator::node::expired_positions; +use coordinator::node::rollover; use coordinator::node::storage::NodeStorage; use coordinator::node::unrealized_pnl; use coordinator::node::Node; +use coordinator::notification; +use coordinator::notification::NewUserMessage; use coordinator::notification_service::NotificationService; +use coordinator::orderbook::async_match; use coordinator::orderbook::trading; use coordinator::routes::router; use coordinator::run_migration; @@ -192,8 +196,16 @@ async fn main() -> Result<()> { } }); + let (tx_user_feed, _rx) = broadcast::channel::(100); let (tx_price_feed, _rx) = broadcast::channel(100); - let (_handle, trading_sender) = trading::start(pool.clone(), tx_price_feed.clone()); + + let (_handle, notifier) = notification::start(tx_user_feed.clone()); + + let (_handle, trading_sender) = + trading::start(pool.clone(), tx_price_feed.clone(), notifier.clone()); + + let _handle = async_match::monitor(pool.clone(), tx_user_feed.clone(), notifier.clone()); + let _handle = rollover::monitor(pool.clone(), tx_user_feed.clone(), notifier); tokio::spawn({ let node = node.clone(); @@ -235,6 +247,7 @@ async fn main() -> Result<()> { NODE_ALIAS, trading_sender, tx_price_feed, + tx_user_feed, ); let notification_service = NotificationService::new(opts.fcm_api_key); diff --git a/coordinator/src/db/positions.rs b/coordinator/src/db/positions.rs index 93914b561..7b4fadcdc 100644 --- a/coordinator/src/db/positions.rs +++ b/coordinator/src/db/positions.rs @@ -134,6 +134,7 @@ impl Position { ) -> Result<()> { let affected_rows = diesel::update(positions::table) .filter(positions::trader_pubkey.eq(trader_pubkey)) + .filter(positions::position_state.eq(PositionState::Rollover)) .set(( positions::position_state.eq(PositionState::Open), positions::temporary_contract_id.eq(temporary_contract_id.to_hex()), @@ -169,6 +170,7 @@ impl Position { ) -> Result<()> { let affected_rows = diesel::update(positions::table) .filter(positions::trader_pubkey.eq(trader_pubkey)) + .filter(positions::position_state.eq(PositionState::Open)) .set(( positions::expiry_timestamp.eq(expiry_timestamp), positions::position_state.eq(PositionState::Rollover), diff --git a/coordinator/src/lib.rs b/coordinator/src/lib.rs index 7e652db59..1fad8a83d 100644 --- a/coordinator/src/lib.rs +++ b/coordinator/src/lib.rs @@ -8,14 +8,13 @@ use diesel_migrations::EmbeddedMigrations; use diesel_migrations::MigrationHarness; use serde_json::json; -mod rollover; - pub mod admin; pub mod cli; pub mod db; pub mod logger; pub mod metrics; pub mod node; +pub mod notification; pub mod notification_service; pub mod orderbook; pub mod position; diff --git a/coordinator/src/node.rs b/coordinator/src/node.rs index bff81487d..6fd82c905 100644 --- a/coordinator/src/node.rs +++ b/coordinator/src/node.rs @@ -62,6 +62,7 @@ pub mod closed_positions; pub mod connection; pub mod expired_positions; pub mod order_matching_fee; +pub mod rollover; pub mod routing_fees; pub mod storage; pub mod unrealized_pnl; diff --git a/coordinator/src/node/expired_positions.rs b/coordinator/src/node/expired_positions.rs index 56762eb0d..e669e1fbc 100644 --- a/coordinator/src/node/expired_positions.rs +++ b/coordinator/src/node/expired_positions.rs @@ -2,7 +2,6 @@ use crate::db; use crate::node::Node; use crate::orderbook; use crate::orderbook::trading::NewOrderMessage; -use crate::orderbook::trading::TradingMessage; use crate::position::models::Position; use crate::position::models::PositionState; use anyhow::Context; @@ -22,7 +21,7 @@ use time::Duration; use time::OffsetDateTime; use tokio::sync::mpsc; -pub async fn close(node: Node, trading_sender: mpsc::Sender) -> Result<()> { +pub async fn close(node: Node, trading_sender: mpsc::Sender) -> Result<()> { let mut conn = node.pool.get()?; let positions = db::positions::Position::get_all_open_positions(&mut conn) @@ -93,11 +92,11 @@ pub async fn close(node: Node, trading_sender: mpsc::Sender) -> }; let (sender, mut receiver) = mpsc::channel::>(1); - let message = TradingMessage::NewOrder(NewOrderMessage { + let message = NewOrderMessage { new_order: new_order.clone(), order_reason: OrderReason::Expired, sender, - }); + }; if let Err(e) = trading_sender.send(message).await { tracing::error!(order_id=%new_order.id, trader_id=%new_order.trader_id, "Failed to submit new order for closing expired position. Error: {e:#}"); diff --git a/coordinator/src/rollover.rs b/coordinator/src/node/rollover.rs similarity index 82% rename from coordinator/src/rollover.rs rename to coordinator/src/node/rollover.rs index 770caa72c..63a27a3ea 100644 --- a/coordinator/src/rollover.rs +++ b/coordinator/src/node/rollover.rs @@ -1,20 +1,30 @@ use crate::db; +use crate::db::positions; use crate::node::Node; +use crate::notification::NewUserMessage; +use crate::notification::Notification; use anyhow::bail; use anyhow::Context; use anyhow::Result; use bitcoin::hashes::hex::ToHex; use bitcoin::secp256k1::PublicKey; use bitcoin::XOnlyPublicKey; +use diesel::r2d2::ConnectionManager; +use diesel::r2d2::Pool; +use diesel::PgConnection; use dlc_manager::contract::contract_input::ContractInput; use dlc_manager::contract::contract_input::ContractInputInfo; use dlc_manager::contract::contract_input::OracleInput; use dlc_manager::contract::Contract; use dlc_manager::contract::ContractDescriptor; use dlc_manager::ChannelId; +use futures::future::RemoteHandle; +use futures::FutureExt; +use orderbook_commons::Message; use std::str::FromStr; -use time::Duration; use time::OffsetDateTime; +use tokio::sync::broadcast; +use tokio::sync::mpsc; use trade::ContractSymbol; #[derive(Debug, Clone)] @@ -29,6 +39,69 @@ struct Rollover { contract_tx_fee_rate: u64, } +pub fn monitor( + pool: Pool>, + tx_user_feed: broadcast::Sender, + notifier: mpsc::Sender, +) -> RemoteHandle> { + let mut user_feed = tx_user_feed.subscribe(); + let (fut, remote_handle) = async move { + while let Ok(new_user_msg) = user_feed.recv().await { + tokio::spawn({ + let mut conn = pool.get()?; + let notifier = notifier.clone(); + async move { + if let Err(e) = + check_if_eligible_for_rollover(&mut conn, notifier, new_user_msg.new_user) + .await + { + tracing::error!("Failed to check if eligible for rollover. Error: {e:#}"); + } + } + }); + } + Ok(()) + } + .remote_handle(); + + tokio::spawn(fut); + + remote_handle +} + +async fn check_if_eligible_for_rollover( + conn: &mut PgConnection, + notifier: mpsc::Sender, + trader_id: PublicKey, +) -> Result<()> { + tracing::debug!(%trader_id, "Checking if the users positions is eligible for rollover"); + if let Some(position) = + positions::Position::get_open_position_by_trader(conn, trader_id.to_string())? + { + if coordinator_commons::is_in_rollover_weekend(OffsetDateTime::now_utc()) + && !position.is_expired() + { + let next_expiry = coordinator_commons::calculate_next_expiry(OffsetDateTime::now_utc()); + if position.expiry_timestamp == next_expiry { + tracing::trace!(%trader_id, position_id=position.id, "Position has already been rolled over"); + return Ok(()); + } + + tracing::debug!(%trader_id, position_id=position.id, "Proposing to rollover users position"); + + let message = Notification::Message { + trader_id, + message: Message::Rollover, + }; + if let Err(e) = notifier.send(message).await { + tracing::debug!("Failed to notify trader. Error: {e:#}"); + } + } + } + + Ok(()) +} + impl Rollover { pub fn new(contract: Contract) -> Result { let contract = match contract { @@ -81,11 +154,8 @@ impl Rollover { } /// Calculates the maturity time based on the current expiry timestamp. - /// - /// todo(holzeis): this should come from a configuration https://github.com/get10101/10101/issues/1029 pub fn maturity_time(&self) -> OffsetDateTime { - let tomorrow = self.expiry_timestamp.date() + Duration::days(7); - tomorrow.midnight().assume_utc() + coordinator_commons::calculate_next_expiry(self.expiry_timestamp) } } @@ -209,8 +279,8 @@ pub mod tests { let event_id = rollover.event_id(); // expect expiry in seven days at midnight. - // Thu Aug 24 2023 00:00:00 GMT+0000 - assert_eq!(event_id, format!("btcusd1692835200")) + // Sun Aug 20 2023 15:00:00 GMT+0000 + assert_eq!(event_id, format!("btcusd1692543600")) } #[test] diff --git a/coordinator/src/notification/mod.rs b/coordinator/src/notification/mod.rs new file mode 100644 index 000000000..2e5092b4d --- /dev/null +++ b/coordinator/src/notification/mod.rs @@ -0,0 +1,84 @@ +use anyhow::Result; +use bitcoin::secp256k1::PublicKey; +use futures::future::RemoteHandle; +use futures::FutureExt; +use orderbook_commons::Message; +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::RwLock; +use tokio::sync::broadcast; +use tokio::sync::mpsc; + +/// This value is arbitrarily set to 100 and defines the message accepted in the notification +/// channel buffer. +const NOTIFICATION_BUFFER_SIZE: usize = 100; + +// TODO(holzeis): This enum should be extended to allow for sending push notifications. +pub enum Notification { + Message { + trader_id: PublicKey, + message: Message, + }, +} + +#[derive(Clone)] +pub struct NewUserMessage { + pub new_user: PublicKey, + pub sender: mpsc::Sender, +} + +pub fn start( + tx_user_feed: broadcast::Sender, +) -> (RemoteHandle>, mpsc::Sender) { + let (sender, mut receiver) = mpsc::channel::(NOTIFICATION_BUFFER_SIZE); + + let authenticated_users = Arc::new(RwLock::new(HashMap::new())); + + tokio::task::spawn({ + let traders = authenticated_users.clone(); + async move { + let mut user_feed = tx_user_feed.subscribe(); + while let Ok(new_user_msg) = user_feed.recv().await { + traders + .write() + .expect("RwLock to not be poisoned") + .insert(new_user_msg.new_user, new_user_msg.sender); + } + } + }); + + let (fut, remote_handle) = { + async move { + while let Some(notification) = receiver.recv().await { + match notification { + Notification::Message { trader_id, message } => { + tracing::info!(%trader_id, "Sending message: {message:?}"); + + let trader = { + let traders = authenticated_users + .read() + .expect("RwLock to not be poisoned"); + traders.get(&trader_id).cloned() + }; + + match trader { + Some(sender) => { + if let Err(e) = sender.send(message).await { + tracing::warn!("Connection lost to trader {e:#}"); + } + } + None => tracing::warn!(%trader_id, "Trader is not connected"), + } + } + } + } + + Ok(()) + } + .remote_handle() + }; + + tokio::spawn(fut); + + (remote_handle, sender) +} diff --git a/coordinator/src/orderbook/async_match.rs b/coordinator/src/orderbook/async_match.rs new file mode 100644 index 000000000..9a04e1d54 --- /dev/null +++ b/coordinator/src/orderbook/async_match.rs @@ -0,0 +1,110 @@ +use crate::notification::NewUserMessage; +use crate::notification::Notification; +use crate::orderbook::db::matches; +use crate::orderbook::db::orders; +use anyhow::ensure; +use anyhow::Result; +use bitcoin::secp256k1::PublicKey; +use bitcoin::XOnlyPublicKey; +use diesel::r2d2::ConnectionManager; +use diesel::r2d2::Pool; +use diesel::PgConnection; +use futures::future::RemoteHandle; +use futures::FutureExt; +use orderbook_commons::FilledWith; +use orderbook_commons::Match; +use orderbook_commons::Matches; +use orderbook_commons::Message; +use orderbook_commons::OrderReason; +use orderbook_commons::OrderState; +use std::str::FromStr; +use time::OffsetDateTime; +use tokio::sync::broadcast; +use tokio::sync::mpsc; + +pub fn monitor( + pool: Pool>, + tx_user_feed: broadcast::Sender, + notifier: mpsc::Sender, +) -> RemoteHandle> { + let mut user_feed = tx_user_feed.subscribe(); + let (fut, remote_handle) = async move { + while let Ok(new_user_msg) = user_feed.recv().await { + tokio::spawn({ + let mut conn = pool.get()?; + let notifier = notifier.clone(); + async move { + tracing::debug!(trader_id=%new_user_msg.new_user, "Checking if the user needs to be notified about pending matches"); + if let Err(e) = process_pending_match(&mut conn, notifier, new_user_msg.new_user).await { + tracing::error!("Failed to process pending match. Error: {e:#}"); + } + } + }); + } + Ok(()) + }.remote_handle(); + + tokio::spawn(fut); + + remote_handle +} + +/// Checks if there are any pending matches +async fn process_pending_match( + conn: &mut PgConnection, + notifier: mpsc::Sender, + trader_id: PublicKey, +) -> Result<()> { + if let Some(order) = orders::get_by_trader_id_and_state(conn, trader_id, OrderState::Matched)? { + tracing::debug!(%trader_id, order_id=%order.id, "Notifying trader about pending match"); + + let matches = matches::get_matches_by_order_id(conn, order.id)?; + let filled_with = get_filled_with_from_matches(matches)?; + + let message = match order.order_reason { + OrderReason::Manual => Message::Match(filled_with), + OrderReason::Expired => Message::AsyncMatch { order, filled_with }, + }; + + let msg = Notification::Message { trader_id, message }; + if let Err(e) = notifier.send(msg).await { + tracing::error!("Failed to send notification. Error: {e:#}"); + } + } + + Ok(()) +} + +fn get_filled_with_from_matches(matches: Vec) -> Result { + ensure!( + !matches.is_empty(), + "Need at least one matches record to construct a FilledWith" + ); + + let order_id = matches + .first() + .expect("to have at least one match") + .order_id; + let oracle_pk = XOnlyPublicKey::from_str( + "16f88cf7d21e6c0f46bcbc983a4e3b19726c6c98858cc31c83551a88fde171c0", + ) + .expect("To be a valid pubkey"); + + let expiry_timestamp = coordinator_commons::calculate_next_expiry(OffsetDateTime::now_utc()); + + Ok(FilledWith { + order_id, + expiry_timestamp, + oracle_pk, + matches: matches + .iter() + .map(|m| Match { + id: m.id, + order_id: m.order_id, + quantity: m.quantity, + pubkey: m.match_trader_id, + execution_price: m.execution_price, + }) + .collect(), + }) +} diff --git a/coordinator/src/orderbook/mod.rs b/coordinator/src/orderbook/mod.rs index 7ab6837db..79026b3e6 100644 --- a/coordinator/src/orderbook/mod.rs +++ b/coordinator/src/orderbook/mod.rs @@ -1,3 +1,4 @@ +pub mod async_match; pub mod db; pub mod routes; pub mod trading; diff --git a/coordinator/src/orderbook/routes.rs b/coordinator/src/orderbook/routes.rs index 3e90247a6..83f636af5 100644 --- a/coordinator/src/orderbook/routes.rs +++ b/coordinator/src/orderbook/routes.rs @@ -1,7 +1,6 @@ use crate::orderbook; use crate::orderbook::trading::NewOrderMessage; use crate::orderbook::trading::TradingError; -use crate::orderbook::trading::TradingMessage; use crate::orderbook::websocket::websocket_connection; use crate::routes::AppState; use crate::AppError; @@ -16,10 +15,10 @@ use axum::Json; use diesel::r2d2::ConnectionManager; use diesel::r2d2::PooledConnection; use diesel::PgConnection; +use orderbook_commons::Message; use orderbook_commons::NewOrder; use orderbook_commons::Order; use orderbook_commons::OrderReason; -use orderbook_commons::OrderbookMsg; use serde::de; use serde::Deserialize; use serde::Deserializer; @@ -94,11 +93,11 @@ pub async fn post_order( ) -> Result, AppError> { let (sender, mut receiver) = mpsc::channel::>(1); - let message = TradingMessage::NewOrder(NewOrderMessage { + let message = NewOrderMessage { new_order, order_reason: OrderReason::Manual, sender, - }); + }; state.trading_sender.send(message).await.map_err(|e| { AppError::InternalServerError(format!("Failed to send new order message: {e:#}")) })?; @@ -118,7 +117,7 @@ pub async fn post_order( Ok(Json(order)) } -fn update_pricefeed(pricefeed_msg: OrderbookMsg, sender: Sender) { +fn update_pricefeed(pricefeed_msg: Message, sender: Sender) { match sender.send(pricefeed_msg) { Ok(_) => { tracing::trace!("Pricefeed updated") @@ -143,7 +142,7 @@ pub async fn put_order( let order = orderbook::db::orders::set_is_taken(&mut conn, order_id, updated_order.taken) .map_err(|e| AppError::InternalServerError(format!("Failed to update order: {e:#}")))?; let sender = state.tx_price_feed.clone(); - update_pricefeed(OrderbookMsg::Update(order.clone()), sender); + update_pricefeed(Message::Update(order.clone()), sender); Ok(Json(order)) } @@ -157,7 +156,7 @@ pub async fn delete_order( .map_err(|e| AppError::InternalServerError(format!("Failed to delete order: {e:#}")))?; if deleted > 0 { let sender = state.tx_price_feed.clone(); - update_pricefeed(OrderbookMsg::DeleteOrder(order_id), sender); + update_pricefeed(Message::DeleteOrder(order_id), sender); } Ok(Json(deleted)) diff --git a/coordinator/src/orderbook/trading.rs b/coordinator/src/orderbook/trading.rs index 471fa99f1..a54ce06a2 100644 --- a/coordinator/src/orderbook/trading.rs +++ b/coordinator/src/orderbook/trading.rs @@ -1,3 +1,4 @@ +use crate::notification::Notification; use crate::orderbook::db::matches; use crate::orderbook::db::orders; use anyhow::anyhow; @@ -14,44 +15,31 @@ use futures::future::RemoteHandle; use futures::FutureExt; use orderbook_commons::FilledWith; use orderbook_commons::Match; +use orderbook_commons::Message; use orderbook_commons::NewOrder; use orderbook_commons::Order; use orderbook_commons::OrderReason; use orderbook_commons::OrderState; use orderbook_commons::OrderType; -use orderbook_commons::OrderbookMsg; use rust_decimal::Decimal; use std::cmp::Ordering; -use std::collections::HashMap; use std::str::FromStr; use thiserror::Error; -use time::Duration; use time::OffsetDateTime; use tokio::sync::broadcast; use tokio::sync::mpsc; use trade::Direction; use uuid::Uuid; -/// This value is arbitrarily set to 100 and defines the message accepted in the trading messages -/// channel buffer. -const TRADING_MESSAGES_BUFFER_SIZE: usize = 100; - -pub enum TradingMessage { - NewOrder(NewOrderMessage), - NewUser(NewUserMessage), -} - +/// This value is arbitrarily set to 100 and defines the number of new order messages buffered +/// in the channel. +const NEW_ORDERS_BUFFER_SIZE: usize = 100; pub struct NewOrderMessage { pub new_order: NewOrder, pub order_reason: OrderReason, pub sender: mpsc::Sender>, } -pub struct NewUserMessage { - pub new_user: PublicKey, - pub sender: mpsc::Sender, -} - #[derive(Error, Debug, PartialEq)] pub enum TradingError { #[error("Invalid order: {0}")] @@ -95,52 +83,37 @@ impl From<&TradeParams> for TraderMatchParams { /// the trading task by spawning a new tokio task that is handling messages pub fn start( pool: Pool>, - tx_price_feed: broadcast::Sender, -) -> (RemoteHandle>, mpsc::Sender) { - let (sender, mut receiver) = mpsc::channel::(TRADING_MESSAGES_BUFFER_SIZE); - - let mut authenticated_users = HashMap::new(); + tx_price_feed: broadcast::Sender, + notifier: mpsc::Sender, +) -> (RemoteHandle>, mpsc::Sender) { + let (sender, mut receiver) = mpsc::channel::(NEW_ORDERS_BUFFER_SIZE); let (fut, remote_handle) = async move { - - while let Some(trading_message) = receiver.recv().await { - match trading_message { - TradingMessage::NewOrder(new_order_msg) => { - tokio::spawn({ - let mut conn = pool.get()?; - let authenticated_users = authenticated_users.clone(); - let tx_price_feed = tx_price_feed.clone(); - async move { - let new_order = new_order_msg.new_order; - let result = process_new_order(&mut conn, tx_price_feed, new_order, new_order_msg.order_reason, &authenticated_users) - .await; - if let Err(e) = new_order_msg.sender.send(result).await { - tracing::error!("Failed to send new order message! Error: {e:#}"); - } - } - }); - } - TradingMessage::NewUser(new_user_msg) => { - tracing::info!(trader_id=%new_user_msg.new_user, "User logged in to 10101"); - - authenticated_users.insert(new_user_msg.new_user, new_user_msg.sender); - - tokio::spawn({ - let mut conn = pool.get()?; - let authenticated_users = authenticated_users.clone(); - async move { - tracing::debug!(trader_id=%new_user_msg.new_user, "Checking if the user needs to be notified about pending matches"); - if let Err(e) = process_pending_match(&mut conn, &authenticated_users, new_user_msg.new_user).await { - tracing::error!("Failed to process pending match. Error: {e:#}"); - } - } - }); + while let Some(new_order_msg) = receiver.recv().await { + tokio::spawn({ + let mut conn = pool.get()?; + let tx_price_feed = tx_price_feed.clone(); + let notifier = notifier.clone(); + async move { + let new_order = new_order_msg.new_order; + let result = process_new_order( + &mut conn, + notifier, + tx_price_feed, + new_order, + new_order_msg.order_reason, + ) + .await; + if let Err(e) = new_order_msg.sender.send(result).await { + tracing::error!("Failed to send new order message! Error: {e:#}"); + } } - } + }); } Ok(()) - }.remote_handle(); + } + .remote_handle(); tokio::spawn(fut); @@ -156,10 +129,10 @@ pub fn start( /// Market order: find match and notify traders async fn process_new_order( conn: &mut PgConnection, - tx_price_feed: broadcast::Sender, + notifier: mpsc::Sender, + tx_price_feed: broadcast::Sender, new_order: NewOrder, order_reason: OrderReason, - authenticated_users: &HashMap>, ) -> Result { tracing::info!(trader_id=%new_order.trader_id, "Received a new {:?} order", new_order.order_type); @@ -181,7 +154,7 @@ async fn process_new_order( if new_order.order_type == OrderType::Limit { // we only tell everyone about new limit orders tx_price_feed - .send(OrderbookMsg::NewOrder(order.clone())) + .send(Message::NewOrder(order.clone())) .map_err(|error| anyhow!("Could not update price feed due to '{error}'"))?; } else { // reject new order if there is already a matched order waiting for execution. @@ -232,14 +205,15 @@ async fn process_new_order( tracing::info!(%trader_id, order_id, "Notifying trader about match"); let message = match &order.order_reason { - OrderReason::Manual => OrderbookMsg::Match(match_param.filled_with.clone()), - OrderReason::Expired => OrderbookMsg::AsyncMatch { + OrderReason::Manual => Message::Match(match_param.filled_with.clone()), + OrderReason::Expired => Message::AsyncMatch { order: order.clone(), filled_with: match_param.filled_with.clone(), }, }; - let order_state = match notify_trader(trader_id, message, authenticated_users).await { + let msg = Notification::Message { trader_id, message }; + let order_state = match notifier.send(msg).await { Ok(()) => { tracing::debug!(%trader_id, order_id, "Successfully notified trader"); OrderState::Matched @@ -274,31 +248,6 @@ async fn process_new_order( Ok(order) } -/// Notifies the trader if a pending match is waiting for them. -async fn process_pending_match( - conn: &mut PgConnection, - authenticated_users: &HashMap>, - trader_id: PublicKey, -) -> Result<()> { - if let Some(order) = orders::get_by_trader_id_and_state(conn, trader_id, OrderState::Matched)? { - tracing::debug!(%trader_id, order_id=%order.id, "Notifying trader about pending match"); - - let matches = matches::get_matches_by_order_id(conn, order.id)?; - let filled_with = orderbook_commons::get_filled_with_from_matches(matches)?; - - let message = match order.order_reason { - OrderReason::Manual => OrderbookMsg::Match(filled_with), - OrderReason::Expired => OrderbookMsg::AsyncMatch { order, filled_with }, - }; - - if let Err(e) = notify_trader(trader_id, message, authenticated_users).await { - tracing::warn!("Failed to notify trader. Error: {e:#}"); - } - } - - Ok(()) -} - /// Matches a provided market order with limit orders from the DB /// /// If the order is a long order, we return the short orders sorted by price (highest first) @@ -345,8 +294,7 @@ fn match_order( return Ok(None); } - let tomorrow = OffsetDateTime::now_utc().date() + Duration::days(7); - let expiry_timestamp = tomorrow.midnight().assume_utc(); + let expiry_timestamp = coordinator_commons::calculate_next_expiry(OffsetDateTime::now_utc()); // For now we hardcode the oracle pubkey here let oracle_pk = XOnlyPublicKey::from_str( @@ -431,45 +379,20 @@ fn sort_orders(mut orders: Vec, is_long: bool) -> Vec { orders } -async fn notify_trader( - trader_id: PublicKey, - message: OrderbookMsg, - traders: &HashMap>, -) -> Result<()> { - match traders.get(&trader_id) { - None => bail!("Trader is not connected"), - Some(sender) => sender - .send(message) - .await - .map_err(|err| anyhow!("Connection lost to trader {err:#}")), - } -} - #[cfg(test)] pub mod tests { use crate::orderbook::trading::match_order; - use crate::orderbook::trading::notify_trader; use crate::orderbook::trading::sort_orders; - use crate::orderbook::trading::MatchParams; - use crate::orderbook::trading::TraderMatchParams; use bitcoin::secp256k1::PublicKey; - use bitcoin::secp256k1::SecretKey; - use bitcoin::secp256k1::SECP256K1; - use bitcoin::XOnlyPublicKey; - use orderbook_commons::FilledWith; - use orderbook_commons::Match; use orderbook_commons::Order; use orderbook_commons::OrderReason; use orderbook_commons::OrderState; use orderbook_commons::OrderType; - use orderbook_commons::OrderbookMsg; use rust_decimal::Decimal; use rust_decimal_macros::dec; - use std::collections::HashMap; use std::str::FromStr; use time::Duration; use time::OffsetDateTime; - use tokio::sync::mpsc; use trade::ContractSymbol; use trade::Direction; use uuid::Uuid; @@ -766,88 +689,4 @@ pub mod tests { assert!(matched_orders.is_none()); } - - #[tokio::test] - async fn given_matches_will_notify_all_traders() { - let trader_key = SecretKey::from_slice(&b"Me noob, don't lose money pleazz"[..]).unwrap(); - let trader_pub_key = trader_key.public_key(SECP256K1); - let maker_key = SecretKey::from_slice(&b"I am a king trader mate, right!?"[..]).unwrap(); - let maker_pub_key = maker_key.public_key(SECP256K1); - let trader_order_id = Uuid::new_v4(); - let maker_order_id = Uuid::new_v4(); - let oracle_pk = XOnlyPublicKey::from_str( - "16f88cf7d21e6c0f46bcbc983a4e3b19726c6c98858cc31c83551a88fde171c0", - ) - .unwrap(); - let maker_order_price = dec!(20_000); - let expiry_timestamp = OffsetDateTime::now_utc(); - let matched_orders = MatchParams { - taker_match: TraderMatchParams { - trader_id: trader_pub_key, - filled_with: FilledWith { - order_id: trader_order_id, - expiry_timestamp, - oracle_pk, - matches: vec![Match { - id: Uuid::new_v4(), - order_id: maker_order_id, - quantity: dec!(100), - pubkey: maker_pub_key, - execution_price: maker_order_price, - }], - }, - }, - makers_matches: vec![TraderMatchParams { - trader_id: maker_pub_key, - filled_with: FilledWith { - order_id: maker_order_id, - expiry_timestamp, - oracle_pk, - matches: vec![Match { - id: Uuid::new_v4(), - order_id: trader_order_id, - quantity: dec!(100), - pubkey: trader_pub_key, - execution_price: maker_order_price, - }], - }, - }], - }; - let mut traders = HashMap::new(); - let (maker_sender, mut maker_receiver) = mpsc::channel::(1); - let (trader_sender, mut trader_receiver) = mpsc::channel::(1); - traders.insert(maker_pub_key, maker_sender); - traders.insert(trader_pub_key, trader_sender); - - for match_param in matched_orders.matches() { - notify_trader( - match_param.trader_id, - OrderbookMsg::Match(match_param.filled_with.clone()), - &traders, - ) - .await - .unwrap(); - } - - let maker_msg = maker_receiver.recv().await.unwrap(); - let trader_msg = trader_receiver.recv().await.unwrap(); - - match maker_msg { - OrderbookMsg::Match(msg) => { - assert_eq!(msg.order_id, maker_order_id) - } - _ => { - panic!("Invalid message received") - } - } - - match trader_msg { - OrderbookMsg::Match(msg) => { - assert_eq!(msg.order_id, trader_order_id) - } - _ => { - panic!("Invalid message received") - } - } - } } diff --git a/coordinator/src/orderbook/websocket.rs b/coordinator/src/orderbook/websocket.rs index 1147b1221..52fe4efbd 100644 --- a/coordinator/src/orderbook/websocket.rs +++ b/coordinator/src/orderbook/websocket.rs @@ -1,13 +1,12 @@ +use crate::notification::NewUserMessage; use crate::orderbook; -use crate::orderbook::trading::NewUserMessage; -use crate::orderbook::trading::TradingMessage; use crate::routes::AppState; -use axum::extract::ws::Message; +use axum::extract::ws::Message as WebsocketMessage; use axum::extract::ws::WebSocket; use futures::SinkExt; use futures::StreamExt; use orderbook_commons::create_sign_message; -use orderbook_commons::OrderbookMsg; +use orderbook_commons::Message; use orderbook_commons::OrderbookRequest; use orderbook_commons::Signature; use std::sync::Arc; @@ -25,7 +24,7 @@ pub async fn websocket_connection(stream: WebSocket, state: Arc) { // We subscribe *before* sending the "joined" message, so that we will also // display it to our client. - let mut rx = state.tx_price_feed.subscribe(); + let mut price_feed = state.tx_price_feed.subscribe(); let mut conn = match state.pool.clone().get() { Ok(conn) => conn, @@ -44,11 +43,11 @@ pub async fn websocket_connection(stream: WebSocket, state: Arc) { }; // Now send the "all orders" to the new client. - if let Ok(msg) = serde_json::to_string(&OrderbookMsg::AllOrders(orders)) { - let _ = sender.send(Message::Text(msg)).await; + if let Ok(msg) = serde_json::to_string(&Message::AllOrders(orders)) { + let _ = sender.send(WebsocketMessage::Text(msg)).await; } - let (local_sender, mut local_receiver) = mpsc::channel::(100); + let (local_sender, mut local_receiver) = mpsc::channel::(100); let mut local_recv_task = tokio::spawn(async move { while let Some(local_msg) = local_receiver.recv().await { @@ -56,7 +55,7 @@ pub async fn websocket_connection(stream: WebSocket, state: Arc) { Ok(msg) => { if let Err(err) = tokio::time::timeout( WEBSOCKET_SEND_TIMEOUT, - sender.send(Message::Text(msg.clone())), + sender.send(WebsocketMessage::Text(msg.clone())), ) .await { @@ -76,7 +75,7 @@ pub async fn websocket_connection(stream: WebSocket, state: Arc) { let mut send_task = { let local_sender = local_sender.clone(); tokio::spawn(async move { - while let Ok(st) = rx.recv().await { + while let Ok(st) = price_feed.recv().await { if let Err(error) = local_sender.send(st).await { tracing::error!("Could not send message {error:#}"); return; @@ -88,28 +87,28 @@ pub async fn websocket_connection(stream: WebSocket, state: Arc) { // Spawn a task that takes messages from the websocket let local_sender = local_sender.clone(); let mut recv_task = tokio::spawn(async move { - while let Some(Ok(Message::Text(text))) = receiver.next().await { + while let Some(Ok(WebsocketMessage::Text(text))) = receiver.next().await { match serde_json::from_str(text.as_str()) { Ok(OrderbookRequest::Authenticate(Signature { signature, pubkey })) => { let msg = create_sign_message(); match signature.verify(&msg, &pubkey) { Ok(_) => { - if let Err(e) = local_sender.send(OrderbookMsg::Authenticated).await { + if let Err(e) = local_sender.send(Message::Authenticated).await { tracing::error!("Could not respond to user {e:#}"); return; } - let message = TradingMessage::NewUser(NewUserMessage { + let message = NewUserMessage { new_user: pubkey, sender: local_sender.clone(), - }); - if let Err(e) = state.trading_sender.send(message).await { + }; + if let Err(e) = state.tx_user_feed.send(message) { tracing::error!("Could not send trading message. Error: {e:#}"); } } Err(err) => { if let Err(er) = local_sender - .send(OrderbookMsg::InvalidAuthentication(format!( + .send(Message::InvalidAuthentication(format!( "Could not authenticate {err:#}" ))) .await diff --git a/coordinator/src/position/models.rs b/coordinator/src/position/models.rs index 5b0750500..2e9ede8ee 100644 --- a/coordinator/src/position/models.rs +++ b/coordinator/src/position/models.rs @@ -74,6 +74,11 @@ pub struct Position { } impl Position { + // Returns true if the position is expired + pub fn is_expired(&self) -> bool { + OffsetDateTime::now_utc() >= self.expiry_timestamp + } + /// Calculates the profit and loss for the coordinator in satoshis pub fn calculate_coordinator_pnl(&self, quote: Quote) -> Result { let closing_price = match self.closing_price { diff --git a/coordinator/src/routes.rs b/coordinator/src/routes.rs index 1e9a3d572..5425c5537 100644 --- a/coordinator/src/routes.rs +++ b/coordinator/src/routes.rs @@ -11,13 +11,14 @@ use crate::admin::send_payment; use crate::admin::sign_message; use crate::db::user; use crate::node::Node; +use crate::notification::NewUserMessage; use crate::orderbook::routes::delete_order; use crate::orderbook::routes::get_order; use crate::orderbook::routes::get_orders; use crate::orderbook::routes::post_order; use crate::orderbook::routes::put_order; use crate::orderbook::routes::websocket_handler; -use crate::orderbook::trading::TradingMessage; +use crate::orderbook::trading::NewOrderMessage; use crate::settings::Settings; use crate::AppError; use autometrics::autometrics; @@ -48,7 +49,7 @@ use ln_dlc_node::node::peer_manager::alias_as_bytes; use ln_dlc_node::node::peer_manager::broadcast_node_announcement; use ln_dlc_node::node::NodeInfo; use opentelemetry_prometheus::PrometheusExporter; -use orderbook_commons::OrderbookMsg; +use orderbook_commons::Message; use orderbook_commons::RouteHintHop; use prometheus::Encoder; use prometheus::TextEncoder; @@ -65,8 +66,9 @@ use tracing::instrument; pub struct AppState { pub node: Node, // Channel used to send messages to all connected clients. - pub tx_price_feed: broadcast::Sender, - pub trading_sender: mpsc::Sender, + pub tx_price_feed: broadcast::Sender, + pub tx_user_feed: broadcast::Sender, + pub trading_sender: mpsc::Sender, pub pool: Pool>, pub settings: RwLock, pub exporter: PrometheusExporter, @@ -82,14 +84,16 @@ pub fn router( exporter: PrometheusExporter, announcement_addresses: Vec, node_alias: &str, - trading_sender: mpsc::Sender, - tx_price_feed: broadcast::Sender, + trading_sender: mpsc::Sender, + tx_price_feed: broadcast::Sender, + tx_user_feed: broadcast::Sender, ) -> Router { let app_state = Arc::new(AppState { node, pool, settings: RwLock::new(settings), tx_price_feed, + tx_user_feed, trading_sender, exporter, announcement_addresses, diff --git a/crates/coordinator-commons/src/lib.rs b/crates/coordinator-commons/src/lib.rs index e239f7b5a..dd23d4e2b 100644 --- a/crates/coordinator-commons/src/lib.rs +++ b/crates/coordinator-commons/src/lib.rs @@ -3,6 +3,10 @@ use orderbook_commons::FilledWith; use rust_decimal::Decimal; use serde::Deserialize; use serde::Serialize; +use time::macros::time; +use time::Duration; +use time::OffsetDateTime; +use time::Weekday; use trade::ContractSymbol; use trade::Direction; @@ -78,3 +82,141 @@ pub struct TokenUpdateParams { pub pubkey: String, pub fcm_token: String, } + +/// Calculates the expiry timestamp at the next Sunday at 3 pm UTC from a given offset date time. +/// If the argument falls in between Friday, 3 pm UTC and Sunday, 3pm UTC, the expiry will be +/// calculated to next weeks Sunday at 3 pm +pub fn calculate_next_expiry(time: OffsetDateTime) -> OffsetDateTime { + let days = if is_in_rollover_weekend(time) || time.weekday() == Weekday::Sunday { + // if the provided time is in the rollover weekend or on a sunday, we expire the sunday the + // week after. + 7 - time.weekday().number_from_monday() + 7 + } else { + 7 - time.weekday().number_from_monday() + }; + let time = time.date().with_hms(15, 0, 0).expect("to fit into time"); + + (time + Duration::days(days as i64)).assume_utc() +} + +/// Checks whether the provided expiry date is eligible for a rollover +/// +/// Returns true if the given date falls in between friday 15 pm UTC and sunday 15 pm UTC +pub fn is_in_rollover_weekend(timestamp: OffsetDateTime) -> bool { + match timestamp.weekday() { + Weekday::Friday => timestamp.time() >= time!(15:00), + Weekday::Saturday => true, + Weekday::Sunday => timestamp.time() < time!(15:00), + _ => false, + } +} + +#[cfg(test)] +mod test { + use crate::calculate_next_expiry; + use crate::is_in_rollover_weekend; + use time::OffsetDateTime; + + #[test] + fn test_is_not_in_rollover_weekend() { + // Wed Aug 09 2023 09:30:23 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691573423).unwrap(); + assert!(!is_in_rollover_weekend(expiry)); + } + + #[test] + fn test_is_just_in_rollover_weekend_friday() { + // Fri Aug 11 2023 15:00:00 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691766000).unwrap(); + assert!(is_in_rollover_weekend(expiry)); + + // Fri Aug 11 2023 15:00:01 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691766001).unwrap(); + assert!(is_in_rollover_weekend(expiry)); + } + + #[test] + fn test_is_in_rollover_weekend_saturday() { + // Sat Aug 12 2023 16:00:00 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691856000).unwrap(); + assert!(is_in_rollover_weekend(expiry)); + } + + #[test] + fn test_is_just_in_rollover_weekend_sunday() { + // Sun Aug 13 2023 14:59:59 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691938799).unwrap(); + assert!(is_in_rollover_weekend(expiry)); + } + + #[test] + fn test_is_just_not_in_rollover_weekend_sunday() { + // Sun Aug 13 2023 15:00:00 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691938800).unwrap(); + assert!(!is_in_rollover_weekend(expiry)); + + // Sun Aug 13 2023 15:00:01 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691938801).unwrap(); + assert!(!is_in_rollover_weekend(expiry)); + } + + #[test] + fn test_expiry_timestamp_before_friday_15pm() { + // Wed Aug 09 2023 09:30:23 GMT+0000 + let from = OffsetDateTime::from_unix_timestamp(1691573423).unwrap(); + let expiry = calculate_next_expiry(from); + + // Sun Aug 13 2023 15:00:00 GMT+0000 + assert_eq!(1691938800, expiry.unix_timestamp()); + } + + #[test] + fn test_expiry_timestamp_just_before_friday_15pm() { + // Fri Aug 11 2023 14:59:59 GMT+0000 + let from = OffsetDateTime::from_unix_timestamp(1691765999).unwrap(); + let expiry = calculate_next_expiry(from); + + // Sun Aug 13 2023 15:00:00 GMT+0000 + assert_eq!(1691938800, expiry.unix_timestamp()); + } + + #[test] + fn test_expiry_timestamp_just_after_friday_15pm() { + // Fri Aug 11 2023 15:00:01 GMT+0000 + let from = OffsetDateTime::from_unix_timestamp(1691766001).unwrap(); + let expiry = calculate_next_expiry(from); + + // Sun Aug 20 2023 15:00:00 GMT+0000 + assert_eq!(1692543600, expiry.unix_timestamp()); + } + + #[test] + fn test_expiry_timestamp_at_friday_15pm() { + // Fri Aug 11 2023 15:00:00 GMT+0000 + let from = OffsetDateTime::from_unix_timestamp(1691766000).unwrap(); + let expiry = calculate_next_expiry(from); + + // Sun Aug 20 2023 15:00:00 GMT+0000 + assert_eq!(1692543600, expiry.unix_timestamp()); + } + + #[test] + fn test_expiry_timestamp_after_sunday_15pm() { + // Sun Aug 06 2023 16:00:00 GMT+0000 + let from = OffsetDateTime::from_unix_timestamp(1691337600).unwrap(); + let expiry = calculate_next_expiry(from); + + // Sun Aug 13 2023 15:00:00 GMT+0000 + assert_eq!(1691938800, expiry.unix_timestamp()); + } + + #[test] + fn test_expiry_timestamp_on_saturday() { + // // Sat Aug 12 2023 16:00:00 GMT+0000 + let from = OffsetDateTime::from_unix_timestamp(1691856000).unwrap(); + let expiry = calculate_next_expiry(from); + + // Sun Aug 20 2023 15:00:00 GMT+0000 + assert_eq!(1692543600, expiry.unix_timestamp()); + } +} diff --git a/crates/orderbook-commons/src/lib.rs b/crates/orderbook-commons/src/lib.rs index 38ff9e369..f571f6904 100644 --- a/crates/orderbook-commons/src/lib.rs +++ b/crates/orderbook-commons/src/lib.rs @@ -2,11 +2,9 @@ pub use crate::order_matching_fee::order_matching_fee_taker; pub use crate::price::best_current_price; pub use crate::price::Price; pub use crate::price::Prices; -use anyhow::ensure; -use anyhow::Result; use rust_decimal::prelude::ToPrimitive; use rust_decimal::Decimal; -use secp256k1::Message; +use secp256k1::Message as SecpMessage; use secp256k1::PublicKey; use secp256k1::XOnlyPublicKey; use serde::Deserialize; @@ -14,8 +12,6 @@ use serde::Serialize; use sha2::digest::FixedOutput; use sha2::Digest; use sha2::Sha256; -use std::str::FromStr; -use time::Duration; use time::OffsetDateTime; use trade::ContractSymbol; use trade::Direction; @@ -67,11 +63,11 @@ pub struct Signature { pub signature: secp256k1::ecdsa::Signature, } -pub fn create_sign_message() -> Message { +pub fn create_sign_message() -> SecpMessage { let sign_message = "Hello it's me Mario".to_string(); let hashed_message = Sha256::new().chain_update(sign_message).finalize_fixed(); - let msg = Message::from_slice(hashed_message.as_slice()) + let msg = SecpMessage::from_slice(hashed_message.as_slice()) .expect("The message is static, hence this should never happen"); msg } @@ -115,8 +111,10 @@ pub enum OrderbookRequest { Authenticate(Signature), } +// TODO(holzeis): The message enum should not be in the orderbook-commons crate as it also contains +// coordinator messages. We should move all common crates into a single one. #[derive(Serialize, Clone, Deserialize, Debug)] -pub enum OrderbookMsg { +pub enum Message { AllOrders(Vec), NewOrder(Order), DeleteOrder(Uuid), @@ -128,6 +126,7 @@ pub enum OrderbookMsg { order: Order, filled_with: FilledWith, }, + Rollover, } /// A match for an order @@ -317,41 +316,6 @@ pub struct Matches { pub updated_at: OffsetDateTime, } -pub fn get_filled_with_from_matches(matches: Vec) -> Result { - ensure!( - !matches.is_empty(), - "Need at least one matches record to construct a FilledWith" - ); - - let order_id = matches - .first() - .expect("to have at least one match") - .order_id; - let oracle_pk = XOnlyPublicKey::from_str( - "16f88cf7d21e6c0f46bcbc983a4e3b19726c6c98858cc31c83551a88fde171c0", - ) - .expect("To be a valid pubkey"); - - let tomorrow = OffsetDateTime::now_utc().date() + Duration::days(7); - let expiry_timestamp = tomorrow.midnight().assume_utc(); - - Ok(FilledWith { - order_id, - expiry_timestamp, - oracle_pk, - matches: matches - .iter() - .map(|m| Match { - id: m.id, - order_id: m.order_id, - quantity: m.quantity, - pubkey: m.match_trader_id, - execution_price: m.execution_price, - }) - .collect(), - }) -} - #[cfg(test)] mod test { use crate::FilledWith; diff --git a/crates/tests-e2e/src/test_subscriber.rs b/crates/tests-e2e/src/test_subscriber.rs index b3c4c8118..f814a55b6 100644 --- a/crates/tests-e2e/src/test_subscriber.rs +++ b/crates/tests-e2e/src/test_subscriber.rs @@ -199,7 +199,7 @@ impl Senders { native::event::EventInternal::PaymentClaimed(_amount_msats) => { unreachable!("PaymentClaimed event should not be sent to the subscriber"); } - native::event::EventInternal::AsyncTrade(_order_id) => { + native::event::EventInternal::BackgroundNotification(_task) => { // ignored } } diff --git a/crates/tests-e2e/tests/rollover_position.rs b/crates/tests-e2e/tests/rollover_position.rs index 5e5bf658d..8e6af79dc 100644 --- a/crates/tests-e2e/tests/rollover_position.rs +++ b/crates/tests-e2e/tests/rollover_position.rs @@ -4,7 +4,6 @@ use position::PositionState; use tests_e2e::app::AppHandle; use tests_e2e::setup; use tests_e2e::wait_until; -use time::Duration; use time::OffsetDateTime; #[tokio::test] @@ -23,8 +22,7 @@ async fn can_rollover_position() { .unwrap(); let position = test.app.rx.position().expect("position to exist"); - let tomorrow = position.expiry.date() + Duration::days(7); - let new_expiry = tomorrow.midnight().assume_utc(); + let new_expiry = coordinator_commons::calculate_next_expiry(position.expiry); coordinator .rollover(&dlc_channel.dlc_channel_id.unwrap()) diff --git a/mobile/lib/common/domain/background_task.dart b/mobile/lib/common/domain/background_task.dart new file mode 100644 index 000000000..e6c9afe90 --- /dev/null +++ b/mobile/lib/common/domain/background_task.dart @@ -0,0 +1,51 @@ +import 'package:get_10101/bridge_generated/bridge_definitions.dart' as bridge; +import 'package:get_10101/features/trade/domain/order.dart'; + +class AsyncTrade { + final OrderReason orderReason; + + AsyncTrade({required this.orderReason}); + + static AsyncTrade fromApi(bridge.BackgroundTask_AsyncTrade asyncTrade) { + return AsyncTrade(orderReason: OrderReason.fromApi(asyncTrade.field0)); + } + + static bridge.BackgroundTask apiDummy() { + return bridge.BackgroundTask_AsyncTrade(OrderReason.apiDummy()); + } +} + +enum TaskStatus { + pending, + failed, + success; + + static TaskStatus fromApi(bridge.TaskStatus taskStatus) { + switch (taskStatus) { + case bridge.TaskStatus.Pending: + return TaskStatus.pending; + case bridge.TaskStatus.Failed: + return TaskStatus.failed; + case bridge.TaskStatus.Success: + return TaskStatus.success; + } + } + + static bridge.TaskStatus apiDummy() { + return bridge.TaskStatus.Pending; + } +} + +class Rollover { + final TaskStatus taskStatus; + + Rollover({required this.taskStatus}); + + static Rollover fromApi(bridge.BackgroundTask_Rollover rollover) { + return Rollover(taskStatus: TaskStatus.fromApi(rollover.field0)); + } + + static bridge.BackgroundTask apiDummy() { + return bridge.BackgroundTask_Rollover(TaskStatus.apiDummy()); + } +} diff --git a/mobile/lib/features/stable/stable_confirmation_sheet.dart b/mobile/lib/features/stable/stable_confirmation_sheet.dart index a66be0e90..435ee08af 100644 --- a/mobile/lib/features/stable/stable_confirmation_sheet.dart +++ b/mobile/lib/features/stable/stable_confirmation_sheet.dart @@ -130,8 +130,6 @@ class _StableBottomSheet extends State { }); } - final now = DateTime.now(); - return Form( key: _formKey, child: Column( @@ -233,7 +231,7 @@ class _StableBottomSheet extends State { const SizedBox(height: 16.0), ValueDataRow( type: ValueType.date, - value: DateTime.utc(now.year, now.month, now.day + 2).toLocal(), + value: tradeValues.expiry.toLocal(), label: "Expiry", valueTextStyle: const TextStyle(fontSize: 18), labelTextStyle: const TextStyle(fontSize: 18)), diff --git a/mobile/lib/features/trade/application/trade_values_service.dart b/mobile/lib/features/trade/application/trade_values_service.dart index 063db318a..b0897d425 100644 --- a/mobile/lib/features/trade/application/trade_values_service.dart +++ b/mobile/lib/features/trade/application/trade_values_service.dart @@ -1,7 +1,7 @@ -import 'package:get_10101/ffi.dart' as rust; -import 'package:get_10101/features/trade/domain/leverage.dart'; import 'package:get_10101/common/domain/model.dart'; import 'package:get_10101/features/trade/domain/direction.dart'; +import 'package:get_10101/features/trade/domain/leverage.dart'; +import 'package:get_10101/ffi.dart' as rust; class TradeValuesService { Amount? calculateMargin( @@ -39,4 +39,8 @@ class TradeValuesService { price: price, leverage: leverage.leverage, direction: direction.toApi()); } } + + DateTime getExpiryTimestamp() { + return DateTime.fromMillisecondsSinceEpoch(rust.api.getExpiryTimestamp() * 1000); + } } diff --git a/mobile/lib/features/trade/async_order_change_notifier.dart b/mobile/lib/features/trade/async_order_change_notifier.dart index 347508a64..5cff390f4 100644 --- a/mobile/lib/features/trade/async_order_change_notifier.dart +++ b/mobile/lib/features/trade/async_order_change_notifier.dart @@ -2,6 +2,7 @@ import 'package:f_logs/model/flog/flog.dart'; import 'package:flutter/material.dart'; import 'package:get_10101/bridge_generated/bridge_definitions.dart' as bridge; import 'package:get_10101/common/application/event_service.dart'; +import 'package:get_10101/common/domain/background_task.dart'; import 'package:get_10101/common/global_keys.dart'; import 'package:get_10101/features/trade/application/order_service.dart'; import 'package:get_10101/features/trade/domain/order.dart'; @@ -26,9 +27,10 @@ class AsyncOrderChangeNotifier extends ChangeNotifier implements Subscriber { @override void notify(bridge.Event event) { - if (event is bridge.Event_AsyncTrade) { - OrderReason reason = OrderReason.fromApi(event.field0); - FLog.debug(text: "Received a async trade event. Reason: $reason"); + if (event is bridge.Event_BackgroundNotification && + event.field0 is bridge.BackgroundTask_AsyncTrade) { + AsyncTrade asyncTrade = AsyncTrade.fromApi(event.field0 as bridge.BackgroundTask_AsyncTrade); + FLog.debug(text: "Received a async trade event. Reason: ${asyncTrade.orderReason}"); showDialog( context: shellNavigatorKey.currentContext!, builder: (context) { @@ -47,7 +49,7 @@ class AsyncOrderChangeNotifier extends ChangeNotifier implements Subscriber { } late Widget content; - switch (reason) { + switch (asyncTrade.orderReason) { case OrderReason.expired: content = const Text("Your position has been closed due to expiry."); case OrderReason.manual: diff --git a/mobile/lib/features/trade/domain/trade_values.dart b/mobile/lib/features/trade/domain/trade_values.dart index 009bda73e..5031fa132 100644 --- a/mobile/lib/features/trade/domain/trade_values.dart +++ b/mobile/lib/features/trade/domain/trade_values.dart @@ -16,6 +16,7 @@ class TradeValues { Amount? fee; // This fee is an estimate of the order-matching fee. double fundingRate; + DateTime expiry; // no final so it can be mocked in tests TradeValuesService tradeValuesService; @@ -29,6 +30,7 @@ class TradeValues { required this.liquidationPrice, required this.fee, required this.fundingRate, + required this.expiry, required this.tradeValuesService}); factory TradeValues.fromQuantity( @@ -47,6 +49,8 @@ class TradeValues { Amount? fee = orderMatchingFee(quantity, price); + DateTime expiry = tradeValuesService.getExpiryTimestamp(); + return TradeValues( direction: direction, margin: margin, @@ -56,6 +60,7 @@ class TradeValues { fundingRate: fundingRate, liquidationPrice: liquidationPrice, fee: fee, + expiry: expiry, tradeValuesService: tradeValuesService); } @@ -75,6 +80,8 @@ class TradeValues { Amount? fee = orderMatchingFee(quantity, price); + DateTime expiry = tradeValuesService.getExpiryTimestamp(); + return TradeValues( direction: direction, margin: margin, @@ -84,6 +91,7 @@ class TradeValues { fundingRate: fundingRate, liquidationPrice: liquidationPrice, fee: fee, + expiry: expiry, tradeValuesService: tradeValuesService); } diff --git a/mobile/lib/features/trade/rollover_change_notifier.dart b/mobile/lib/features/trade/rollover_change_notifier.dart new file mode 100644 index 000000000..03cc748a2 --- /dev/null +++ b/mobile/lib/features/trade/rollover_change_notifier.dart @@ -0,0 +1,54 @@ +import 'package:f_logs/model/flog/flog.dart'; +import 'package:flutter/material.dart'; +import 'package:get_10101/bridge_generated/bridge_definitions.dart' as bridge; +import 'package:get_10101/common/application/event_service.dart'; +import 'package:get_10101/common/domain/background_task.dart'; +import 'package:get_10101/common/global_keys.dart'; +import 'package:get_10101/features/trade/order_submission_status_dialog.dart'; +import 'package:provider/provider.dart'; + +class RolloverChangeNotifier extends ChangeNotifier implements Subscriber { + late TaskStatus taskStatus; + + @override + void notify(bridge.Event event) { + if (event is bridge.Event_BackgroundNotification && + event.field0 is bridge.BackgroundTask_Rollover) { + Rollover rollover = Rollover.fromApi(event.field0 as bridge.BackgroundTask_Rollover); + FLog.debug(text: "Received a rollover event. Status: ${rollover.taskStatus}"); + + taskStatus = rollover.taskStatus; + + if (taskStatus == TaskStatus.pending) { + // initialize dialog for the pending task + showDialog( + context: shellNavigatorKey.currentContext!, + builder: (context) { + TaskStatus status = context.watch().taskStatus; + + // todo(holzeis): Reusing the order submission status dialog is not nice, but it's actually suitable for any task execution that has pending, + // failed and success states. We may should consider renaming this dialog for its more generic purpose. + OrderSubmissionStatusDialogType type = OrderSubmissionStatusDialogType.pendingSubmit; + switch (status) { + case TaskStatus.pending: + type = OrderSubmissionStatusDialogType.successfulSubmit; + case TaskStatus.failed: + type = OrderSubmissionStatusDialogType.failedFill; + case TaskStatus.success: + type = OrderSubmissionStatusDialogType.filled; + } + + late Widget content = const Text("Rolling over your position"); + + return OrderSubmissionStatusDialog(title: "Catching up!", type: type, content: content); + }, + ); + } else { + // notify dialog about changed task status + notifyListeners(); + } + } else { + FLog.warning(text: "Received unexpected event: ${event.toString()}"); + } + } +} diff --git a/mobile/lib/features/trade/submit_order_change_notifier.dart b/mobile/lib/features/trade/submit_order_change_notifier.dart index fa138abcc..24af68c24 100644 --- a/mobile/lib/features/trade/submit_order_change_notifier.dart +++ b/mobile/lib/features/trade/submit_order_change_notifier.dart @@ -98,6 +98,7 @@ class SubmitOrderChangeNotifier extends ChangeNotifier implements Subscriber { liquidationPrice: position.liquidationPrice, fee: fee, fundingRate: 0, + expiry: position.expiry, tradeValuesService: TradeValuesService()), PositionAction.close); } diff --git a/mobile/lib/features/trade/trade_bottom_sheet_confirmation.dart b/mobile/lib/features/trade/trade_bottom_sheet_confirmation.dart index 3f6a4b433..9e2261f4c 100644 --- a/mobile/lib/features/trade/trade_bottom_sheet_confirmation.dart +++ b/mobile/lib/features/trade/trade_bottom_sheet_confirmation.dart @@ -101,7 +101,6 @@ class TradeBottomSheetConfirmation extends StatelessWidget { pnl = position!.unrealizedPnl != null ? position.unrealizedPnl! : Amount(0); } - DateTime now = DateTime.now().toUtc(); TextStyle dataRowStyle = const TextStyle(fontSize: 14); return Container( @@ -122,7 +121,7 @@ class TradeBottomSheetConfirmation extends StatelessWidget { if (!close) ValueDataRow( type: ValueType.date, - value: DateTime.utc(now.year, now.month, now.day + 7).toLocal(), + value: tradeValues.expiry.toLocal(), label: 'Expiry'), close ? ValueDataRow( diff --git a/mobile/lib/main.dart b/mobile/lib/main.dart index 3607719b3..670bda3fe 100644 --- a/mobile/lib/main.dart +++ b/mobile/lib/main.dart @@ -12,6 +12,7 @@ import 'package:get_10101/common/application/channel_info_service.dart'; import 'package:get_10101/common/application/event_service.dart'; import 'package:get_10101/common/channel_status_notifier.dart'; import 'package:get_10101/common/color.dart'; +import 'package:get_10101/common/domain/background_task.dart'; import 'package:get_10101/common/domain/service_status.dart'; import 'package:get_10101/common/global_keys.dart'; import 'package:get_10101/common/service_status_notifier.dart'; @@ -27,6 +28,7 @@ import 'package:get_10101/features/trade/domain/position.dart'; import 'package:get_10101/features/trade/domain/price.dart'; import 'package:get_10101/features/trade/order_change_notifier.dart'; import 'package:get_10101/features/trade/position_change_notifier.dart'; +import 'package:get_10101/features/trade/rollover_change_notifier.dart'; import 'package:get_10101/features/trade/submit_order_change_notifier.dart'; import 'package:get_10101/features/trade/trade_screen.dart'; import 'package:get_10101/features/trade/trade_theme.dart'; @@ -87,6 +89,7 @@ void main() async { ChangeNotifierProvider(create: (context) => ServiceStatusNotifier()), ChangeNotifierProvider(create: (context) => ChannelStatusNotifier()), ChangeNotifierProvider(create: (context) => AsyncOrderChangeNotifier(OrderService())), + ChangeNotifierProvider(create: (context) => RolloverChangeNotifier()), Provider(create: (context) => Environment.parse()), Provider(create: (context) => channelInfoService) ], child: const TenTenOneApp())); @@ -477,6 +480,7 @@ void subscribeToNotifiers(BuildContext context) { final channelStatusNotifier = context.read(); final stableValuesChangeNotifier = context.read(); final asyncOrderChangeNotifier = context.read(); + final rolloverChangeNotifier = context.read(); eventService.subscribe( orderChangeNotifier, bridge.Event.orderUpdateNotification(Order.apiDummy())); @@ -509,7 +513,11 @@ void subscribeToNotifiers(BuildContext context) { eventService.subscribe( asyncOrderChangeNotifier, bridge.Event.orderUpdateNotification(Order.apiDummy())); - eventService.subscribe(asyncOrderChangeNotifier, bridge.Event.asyncTrade(OrderReason.apiDummy())); + eventService.subscribe( + asyncOrderChangeNotifier, bridge.Event.backgroundNotification(AsyncTrade.apiDummy())); + + eventService.subscribe( + rolloverChangeNotifier, bridge.Event.backgroundNotification(Rollover.apiDummy())); channelStatusNotifier.subscribe(eventService); diff --git a/mobile/native/src/api.rs b/mobile/native/src/api.rs index 759084e24..0d049ee68 100644 --- a/mobile/native/src/api.rs +++ b/mobile/native/src/api.rs @@ -34,6 +34,7 @@ use std::ops::Add; use std::str::FromStr; use std::time::Duration; use std::time::SystemTime; +use time::OffsetDateTime; pub use trade::ContractSymbol; pub use trade::Direction; @@ -394,3 +395,9 @@ pub fn get_channel_open_fee_estimate_sat() -> Result { Ok(estimate.ceil() as u64) } + +pub fn get_expiry_timestamp() -> SyncReturn { + SyncReturn( + coordinator_commons::calculate_next_expiry(OffsetDateTime::now_utc()).unix_timestamp(), + ) +} diff --git a/mobile/native/src/event/api.rs b/mobile/native/src/event/api.rs index 61f7330e3..6a4cf9a8e 100644 --- a/mobile/native/src/event/api.rs +++ b/mobile/native/src/event/api.rs @@ -1,4 +1,5 @@ use crate::api::WalletInfo; +use crate::event; use crate::event::subscriber::Subscriber; use crate::event::EventInternal; use crate::event::EventType; @@ -25,7 +26,14 @@ pub enum Event { PriceUpdateNotification(BestPrice), ServiceHealthUpdate(ServiceUpdate), ChannelStatusUpdate(ChannelStatus), + BackgroundNotification(BackgroundTask), +} + +#[frb] +#[derive(Clone)] +pub enum BackgroundTask { AsyncTrade(OrderReason), + Rollover(TaskStatus), } impl From for Event { @@ -64,7 +72,9 @@ impl From for Event { EventInternal::PaymentClaimed(_) => { unreachable!("This internal event is not exposed to the UI") } - EventInternal::AsyncTrade(reason) => Event::AsyncTrade(reason.into()), + EventInternal::BackgroundNotification(task) => { + Event::BackgroundNotification(task.into()) + } } } } @@ -100,7 +110,7 @@ impl Subscriber for FlutterSubscriber { EventType::PriceUpdateNotification, EventType::ServiceHealthUpdate, EventType::ChannelStatusUpdate, - EventType::AsyncTrade, + EventType::BackgroundNotification, ] } } @@ -111,6 +121,35 @@ impl FlutterSubscriber { } } +impl From for BackgroundTask { + fn from(value: event::BackgroundTask) -> Self { + match value { + event::BackgroundTask::AsyncTrade(order_reason) => { + BackgroundTask::AsyncTrade(order_reason.into()) + } + event::BackgroundTask::Rollover(status) => BackgroundTask::Rollover(status.into()), + } + } +} + +#[frb] +#[derive(Clone)] +pub enum TaskStatus { + Pending, + Failed, + Success, +} + +impl From for TaskStatus { + fn from(value: event::TaskStatus) -> Self { + match value { + event::TaskStatus::Pending => TaskStatus::Pending, + event::TaskStatus::Failed => TaskStatus::Failed, + event::TaskStatus::Success => TaskStatus::Success, + } + } +} + /// The best bid and ask price for a contract. /// /// Best prices come from an orderbook. Contrary to the `Price` struct, we can have no price diff --git a/mobile/native/src/event/mod.rs b/mobile/native/src/event/mod.rs index 1bf2ec3c0..1d30f9349 100644 --- a/mobile/native/src/event/mod.rs +++ b/mobile/native/src/event/mod.rs @@ -30,7 +30,6 @@ pub fn publish(event: &EventInternal) { pub enum EventInternal { Init(String), Log(String), - AsyncTrade(OrderReason), OrderUpdateNotification(Order), WalletInfoUpdateNotification(WalletInfo), OrderFilledWith(Box), @@ -41,6 +40,20 @@ pub enum EventInternal { PaymentClaimed(u64), ServiceHealthUpdate(ServiceUpdate), ChannelStatusUpdate(ChannelStatus), + BackgroundNotification(BackgroundTask), +} + +#[derive(Clone, Debug)] +pub enum BackgroundTask { + AsyncTrade(OrderReason), + Rollover(TaskStatus), +} + +#[derive(Clone, Debug)] +pub enum TaskStatus { + Pending, + Failed, + Success, } impl fmt::Display for EventInternal { @@ -58,7 +71,7 @@ impl fmt::Display for EventInternal { EventInternal::PaymentClaimed(_) => "PaymentClaimed", EventInternal::ServiceHealthUpdate(_) => "ServiceHealthUpdate", EventInternal::ChannelStatusUpdate(_) => "ChannelStatusUpdate", - EventInternal::AsyncTrade(_) => "AsyncTrade", + EventInternal::BackgroundNotification(_) => "BackgroundNotification", } .fmt(f) } @@ -81,7 +94,7 @@ impl From for EventType { EventInternal::PaymentClaimed(_) => EventType::PaymentClaimed, EventInternal::ServiceHealthUpdate(_) => EventType::ServiceHealthUpdate, EventInternal::ChannelStatusUpdate(_) => EventType::ChannelStatusUpdate, - EventInternal::AsyncTrade(_) => EventType::AsyncTrade, + EventInternal::BackgroundNotification(_) => EventType::BackgroundNotification, } } } @@ -100,5 +113,5 @@ pub enum EventType { PaymentClaimed, ServiceHealthUpdate, ChannelStatusUpdate, - AsyncTrade, + BackgroundNotification, } diff --git a/mobile/native/src/ln_dlc/mod.rs b/mobile/native/src/ln_dlc/mod.rs index 6d38e3925..6970152c6 100644 --- a/mobile/native/src/ln_dlc/mod.rs +++ b/mobile/native/src/ln_dlc/mod.rs @@ -24,6 +24,7 @@ use bdk::bitcoin::Txid; use bdk::bitcoin::XOnlyPublicKey; use bdk::BlockTime; use bdk::FeeRate; +use bitcoin::hashes::hex::ToHex; use bitcoin::Amount; use coordinator_commons::LspConfig; use coordinator_commons::TradeParams; @@ -35,7 +36,9 @@ use lightning_invoice::Invoice; use ln_dlc_node::channel::JIT_FEE_INVOICE_DESCRIPTION_PREFIX; use ln_dlc_node::config::app_config; use ln_dlc_node::node::rust_dlc_manager::subchannel::LNChannelManager; +use ln_dlc_node::node::rust_dlc_manager::subchannel::SubChannelState; use ln_dlc_node::node::rust_dlc_manager::ChannelId; +use ln_dlc_node::node::rust_dlc_manager::Storage as DlcStorage; use ln_dlc_node::node::LnDlcNodeSettings; use ln_dlc_node::node::NodeInfo; use ln_dlc_node::scorer; @@ -693,3 +696,56 @@ pub async fn trade(trade_params: TradeParams) -> Result<(), (FailureReason, Erro Ok(()) } + +/// initiates the rollover protocol with the coordinator +pub async fn rollover() -> Result<()> { + let node = NODE.get(); + + let dlc_channels = node + .inner + .sub_channel_manager + .get_dlc_manager() + .get_store() + .get_sub_channels()?; + + let dlc_channel = dlc_channels + .into_iter() + .find(|chan| { + chan.counter_party == config::get_coordinator_info().pubkey + && matches!(chan.state, SubChannelState::Signed(_)) + }) + .context("Couldn't find dlc channel to rollover")?; + + let dlc_channel_id = dlc_channel + .get_dlc_channel_id(0) + .context("Couldn't get dlc channel id")?; + + let client = reqwest_client(); + let response = client + .post(format!( + "http://{}/api/rollover/{}", + config::get_http_endpoint(), + dlc_channel_id.to_hex() + )) + .send() + .await + .with_context(|| format!("Failed to rollover dlc with id {}", dlc_channel_id.to_hex()))?; + + if !response.status().is_success() { + let response_text = match response.text().await { + Ok(text) => text, + Err(err) => { + format!("could not decode response {err:#}") + } + }; + + bail!( + "Failed to rollover dlc with id {}. Error: {response_text}", + dlc_channel_id.to_hex() + ) + } + + tracing::info!("Sent rollover request to coordinator successfully"); + + Ok(()) +} diff --git a/mobile/native/src/ln_dlc/node.rs b/mobile/native/src/ln_dlc/node.rs index 5e0787a02..8ef2fe31a 100644 --- a/mobile/native/src/ln_dlc/node.rs +++ b/mobile/native/src/ln_dlc/node.rs @@ -1,4 +1,8 @@ use crate::db; +use crate::event; +use crate::event::BackgroundTask; +use crate::event::EventInternal; +use crate::event::TaskStatus; use crate::trade::order; use crate::trade::position; use crate::trade::position::PositionState; @@ -225,6 +229,10 @@ impl Node { // After handling the `RenewRevoke` message, we need to do some post-processing // based on the fact that the DLC channel has been updated. position::handler::set_position_state(PositionState::Open)?; + + event::publish(&EventInternal::BackgroundNotification( + BackgroundTask::Rollover(TaskStatus::Success), + )); } // ignoring all other channel events. _ => (), @@ -248,10 +256,27 @@ impl Node { .get_dlc_channel_id(0) .context("Could not fetch dlc channel id")?; - let accept_collateral = + let (accept_collateral, expiry_timestamp) = match self.inner.get_contract_by_dlc_channel_id(dlc_channel_id)? { Contract::Confirmed(contract) => { - contract.accepted_contract.accept_params.collateral + let offered_contract = contract.accepted_contract.offered_contract; + let contract_info = offered_contract + .contract_info + .first() + .context("contract info to exist on a signed contract")?; + let oracle_announcement = contract_info + .oracle_announcements + .first() + .context("oracle announcement to exist on signed contract")?; + + let expiry_timestamp = OffsetDateTime::from_unix_timestamp( + oracle_announcement.oracle_event.event_maturity_epoch as i64, + )?; + + ( + contract.accepted_contract.accept_params.collateral, + expiry_timestamp, + ) } _ => bail!( "Confirmed contract not found for channel ID: {}", @@ -262,8 +287,12 @@ impl Node { let filled_order = order::handler::order_filled() .context("Cannot mark order as filled for confirmed DLC")?; - position::handler::update_position_after_dlc_creation(filled_order, accept_collateral) - .context("Failed to update position after DLC creation")?; + position::handler::update_position_after_dlc_creation( + filled_order, + accept_collateral, + expiry_timestamp, + ) + .context("Failed to update position after DLC creation")?; if let Err(e) = self.pay_order_matching_fee(&channel_id) { tracing::error!("{e:#}"); diff --git a/mobile/native/src/orderbook.rs b/mobile/native/src/orderbook.rs index 359224039..91bee3de0 100644 --- a/mobile/native/src/orderbook.rs +++ b/mobile/native/src/orderbook.rs @@ -1,6 +1,8 @@ use crate::config; use crate::event; +use crate::event::BackgroundTask; use crate::event::EventInternal; +use crate::event::TaskStatus; use crate::health::ServiceStatus; use crate::trade::position; use anyhow::Result; @@ -8,8 +10,8 @@ use bdk::bitcoin::secp256k1::SecretKey; use bdk::bitcoin::secp256k1::SECP256K1; use futures::TryStreamExt; use orderbook_commons::best_current_price; +use orderbook_commons::Message; use orderbook_commons::Order; -use orderbook_commons::OrderbookMsg; use orderbook_commons::Prices; use orderbook_commons::Signature; use parking_lot::Mutex; @@ -92,7 +94,7 @@ pub fn subscribe( Ok(Some(msg)) => { tracing::debug!(%msg, "New message from orderbook"); - let msg = match serde_json::from_str::(&msg) { + let msg = match serde_json::from_str::(&msg) { Ok(msg) => msg, Err(e) => { tracing::error!( @@ -103,22 +105,32 @@ pub fn subscribe( }; match msg { - OrderbookMsg::AsyncMatch { order, filled_with } => { - tracing::info!(order_id = %order.id, "Received an async match from orderbook. Reason: {:?}", order.order_reason); - event::publish(&EventInternal::AsyncTrade(order.clone().order_reason.into())); + Message::Rollover => { + tracing::info!("Received a rollover request from orderbook."); + event::publish(&EventInternal::BackgroundNotification(BackgroundTask::Rollover(TaskStatus::Pending))); + + if let Err(e) = position::handler::rollover().await { + tracing::error!("Failed to rollover dlc. Error: {e:#}"); + event::publish(&EventInternal::BackgroundNotification(BackgroundTask::Rollover(TaskStatus::Failed))); + } + }, + Message::AsyncMatch { order, filled_with } => { + let order_reason = order.clone().order_reason.into(); + tracing::info!(order_id = %order.id, "Received an async match from orderbook. Reason: {order_reason:?}"); + event::publish(&EventInternal::BackgroundNotification(BackgroundTask::AsyncTrade(order_reason))); if let Err(e) = position::handler::async_trade(order.clone(), filled_with).await { tracing::error!(order_id = %order.id, "Failed to process async trade. Error: {e:#}"); } }, - OrderbookMsg::Match(filled) => { + Message::Match(filled) => { tracing::info!(order_id = %filled.order_id, "Received match from orderbook"); if let Err(e) = position::handler::trade(filled.clone()).await { tracing::error!(order_id = %filled.order_id, "Trade request sent to coordinator failed. Error: {e:#}"); } }, - OrderbookMsg::AllOrders(initial_orders) => { + Message::AllOrders(initial_orders) => { let mut orders = orders.lock(); if !orders.is_empty() { tracing::debug!("Received new set of initial orders from orderbook, replacing the previously stored orders"); @@ -129,12 +141,12 @@ pub fn subscribe( *orders = initial_orders; update_prices_if_needed(&mut cached_best_price, &orders); }, - OrderbookMsg::NewOrder(order) => { + Message::NewOrder(order) => { let mut orders = orders.lock(); orders.push(order); update_prices_if_needed(&mut cached_best_price, &orders); } - OrderbookMsg::DeleteOrder(order_id) => { + Message::DeleteOrder(order_id) => { let mut orders = orders.lock(); let found = remove_order(&mut orders, order_id); if !found { @@ -142,7 +154,7 @@ pub fn subscribe( } update_prices_if_needed(&mut cached_best_price, &orders); }, - OrderbookMsg::Update(updated_order) => { + Message::Update(updated_order) => { let mut orders = orders.lock(); let found = remove_order(&mut orders, updated_order.id); if !found { diff --git a/mobile/native/src/trade/position/handler.rs b/mobile/native/src/trade/position/handler.rs index c73c5265a..f15f388b3 100644 --- a/mobile/native/src/trade/position/handler.rs +++ b/mobile/native/src/trade/position/handler.rs @@ -17,7 +17,6 @@ use coordinator_commons::TradeParams; use orderbook_commons::FilledWith; use orderbook_commons::Prices; use rust_decimal::prelude::ToPrimitive; -use time::Duration; use time::OffsetDateTime; use trade::ContractSymbol; @@ -103,6 +102,11 @@ pub async fn async_trade(order: orderbook_commons::Order, filled_with: FilledWit Ok(()) } +/// Rollover dlc to new expiry timestamp +pub async fn rollover() -> Result<()> { + ln_dlc::rollover().await +} + /// Fetch the positions from the database pub fn get_positions() -> Result> { db::get_positions() @@ -162,7 +166,11 @@ pub fn rollover_position(expiry_timestamp: OffsetDateTime) -> Result<()> { } /// Create a position after the creation of a DLC channel. -pub fn update_position_after_dlc_creation(filled_order: Order, collateral: u64) -> Result<()> { +pub fn update_position_after_dlc_creation( + filled_order: Order, + collateral: u64, + expiry: OffsetDateTime, +) -> Result<()> { ensure!( db::get_positions()?.is_empty(), "Cannot create a position if one is already open" @@ -172,9 +180,6 @@ pub fn update_position_after_dlc_creation(filled_order: Order, collateral: u64) let average_entry_price = filled_order.execution_price().unwrap_or(0.0); - let tomorrow = OffsetDateTime::now_utc().date() + Duration::days(7); - let expiry = tomorrow.midnight().assume_utc(); - let have_a_position = Position { leverage: filled_order.leverage, quantity: filled_order.quantity, diff --git a/mobile/test/trade_test.dart b/mobile/test/trade_test.dart index 9cb2a49f0..21966a830 100644 --- a/mobile/test/trade_test.dart +++ b/mobile/test/trade_test.dart @@ -2,7 +2,17 @@ import 'package:candlesticks/candlesticks.dart'; import 'package:flutter/material.dart'; import 'package:flutter_test/flutter_test.dart'; import 'package:get_10101/common/amount_denomination_change_notifier.dart'; +@GenerateNiceMocks([MockSpec()]) +import 'package:get_10101/common/application/channel_info_service.dart'; import 'package:get_10101/common/domain/model.dart'; +@GenerateNiceMocks([MockSpec()]) +import 'package:get_10101/features/trade/application/candlestick_service.dart'; +@GenerateNiceMocks([MockSpec()]) +import 'package:get_10101/features/trade/application/order_service.dart'; +@GenerateNiceMocks([MockSpec()]) +import 'package:get_10101/features/trade/application/position_service.dart'; +@GenerateNiceMocks([MockSpec()]) +import 'package:get_10101/features/trade/application/trade_values_service.dart'; import 'package:get_10101/features/trade/candlestick_change_notifier.dart'; import 'package:get_10101/features/trade/domain/price.dart'; import 'package:get_10101/features/trade/order_change_notifier.dart'; @@ -11,6 +21,8 @@ import 'package:get_10101/features/trade/submit_order_change_notifier.dart'; import 'package:get_10101/features/trade/trade_screen.dart'; import 'package:get_10101/features/trade/trade_theme.dart'; import 'package:get_10101/features/trade/trade_value_change_notifier.dart'; +@GenerateNiceMocks([MockSpec()]) +import 'package:get_10101/features/wallet/application/wallet_service.dart'; import 'package:get_10101/features/wallet/domain/wallet_balances.dart'; import 'package:get_10101/features/wallet/domain/wallet_info.dart'; import 'package:get_10101/features/wallet/wallet_change_notifier.dart'; @@ -20,24 +32,6 @@ import 'package:mockito/annotations.dart'; import 'package:mockito/mockito.dart'; import 'package:provider/provider.dart'; -@GenerateNiceMocks([MockSpec()]) -import 'package:get_10101/features/trade/application/trade_values_service.dart'; - -@GenerateNiceMocks([MockSpec()]) -import 'package:get_10101/common/application/channel_info_service.dart'; - -@GenerateNiceMocks([MockSpec()]) -import 'package:get_10101/features/trade/application/order_service.dart'; - -@GenerateNiceMocks([MockSpec()]) -import 'package:get_10101/features/trade/application/position_service.dart'; - -@GenerateNiceMocks([MockSpec()]) -import 'package:get_10101/features/wallet/application/wallet_service.dart'; - -@GenerateNiceMocks([MockSpec()]) -import 'package:get_10101/features/trade/application/candlestick_service.dart'; - import 'trade_test.mocks.dart'; final GoRouter _router = GoRouter( @@ -98,6 +92,7 @@ void main() { when(tradeValueService.calculateQuantity( price: anyNamed('price'), leverage: anyNamed('leverage'), margin: anyNamed('margin'))) .thenReturn(0.1); + when(tradeValueService.getExpiryTimestamp()).thenReturn(DateTime.now()); // assuming this is an initial funding, no channel exists yet when(channelConstraintsService.getChannelInfo()).thenAnswer((_) async {