From d4b14cadb781be098bede81a05b87b98dc704b45 Mon Sep 17 00:00:00 2001 From: Richard Holzeis Date: Mon, 11 Sep 2023 09:47:12 +0200 Subject: [PATCH 1/6] chore: Set position expiry only in one place --- coordinator/src/orderbook/trading.rs | 4 +-- coordinator/src/rollover.rs | 6 +--- crates/orderbook-commons/src/lib.rs | 36 +++++++++++++++++-- crates/tests-e2e/tests/rollover_position.rs | 4 +-- .../stable/stable_confirmation_sheet.dart | 4 +-- .../application/trade_values_service.dart | 8 +++-- .../features/trade/domain/trade_values.dart | 8 +++++ .../trade/submit_order_change_notifier.dart | 1 + .../trade_bottom_sheet_confirmation.dart | 3 +- mobile/native/src/api.rs | 5 +++ mobile/native/src/ln_dlc/node.rs | 29 ++++++++++++--- mobile/native/src/trade/position/handler.rs | 10 +++--- mobile/test/trade_test.dart | 31 +++++++--------- 13 files changed, 102 insertions(+), 47 deletions(-) diff --git a/coordinator/src/orderbook/trading.rs b/coordinator/src/orderbook/trading.rs index 471fa99f1..2fa7f7c94 100644 --- a/coordinator/src/orderbook/trading.rs +++ b/coordinator/src/orderbook/trading.rs @@ -25,7 +25,6 @@ use std::cmp::Ordering; use std::collections::HashMap; use std::str::FromStr; use thiserror::Error; -use time::Duration; use time::OffsetDateTime; use tokio::sync::broadcast; use tokio::sync::mpsc; @@ -345,8 +344,7 @@ fn match_order( return Ok(None); } - let tomorrow = OffsetDateTime::now_utc().date() + Duration::days(7); - let expiry_timestamp = tomorrow.midnight().assume_utc(); + let expiry_timestamp = orderbook_commons::get_expiry_timestamp(OffsetDateTime::now_utc()); // For now we hardcode the oracle pubkey here let oracle_pk = XOnlyPublicKey::from_str( diff --git a/coordinator/src/rollover.rs b/coordinator/src/rollover.rs index 770caa72c..4129cf2e0 100644 --- a/coordinator/src/rollover.rs +++ b/coordinator/src/rollover.rs @@ -13,7 +13,6 @@ use dlc_manager::contract::Contract; use dlc_manager::contract::ContractDescriptor; use dlc_manager::ChannelId; use std::str::FromStr; -use time::Duration; use time::OffsetDateTime; use trade::ContractSymbol; @@ -81,11 +80,8 @@ impl Rollover { } /// Calculates the maturity time based on the current expiry timestamp. - /// - /// todo(holzeis): this should come from a configuration https://github.com/get10101/10101/issues/1029 pub fn maturity_time(&self) -> OffsetDateTime { - let tomorrow = self.expiry_timestamp.date() + Duration::days(7); - tomorrow.midnight().assume_utc() + orderbook_commons::get_expiry_timestamp(self.expiry_timestamp) } } diff --git a/crates/orderbook-commons/src/lib.rs b/crates/orderbook-commons/src/lib.rs index 38ff9e369..b80cd54ea 100644 --- a/crates/orderbook-commons/src/lib.rs +++ b/crates/orderbook-commons/src/lib.rs @@ -332,8 +332,7 @@ pub fn get_filled_with_from_matches(matches: Vec) -> Result ) .expect("To be a valid pubkey"); - let tomorrow = OffsetDateTime::now_utc().date() + Duration::days(7); - let expiry_timestamp = tomorrow.midnight().assume_utc(); + let expiry_timestamp = get_expiry_timestamp(OffsetDateTime::now_utc()); Ok(FilledWith { order_id, @@ -352,8 +351,15 @@ pub fn get_filled_with_from_matches(matches: Vec) -> Result }) } +/// todo(holzeis): this should come from a configuration https://github.com/get10101/10101/issues/1029 +pub fn get_expiry_timestamp(from: OffsetDateTime) -> OffsetDateTime { + let tomorrow = from.date() + Duration::days(7); + tomorrow.midnight().assume_utc() +} + #[cfg(test)] mod test { + use crate::get_expiry_timestamp; use crate::FilledWith; use crate::Match; use crate::Signature; @@ -370,6 +376,32 @@ mod test { .unwrap() } + #[test] + fn test_expiry_timestamp() { + // Thu Aug 17 2023 19:13:13 GMT+0000 + let from = OffsetDateTime::from_unix_timestamp(1692299593).unwrap(); + let expiry = get_expiry_timestamp(from); + + // Thu Aug 24 2023 00:00:00 GMT+0000 + assert_eq!(1692835200, expiry.unix_timestamp()); + } + + #[test] + fn test_next_expiry_timestamp() { + // Thu Aug 24 2023 00:00:00 GMT+0000 + let from = OffsetDateTime::from_unix_timestamp(1692835199).unwrap(); + let expiry = get_expiry_timestamp(from); + + // Thu Aug 30 2023 00:00:00 GMT+0000 + assert_eq!(1693353600, expiry.unix_timestamp()); + + let from = OffsetDateTime::from_unix_timestamp(1692835200).unwrap(); + let expiry = get_expiry_timestamp(from); + + // Thu Aug 31 2023 00:00:00 GMT+0000 + assert_eq!(1693440000, expiry.unix_timestamp()); + } + #[test] fn test_serialize_signature() { let secret_key = SecretKey::from_slice(&[ diff --git a/crates/tests-e2e/tests/rollover_position.rs b/crates/tests-e2e/tests/rollover_position.rs index 5e5bf658d..ee2d216eb 100644 --- a/crates/tests-e2e/tests/rollover_position.rs +++ b/crates/tests-e2e/tests/rollover_position.rs @@ -4,7 +4,6 @@ use position::PositionState; use tests_e2e::app::AppHandle; use tests_e2e::setup; use tests_e2e::wait_until; -use time::Duration; use time::OffsetDateTime; #[tokio::test] @@ -23,8 +22,7 @@ async fn can_rollover_position() { .unwrap(); let position = test.app.rx.position().expect("position to exist"); - let tomorrow = position.expiry.date() + Duration::days(7); - let new_expiry = tomorrow.midnight().assume_utc(); + let new_expiry = orderbook_commons::get_expiry_timestamp(position.expiry); coordinator .rollover(&dlc_channel.dlc_channel_id.unwrap()) diff --git a/mobile/lib/features/stable/stable_confirmation_sheet.dart b/mobile/lib/features/stable/stable_confirmation_sheet.dart index a66be0e90..435ee08af 100644 --- a/mobile/lib/features/stable/stable_confirmation_sheet.dart +++ b/mobile/lib/features/stable/stable_confirmation_sheet.dart @@ -130,8 +130,6 @@ class _StableBottomSheet extends State { }); } - final now = DateTime.now(); - return Form( key: _formKey, child: Column( @@ -233,7 +231,7 @@ class _StableBottomSheet extends State { const SizedBox(height: 16.0), ValueDataRow( type: ValueType.date, - value: DateTime.utc(now.year, now.month, now.day + 2).toLocal(), + value: tradeValues.expiry.toLocal(), label: "Expiry", valueTextStyle: const TextStyle(fontSize: 18), labelTextStyle: const TextStyle(fontSize: 18)), diff --git a/mobile/lib/features/trade/application/trade_values_service.dart b/mobile/lib/features/trade/application/trade_values_service.dart index 063db318a..b0897d425 100644 --- a/mobile/lib/features/trade/application/trade_values_service.dart +++ b/mobile/lib/features/trade/application/trade_values_service.dart @@ -1,7 +1,7 @@ -import 'package:get_10101/ffi.dart' as rust; -import 'package:get_10101/features/trade/domain/leverage.dart'; import 'package:get_10101/common/domain/model.dart'; import 'package:get_10101/features/trade/domain/direction.dart'; +import 'package:get_10101/features/trade/domain/leverage.dart'; +import 'package:get_10101/ffi.dart' as rust; class TradeValuesService { Amount? calculateMargin( @@ -39,4 +39,8 @@ class TradeValuesService { price: price, leverage: leverage.leverage, direction: direction.toApi()); } } + + DateTime getExpiryTimestamp() { + return DateTime.fromMillisecondsSinceEpoch(rust.api.getExpiryTimestamp() * 1000); + } } diff --git a/mobile/lib/features/trade/domain/trade_values.dart b/mobile/lib/features/trade/domain/trade_values.dart index 009bda73e..5031fa132 100644 --- a/mobile/lib/features/trade/domain/trade_values.dart +++ b/mobile/lib/features/trade/domain/trade_values.dart @@ -16,6 +16,7 @@ class TradeValues { Amount? fee; // This fee is an estimate of the order-matching fee. double fundingRate; + DateTime expiry; // no final so it can be mocked in tests TradeValuesService tradeValuesService; @@ -29,6 +30,7 @@ class TradeValues { required this.liquidationPrice, required this.fee, required this.fundingRate, + required this.expiry, required this.tradeValuesService}); factory TradeValues.fromQuantity( @@ -47,6 +49,8 @@ class TradeValues { Amount? fee = orderMatchingFee(quantity, price); + DateTime expiry = tradeValuesService.getExpiryTimestamp(); + return TradeValues( direction: direction, margin: margin, @@ -56,6 +60,7 @@ class TradeValues { fundingRate: fundingRate, liquidationPrice: liquidationPrice, fee: fee, + expiry: expiry, tradeValuesService: tradeValuesService); } @@ -75,6 +80,8 @@ class TradeValues { Amount? fee = orderMatchingFee(quantity, price); + DateTime expiry = tradeValuesService.getExpiryTimestamp(); + return TradeValues( direction: direction, margin: margin, @@ -84,6 +91,7 @@ class TradeValues { fundingRate: fundingRate, liquidationPrice: liquidationPrice, fee: fee, + expiry: expiry, tradeValuesService: tradeValuesService); } diff --git a/mobile/lib/features/trade/submit_order_change_notifier.dart b/mobile/lib/features/trade/submit_order_change_notifier.dart index fa138abcc..24af68c24 100644 --- a/mobile/lib/features/trade/submit_order_change_notifier.dart +++ b/mobile/lib/features/trade/submit_order_change_notifier.dart @@ -98,6 +98,7 @@ class SubmitOrderChangeNotifier extends ChangeNotifier implements Subscriber { liquidationPrice: position.liquidationPrice, fee: fee, fundingRate: 0, + expiry: position.expiry, tradeValuesService: TradeValuesService()), PositionAction.close); } diff --git a/mobile/lib/features/trade/trade_bottom_sheet_confirmation.dart b/mobile/lib/features/trade/trade_bottom_sheet_confirmation.dart index 3f6a4b433..9e2261f4c 100644 --- a/mobile/lib/features/trade/trade_bottom_sheet_confirmation.dart +++ b/mobile/lib/features/trade/trade_bottom_sheet_confirmation.dart @@ -101,7 +101,6 @@ class TradeBottomSheetConfirmation extends StatelessWidget { pnl = position!.unrealizedPnl != null ? position.unrealizedPnl! : Amount(0); } - DateTime now = DateTime.now().toUtc(); TextStyle dataRowStyle = const TextStyle(fontSize: 14); return Container( @@ -122,7 +121,7 @@ class TradeBottomSheetConfirmation extends StatelessWidget { if (!close) ValueDataRow( type: ValueType.date, - value: DateTime.utc(now.year, now.month, now.day + 7).toLocal(), + value: tradeValues.expiry.toLocal(), label: 'Expiry'), close ? ValueDataRow( diff --git a/mobile/native/src/api.rs b/mobile/native/src/api.rs index 759084e24..6bfa679dc 100644 --- a/mobile/native/src/api.rs +++ b/mobile/native/src/api.rs @@ -34,6 +34,7 @@ use std::ops::Add; use std::str::FromStr; use std::time::Duration; use std::time::SystemTime; +use time::OffsetDateTime; pub use trade::ContractSymbol; pub use trade::Direction; @@ -394,3 +395,7 @@ pub fn get_channel_open_fee_estimate_sat() -> Result { Ok(estimate.ceil() as u64) } + +pub fn get_expiry_timestamp() -> SyncReturn { + SyncReturn(orderbook_commons::get_expiry_timestamp(OffsetDateTime::now_utc()).unix_timestamp()) +} diff --git a/mobile/native/src/ln_dlc/node.rs b/mobile/native/src/ln_dlc/node.rs index 5e0787a02..e11b8a25c 100644 --- a/mobile/native/src/ln_dlc/node.rs +++ b/mobile/native/src/ln_dlc/node.rs @@ -248,10 +248,27 @@ impl Node { .get_dlc_channel_id(0) .context("Could not fetch dlc channel id")?; - let accept_collateral = + let (accept_collateral, expiry_timestamp) = match self.inner.get_contract_by_dlc_channel_id(dlc_channel_id)? { Contract::Confirmed(contract) => { - contract.accepted_contract.accept_params.collateral + let offered_contract = contract.accepted_contract.offered_contract; + let contract_info = offered_contract + .contract_info + .first() + .context("contract info to exist on a signed contract")?; + let oracle_announcement = contract_info + .oracle_announcements + .first() + .context("oracle announcement to exist on signed contract")?; + + let expiry_timestamp = OffsetDateTime::from_unix_timestamp( + oracle_announcement.oracle_event.event_maturity_epoch as i64, + )?; + + ( + contract.accepted_contract.accept_params.collateral, + expiry_timestamp, + ) } _ => bail!( "Confirmed contract not found for channel ID: {}", @@ -262,8 +279,12 @@ impl Node { let filled_order = order::handler::order_filled() .context("Cannot mark order as filled for confirmed DLC")?; - position::handler::update_position_after_dlc_creation(filled_order, accept_collateral) - .context("Failed to update position after DLC creation")?; + position::handler::update_position_after_dlc_creation( + filled_order, + accept_collateral, + expiry_timestamp, + ) + .context("Failed to update position after DLC creation")?; if let Err(e) = self.pay_order_matching_fee(&channel_id) { tracing::error!("{e:#}"); diff --git a/mobile/native/src/trade/position/handler.rs b/mobile/native/src/trade/position/handler.rs index c73c5265a..fce135380 100644 --- a/mobile/native/src/trade/position/handler.rs +++ b/mobile/native/src/trade/position/handler.rs @@ -17,7 +17,6 @@ use coordinator_commons::TradeParams; use orderbook_commons::FilledWith; use orderbook_commons::Prices; use rust_decimal::prelude::ToPrimitive; -use time::Duration; use time::OffsetDateTime; use trade::ContractSymbol; @@ -162,7 +161,11 @@ pub fn rollover_position(expiry_timestamp: OffsetDateTime) -> Result<()> { } /// Create a position after the creation of a DLC channel. -pub fn update_position_after_dlc_creation(filled_order: Order, collateral: u64) -> Result<()> { +pub fn update_position_after_dlc_creation( + filled_order: Order, + collateral: u64, + expiry: OffsetDateTime, +) -> Result<()> { ensure!( db::get_positions()?.is_empty(), "Cannot create a position if one is already open" @@ -172,9 +175,6 @@ pub fn update_position_after_dlc_creation(filled_order: Order, collateral: u64) let average_entry_price = filled_order.execution_price().unwrap_or(0.0); - let tomorrow = OffsetDateTime::now_utc().date() + Duration::days(7); - let expiry = tomorrow.midnight().assume_utc(); - let have_a_position = Position { leverage: filled_order.leverage, quantity: filled_order.quantity, diff --git a/mobile/test/trade_test.dart b/mobile/test/trade_test.dart index 9cb2a49f0..21966a830 100644 --- a/mobile/test/trade_test.dart +++ b/mobile/test/trade_test.dart @@ -2,7 +2,17 @@ import 'package:candlesticks/candlesticks.dart'; import 'package:flutter/material.dart'; import 'package:flutter_test/flutter_test.dart'; import 'package:get_10101/common/amount_denomination_change_notifier.dart'; +@GenerateNiceMocks([MockSpec()]) +import 'package:get_10101/common/application/channel_info_service.dart'; import 'package:get_10101/common/domain/model.dart'; +@GenerateNiceMocks([MockSpec()]) +import 'package:get_10101/features/trade/application/candlestick_service.dart'; +@GenerateNiceMocks([MockSpec()]) +import 'package:get_10101/features/trade/application/order_service.dart'; +@GenerateNiceMocks([MockSpec()]) +import 'package:get_10101/features/trade/application/position_service.dart'; +@GenerateNiceMocks([MockSpec()]) +import 'package:get_10101/features/trade/application/trade_values_service.dart'; import 'package:get_10101/features/trade/candlestick_change_notifier.dart'; import 'package:get_10101/features/trade/domain/price.dart'; import 'package:get_10101/features/trade/order_change_notifier.dart'; @@ -11,6 +21,8 @@ import 'package:get_10101/features/trade/submit_order_change_notifier.dart'; import 'package:get_10101/features/trade/trade_screen.dart'; import 'package:get_10101/features/trade/trade_theme.dart'; import 'package:get_10101/features/trade/trade_value_change_notifier.dart'; +@GenerateNiceMocks([MockSpec()]) +import 'package:get_10101/features/wallet/application/wallet_service.dart'; import 'package:get_10101/features/wallet/domain/wallet_balances.dart'; import 'package:get_10101/features/wallet/domain/wallet_info.dart'; import 'package:get_10101/features/wallet/wallet_change_notifier.dart'; @@ -20,24 +32,6 @@ import 'package:mockito/annotations.dart'; import 'package:mockito/mockito.dart'; import 'package:provider/provider.dart'; -@GenerateNiceMocks([MockSpec()]) -import 'package:get_10101/features/trade/application/trade_values_service.dart'; - -@GenerateNiceMocks([MockSpec()]) -import 'package:get_10101/common/application/channel_info_service.dart'; - -@GenerateNiceMocks([MockSpec()]) -import 'package:get_10101/features/trade/application/order_service.dart'; - -@GenerateNiceMocks([MockSpec()]) -import 'package:get_10101/features/trade/application/position_service.dart'; - -@GenerateNiceMocks([MockSpec()]) -import 'package:get_10101/features/wallet/application/wallet_service.dart'; - -@GenerateNiceMocks([MockSpec()]) -import 'package:get_10101/features/trade/application/candlestick_service.dart'; - import 'trade_test.mocks.dart'; final GoRouter _router = GoRouter( @@ -98,6 +92,7 @@ void main() { when(tradeValueService.calculateQuantity( price: anyNamed('price'), leverage: anyNamed('leverage'), margin: anyNamed('margin'))) .thenReturn(0.1); + when(tradeValueService.getExpiryTimestamp()).thenReturn(DateTime.now()); // assuming this is an initial funding, no channel exists yet when(channelConstraintsService.getChannelInfo()).thenAnswer((_) async { From cdd708ed6fbd40cbe1c754dd80f8856fd9bee57a Mon Sep 17 00:00:00 2001 From: Richard Holzeis Date: Mon, 11 Sep 2023 13:39:54 +0200 Subject: [PATCH 2/6] feat: Calculate expiry timestamp to Sunday 3pm --- CHANGELOG.md | 1 + coordinator/src/rollover.rs | 4 +- crates/orderbook-commons/src/lib.rs | 136 ++++++++++++++++++++++++---- 3 files changed, 122 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7cc1d29dd..e7326c238 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Show loading screen when app starts with an expired position - Fix: Prevent crashing the app when there's no Internet connection - feat: Allow exporting the seed phrase even if the Node is offline +- Changed expiry to next Sunday 3 pm UTC ## [1.2.6] - 2023-09-06 diff --git a/coordinator/src/rollover.rs b/coordinator/src/rollover.rs index 4129cf2e0..a95e6322b 100644 --- a/coordinator/src/rollover.rs +++ b/coordinator/src/rollover.rs @@ -205,8 +205,8 @@ pub mod tests { let event_id = rollover.event_id(); // expect expiry in seven days at midnight. - // Thu Aug 24 2023 00:00:00 GMT+0000 - assert_eq!(event_id, format!("btcusd1692835200")) + // Sun Aug 20 2023 15:00:00 GMT+0000 + assert_eq!(event_id, format!("btcusd1692543600")) } #[test] diff --git a/crates/orderbook-commons/src/lib.rs b/crates/orderbook-commons/src/lib.rs index b80cd54ea..ca05411aa 100644 --- a/crates/orderbook-commons/src/lib.rs +++ b/crates/orderbook-commons/src/lib.rs @@ -15,8 +15,10 @@ use sha2::digest::FixedOutput; use sha2::Digest; use sha2::Sha256; use std::str::FromStr; +use time::macros::time; use time::Duration; use time::OffsetDateTime; +use time::Weekday; use trade::ContractSymbol; use trade::Direction; use uuid::Uuid; @@ -351,15 +353,38 @@ pub fn get_filled_with_from_matches(matches: Vec) -> Result }) } -/// todo(holzeis): this should come from a configuration https://github.com/get10101/10101/issues/1029 -pub fn get_expiry_timestamp(from: OffsetDateTime) -> OffsetDateTime { - let tomorrow = from.date() + Duration::days(7); - tomorrow.midnight().assume_utc() +/// Calculates the expiry timestamp at the next Sunday at 3 pm UTC from a given offset date time. +/// If the argument falls in between Friday, 3 pm UTC and Sunday, 3pm UTC, the expiry will be +/// calculated to next weeks Sunday at 3 pm +pub fn get_expiry_timestamp(time: OffsetDateTime) -> OffsetDateTime { + let days = if is_in_rollover_weekend(time) || time.weekday() == Weekday::Sunday { + // if the provided time is in the rollover weekend or on a sunday, we expire the sunday the + // week after. + 7 - time.weekday().number_from_monday() + 7 + } else { + 7 - time.weekday().number_from_monday() + }; + let time = time.date().with_hms(15, 0, 0).expect("to fit into time"); + + (time + Duration::days(days as i64)).assume_utc() +} + +/// Checks whether the provided expiry date is eligible for a rollover +/// +/// Returns true if the given date falls in between friday 15 pm UTC and sunday 15 pm UTC +pub fn is_in_rollover_weekend(timestamp: OffsetDateTime) -> bool { + match timestamp.weekday() { + Weekday::Friday => timestamp.time() >= time!(15:00), + Weekday::Saturday => true, + Weekday::Sunday => timestamp.time() < time!(15:00), + _ => false, + } } #[cfg(test)] mod test { use crate::get_expiry_timestamp; + use crate::is_in_rollover_weekend; use crate::FilledWith; use crate::Match; use crate::Signature; @@ -377,29 +402,106 @@ mod test { } #[test] - fn test_expiry_timestamp() { - // Thu Aug 17 2023 19:13:13 GMT+0000 - let from = OffsetDateTime::from_unix_timestamp(1692299593).unwrap(); + fn test_is_not_in_rollover_weekend() { + // Wed Aug 09 2023 09:30:23 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691573423).unwrap(); + assert!(!is_in_rollover_weekend(expiry)); + } + + #[test] + fn test_is_just_in_rollover_weekend_friday() { + // Fri Aug 11 2023 15:00:00 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691766000).unwrap(); + assert!(is_in_rollover_weekend(expiry)); + + // Fri Aug 11 2023 15:00:01 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691766001).unwrap(); + assert!(is_in_rollover_weekend(expiry)); + } + + #[test] + fn test_is_in_rollover_weekend_saturday() { + // Sat Aug 12 2023 16:00:00 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691856000).unwrap(); + assert!(is_in_rollover_weekend(expiry)); + } + + #[test] + fn test_is_just_in_rollover_weekend_sunday() { + // Sun Aug 13 2023 14:59:59 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691938799).unwrap(); + assert!(is_in_rollover_weekend(expiry)); + } + + #[test] + fn test_is_just_not_in_rollover_weekend_sunday() { + // Sun Aug 13 2023 15:00:00 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691938800).unwrap(); + assert!(!is_in_rollover_weekend(expiry)); + + // Sun Aug 13 2023 15:00:01 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691938801).unwrap(); + assert!(!is_in_rollover_weekend(expiry)); + } + + #[test] + fn test_expiry_timestamp_before_friday_15pm() { + // Wed Aug 09 2023 09:30:23 GMT+0000 + let from = OffsetDateTime::from_unix_timestamp(1691573423).unwrap(); + let expiry = get_expiry_timestamp(from); + + // Sun Aug 13 2023 15:00:00 GMT+0000 + assert_eq!(1691938800, expiry.unix_timestamp()); + } + + #[test] + fn test_expiry_timestamp_just_before_friday_15pm() { + // Fri Aug 11 2023 14:59:59 GMT+0000 + let from = OffsetDateTime::from_unix_timestamp(1691765999).unwrap(); + let expiry = get_expiry_timestamp(from); + + // Sun Aug 13 2023 15:00:00 GMT+0000 + assert_eq!(1691938800, expiry.unix_timestamp()); + } + + #[test] + fn test_expiry_timestamp_just_after_friday_15pm() { + // Fri Aug 11 2023 15:00:01 GMT+0000 + let from = OffsetDateTime::from_unix_timestamp(1691766001).unwrap(); let expiry = get_expiry_timestamp(from); - // Thu Aug 24 2023 00:00:00 GMT+0000 - assert_eq!(1692835200, expiry.unix_timestamp()); + // Sun Aug 20 2023 15:00:00 GMT+0000 + assert_eq!(1692543600, expiry.unix_timestamp()); } #[test] - fn test_next_expiry_timestamp() { - // Thu Aug 24 2023 00:00:00 GMT+0000 - let from = OffsetDateTime::from_unix_timestamp(1692835199).unwrap(); + fn test_expiry_timestamp_at_friday_15pm() { + // Fri Aug 11 2023 15:00:00 GMT+0000 + let from = OffsetDateTime::from_unix_timestamp(1691766000).unwrap(); let expiry = get_expiry_timestamp(from); - // Thu Aug 30 2023 00:00:00 GMT+0000 - assert_eq!(1693353600, expiry.unix_timestamp()); + // Sun Aug 20 2023 15:00:00 GMT+0000 + assert_eq!(1692543600, expiry.unix_timestamp()); + } - let from = OffsetDateTime::from_unix_timestamp(1692835200).unwrap(); + #[test] + fn test_expiry_timestamp_after_sunday_15pm() { + // Sun Aug 06 2023 16:00:00 GMT+0000 + let from = OffsetDateTime::from_unix_timestamp(1691337600).unwrap(); + let expiry = get_expiry_timestamp(from); + + // Sun Aug 13 2023 15:00:00 GMT+0000 + assert_eq!(1691938800, expiry.unix_timestamp()); + } + + #[test] + fn test_expiry_timestamp_on_saturday() { + // // Sat Aug 12 2023 16:00:00 GMT+0000 + let from = OffsetDateTime::from_unix_timestamp(1691856000).unwrap(); let expiry = get_expiry_timestamp(from); - // Thu Aug 31 2023 00:00:00 GMT+0000 - assert_eq!(1693440000, expiry.unix_timestamp()); + // Sun Aug 20 2023 15:00:00 GMT+0000 + assert_eq!(1692543600, expiry.unix_timestamp()); } #[test] From 48e857a04fd5ef6dd927b7ba8acccec09820592a Mon Sep 17 00:00:00 2001 From: Richard Holzeis Date: Tue, 12 Sep 2023 13:51:04 +0200 Subject: [PATCH 3/6] feat: Automatically rollover if user opens app during rollover weekend. --- CHANGELOG.md | 1 + coordinator/src/orderbook/trading.rs | 30 +++++++++- crates/orderbook-commons/src/lib.rs | 1 + crates/tests-e2e/src/test_subscriber.rs | 2 +- mobile/lib/common/domain/background_task.dart | 51 +++++++++++++++++ .../trade/async_order_change_notifier.dart | 10 ++-- .../trade/rollover_change_notifier.dart | 54 ++++++++++++++++++ mobile/lib/main.dart | 10 +++- mobile/native/src/event/api.rs | 43 +++++++++++++- mobile/native/src/event/mod.rs | 21 +++++-- mobile/native/src/ln_dlc/mod.rs | 56 +++++++++++++++++++ mobile/native/src/ln_dlc/node.rs | 8 +++ mobile/native/src/orderbook.rs | 16 +++++- mobile/native/src/trade/position/handler.rs | 5 ++ 14 files changed, 291 insertions(+), 17 deletions(-) create mode 100644 mobile/lib/common/domain/background_task.dart create mode 100644 mobile/lib/features/trade/rollover_change_notifier.dart diff --git a/CHANGELOG.md b/CHANGELOG.md index e7326c238..73cba68c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Fix: Prevent crashing the app when there's no Internet connection - feat: Allow exporting the seed phrase even if the Node is offline - Changed expiry to next Sunday 3 pm UTC +- Automatically rollover if user opens app during rollover weekend ## [1.2.6] - 2023-09-06 diff --git a/coordinator/src/orderbook/trading.rs b/coordinator/src/orderbook/trading.rs index 2fa7f7c94..5a573a35c 100644 --- a/coordinator/src/orderbook/trading.rs +++ b/coordinator/src/orderbook/trading.rs @@ -1,3 +1,4 @@ +use crate::db::positions; use crate::orderbook::db::matches; use crate::orderbook::db::orders; use anyhow::anyhow; @@ -129,7 +130,7 @@ pub fn start( let authenticated_users = authenticated_users.clone(); async move { tracing::debug!(trader_id=%new_user_msg.new_user, "Checking if the user needs to be notified about pending matches"); - if let Err(e) = process_pending_match(&mut conn, &authenticated_users, new_user_msg.new_user).await { + if let Err(e) = process_pending_actions(&mut conn, &authenticated_users, new_user_msg.new_user).await { tracing::error!("Failed to process pending match. Error: {e:#}"); } } @@ -273,8 +274,10 @@ async fn process_new_order( Ok(order) } -/// Notifies the trader if a pending match is waiting for them. -async fn process_pending_match( +/// Checks if there are any immediate actions to be processed by the app +/// - Pending Match +/// - Rollover +async fn process_pending_actions( conn: &mut PgConnection, authenticated_users: &HashMap>, trader_id: PublicKey, @@ -293,6 +296,27 @@ async fn process_pending_match( if let Err(e) = notify_trader(trader_id, message, authenticated_users).await { tracing::warn!("Failed to notify trader. Error: {e:#}"); } + } else if let Some(position) = + positions::Position::get_open_position_by_trader(conn, trader_id.to_string())? + { + tracing::debug!(%trader_id, position_id=position.id, "Checking if the users positions is eligible for rollover"); + + if orderbook_commons::is_in_rollover_weekend(position.expiry_timestamp) { + let next_expiry = orderbook_commons::get_expiry_timestamp(OffsetDateTime::now_utc()); + if position.expiry_timestamp == next_expiry { + tracing::trace!(%trader_id, position_id=position.id, "Position has already been rolled over"); + return Ok(()); + } + + tracing::debug!(%trader_id, position_id=position.id, "Proposing to rollover users position"); + + let message = OrderbookMsg::Rollover; + if let Err(e) = notify_trader(trader_id, message, authenticated_users).await { + // if that happens, it's most likely that the user already closed its app again + // and we can simply wait for the user to come online again to retry. + tracing::debug!("Failed to notify trader. Error: {e:#}"); + } + } } Ok(()) diff --git a/crates/orderbook-commons/src/lib.rs b/crates/orderbook-commons/src/lib.rs index ca05411aa..9e1ef7cd5 100644 --- a/crates/orderbook-commons/src/lib.rs +++ b/crates/orderbook-commons/src/lib.rs @@ -130,6 +130,7 @@ pub enum OrderbookMsg { order: Order, filled_with: FilledWith, }, + Rollover, } /// A match for an order diff --git a/crates/tests-e2e/src/test_subscriber.rs b/crates/tests-e2e/src/test_subscriber.rs index b3c4c8118..f814a55b6 100644 --- a/crates/tests-e2e/src/test_subscriber.rs +++ b/crates/tests-e2e/src/test_subscriber.rs @@ -199,7 +199,7 @@ impl Senders { native::event::EventInternal::PaymentClaimed(_amount_msats) => { unreachable!("PaymentClaimed event should not be sent to the subscriber"); } - native::event::EventInternal::AsyncTrade(_order_id) => { + native::event::EventInternal::BackgroundNotification(_task) => { // ignored } } diff --git a/mobile/lib/common/domain/background_task.dart b/mobile/lib/common/domain/background_task.dart new file mode 100644 index 000000000..e6c9afe90 --- /dev/null +++ b/mobile/lib/common/domain/background_task.dart @@ -0,0 +1,51 @@ +import 'package:get_10101/bridge_generated/bridge_definitions.dart' as bridge; +import 'package:get_10101/features/trade/domain/order.dart'; + +class AsyncTrade { + final OrderReason orderReason; + + AsyncTrade({required this.orderReason}); + + static AsyncTrade fromApi(bridge.BackgroundTask_AsyncTrade asyncTrade) { + return AsyncTrade(orderReason: OrderReason.fromApi(asyncTrade.field0)); + } + + static bridge.BackgroundTask apiDummy() { + return bridge.BackgroundTask_AsyncTrade(OrderReason.apiDummy()); + } +} + +enum TaskStatus { + pending, + failed, + success; + + static TaskStatus fromApi(bridge.TaskStatus taskStatus) { + switch (taskStatus) { + case bridge.TaskStatus.Pending: + return TaskStatus.pending; + case bridge.TaskStatus.Failed: + return TaskStatus.failed; + case bridge.TaskStatus.Success: + return TaskStatus.success; + } + } + + static bridge.TaskStatus apiDummy() { + return bridge.TaskStatus.Pending; + } +} + +class Rollover { + final TaskStatus taskStatus; + + Rollover({required this.taskStatus}); + + static Rollover fromApi(bridge.BackgroundTask_Rollover rollover) { + return Rollover(taskStatus: TaskStatus.fromApi(rollover.field0)); + } + + static bridge.BackgroundTask apiDummy() { + return bridge.BackgroundTask_Rollover(TaskStatus.apiDummy()); + } +} diff --git a/mobile/lib/features/trade/async_order_change_notifier.dart b/mobile/lib/features/trade/async_order_change_notifier.dart index 347508a64..5cff390f4 100644 --- a/mobile/lib/features/trade/async_order_change_notifier.dart +++ b/mobile/lib/features/trade/async_order_change_notifier.dart @@ -2,6 +2,7 @@ import 'package:f_logs/model/flog/flog.dart'; import 'package:flutter/material.dart'; import 'package:get_10101/bridge_generated/bridge_definitions.dart' as bridge; import 'package:get_10101/common/application/event_service.dart'; +import 'package:get_10101/common/domain/background_task.dart'; import 'package:get_10101/common/global_keys.dart'; import 'package:get_10101/features/trade/application/order_service.dart'; import 'package:get_10101/features/trade/domain/order.dart'; @@ -26,9 +27,10 @@ class AsyncOrderChangeNotifier extends ChangeNotifier implements Subscriber { @override void notify(bridge.Event event) { - if (event is bridge.Event_AsyncTrade) { - OrderReason reason = OrderReason.fromApi(event.field0); - FLog.debug(text: "Received a async trade event. Reason: $reason"); + if (event is bridge.Event_BackgroundNotification && + event.field0 is bridge.BackgroundTask_AsyncTrade) { + AsyncTrade asyncTrade = AsyncTrade.fromApi(event.field0 as bridge.BackgroundTask_AsyncTrade); + FLog.debug(text: "Received a async trade event. Reason: ${asyncTrade.orderReason}"); showDialog( context: shellNavigatorKey.currentContext!, builder: (context) { @@ -47,7 +49,7 @@ class AsyncOrderChangeNotifier extends ChangeNotifier implements Subscriber { } late Widget content; - switch (reason) { + switch (asyncTrade.orderReason) { case OrderReason.expired: content = const Text("Your position has been closed due to expiry."); case OrderReason.manual: diff --git a/mobile/lib/features/trade/rollover_change_notifier.dart b/mobile/lib/features/trade/rollover_change_notifier.dart new file mode 100644 index 000000000..03cc748a2 --- /dev/null +++ b/mobile/lib/features/trade/rollover_change_notifier.dart @@ -0,0 +1,54 @@ +import 'package:f_logs/model/flog/flog.dart'; +import 'package:flutter/material.dart'; +import 'package:get_10101/bridge_generated/bridge_definitions.dart' as bridge; +import 'package:get_10101/common/application/event_service.dart'; +import 'package:get_10101/common/domain/background_task.dart'; +import 'package:get_10101/common/global_keys.dart'; +import 'package:get_10101/features/trade/order_submission_status_dialog.dart'; +import 'package:provider/provider.dart'; + +class RolloverChangeNotifier extends ChangeNotifier implements Subscriber { + late TaskStatus taskStatus; + + @override + void notify(bridge.Event event) { + if (event is bridge.Event_BackgroundNotification && + event.field0 is bridge.BackgroundTask_Rollover) { + Rollover rollover = Rollover.fromApi(event.field0 as bridge.BackgroundTask_Rollover); + FLog.debug(text: "Received a rollover event. Status: ${rollover.taskStatus}"); + + taskStatus = rollover.taskStatus; + + if (taskStatus == TaskStatus.pending) { + // initialize dialog for the pending task + showDialog( + context: shellNavigatorKey.currentContext!, + builder: (context) { + TaskStatus status = context.watch().taskStatus; + + // todo(holzeis): Reusing the order submission status dialog is not nice, but it's actually suitable for any task execution that has pending, + // failed and success states. We may should consider renaming this dialog for its more generic purpose. + OrderSubmissionStatusDialogType type = OrderSubmissionStatusDialogType.pendingSubmit; + switch (status) { + case TaskStatus.pending: + type = OrderSubmissionStatusDialogType.successfulSubmit; + case TaskStatus.failed: + type = OrderSubmissionStatusDialogType.failedFill; + case TaskStatus.success: + type = OrderSubmissionStatusDialogType.filled; + } + + late Widget content = const Text("Rolling over your position"); + + return OrderSubmissionStatusDialog(title: "Catching up!", type: type, content: content); + }, + ); + } else { + // notify dialog about changed task status + notifyListeners(); + } + } else { + FLog.warning(text: "Received unexpected event: ${event.toString()}"); + } + } +} diff --git a/mobile/lib/main.dart b/mobile/lib/main.dart index 3607719b3..670bda3fe 100644 --- a/mobile/lib/main.dart +++ b/mobile/lib/main.dart @@ -12,6 +12,7 @@ import 'package:get_10101/common/application/channel_info_service.dart'; import 'package:get_10101/common/application/event_service.dart'; import 'package:get_10101/common/channel_status_notifier.dart'; import 'package:get_10101/common/color.dart'; +import 'package:get_10101/common/domain/background_task.dart'; import 'package:get_10101/common/domain/service_status.dart'; import 'package:get_10101/common/global_keys.dart'; import 'package:get_10101/common/service_status_notifier.dart'; @@ -27,6 +28,7 @@ import 'package:get_10101/features/trade/domain/position.dart'; import 'package:get_10101/features/trade/domain/price.dart'; import 'package:get_10101/features/trade/order_change_notifier.dart'; import 'package:get_10101/features/trade/position_change_notifier.dart'; +import 'package:get_10101/features/trade/rollover_change_notifier.dart'; import 'package:get_10101/features/trade/submit_order_change_notifier.dart'; import 'package:get_10101/features/trade/trade_screen.dart'; import 'package:get_10101/features/trade/trade_theme.dart'; @@ -87,6 +89,7 @@ void main() async { ChangeNotifierProvider(create: (context) => ServiceStatusNotifier()), ChangeNotifierProvider(create: (context) => ChannelStatusNotifier()), ChangeNotifierProvider(create: (context) => AsyncOrderChangeNotifier(OrderService())), + ChangeNotifierProvider(create: (context) => RolloverChangeNotifier()), Provider(create: (context) => Environment.parse()), Provider(create: (context) => channelInfoService) ], child: const TenTenOneApp())); @@ -477,6 +480,7 @@ void subscribeToNotifiers(BuildContext context) { final channelStatusNotifier = context.read(); final stableValuesChangeNotifier = context.read(); final asyncOrderChangeNotifier = context.read(); + final rolloverChangeNotifier = context.read(); eventService.subscribe( orderChangeNotifier, bridge.Event.orderUpdateNotification(Order.apiDummy())); @@ -509,7 +513,11 @@ void subscribeToNotifiers(BuildContext context) { eventService.subscribe( asyncOrderChangeNotifier, bridge.Event.orderUpdateNotification(Order.apiDummy())); - eventService.subscribe(asyncOrderChangeNotifier, bridge.Event.asyncTrade(OrderReason.apiDummy())); + eventService.subscribe( + asyncOrderChangeNotifier, bridge.Event.backgroundNotification(AsyncTrade.apiDummy())); + + eventService.subscribe( + rolloverChangeNotifier, bridge.Event.backgroundNotification(Rollover.apiDummy())); channelStatusNotifier.subscribe(eventService); diff --git a/mobile/native/src/event/api.rs b/mobile/native/src/event/api.rs index 61f7330e3..6a4cf9a8e 100644 --- a/mobile/native/src/event/api.rs +++ b/mobile/native/src/event/api.rs @@ -1,4 +1,5 @@ use crate::api::WalletInfo; +use crate::event; use crate::event::subscriber::Subscriber; use crate::event::EventInternal; use crate::event::EventType; @@ -25,7 +26,14 @@ pub enum Event { PriceUpdateNotification(BestPrice), ServiceHealthUpdate(ServiceUpdate), ChannelStatusUpdate(ChannelStatus), + BackgroundNotification(BackgroundTask), +} + +#[frb] +#[derive(Clone)] +pub enum BackgroundTask { AsyncTrade(OrderReason), + Rollover(TaskStatus), } impl From for Event { @@ -64,7 +72,9 @@ impl From for Event { EventInternal::PaymentClaimed(_) => { unreachable!("This internal event is not exposed to the UI") } - EventInternal::AsyncTrade(reason) => Event::AsyncTrade(reason.into()), + EventInternal::BackgroundNotification(task) => { + Event::BackgroundNotification(task.into()) + } } } } @@ -100,7 +110,7 @@ impl Subscriber for FlutterSubscriber { EventType::PriceUpdateNotification, EventType::ServiceHealthUpdate, EventType::ChannelStatusUpdate, - EventType::AsyncTrade, + EventType::BackgroundNotification, ] } } @@ -111,6 +121,35 @@ impl FlutterSubscriber { } } +impl From for BackgroundTask { + fn from(value: event::BackgroundTask) -> Self { + match value { + event::BackgroundTask::AsyncTrade(order_reason) => { + BackgroundTask::AsyncTrade(order_reason.into()) + } + event::BackgroundTask::Rollover(status) => BackgroundTask::Rollover(status.into()), + } + } +} + +#[frb] +#[derive(Clone)] +pub enum TaskStatus { + Pending, + Failed, + Success, +} + +impl From for TaskStatus { + fn from(value: event::TaskStatus) -> Self { + match value { + event::TaskStatus::Pending => TaskStatus::Pending, + event::TaskStatus::Failed => TaskStatus::Failed, + event::TaskStatus::Success => TaskStatus::Success, + } + } +} + /// The best bid and ask price for a contract. /// /// Best prices come from an orderbook. Contrary to the `Price` struct, we can have no price diff --git a/mobile/native/src/event/mod.rs b/mobile/native/src/event/mod.rs index 1bf2ec3c0..1d30f9349 100644 --- a/mobile/native/src/event/mod.rs +++ b/mobile/native/src/event/mod.rs @@ -30,7 +30,6 @@ pub fn publish(event: &EventInternal) { pub enum EventInternal { Init(String), Log(String), - AsyncTrade(OrderReason), OrderUpdateNotification(Order), WalletInfoUpdateNotification(WalletInfo), OrderFilledWith(Box), @@ -41,6 +40,20 @@ pub enum EventInternal { PaymentClaimed(u64), ServiceHealthUpdate(ServiceUpdate), ChannelStatusUpdate(ChannelStatus), + BackgroundNotification(BackgroundTask), +} + +#[derive(Clone, Debug)] +pub enum BackgroundTask { + AsyncTrade(OrderReason), + Rollover(TaskStatus), +} + +#[derive(Clone, Debug)] +pub enum TaskStatus { + Pending, + Failed, + Success, } impl fmt::Display for EventInternal { @@ -58,7 +71,7 @@ impl fmt::Display for EventInternal { EventInternal::PaymentClaimed(_) => "PaymentClaimed", EventInternal::ServiceHealthUpdate(_) => "ServiceHealthUpdate", EventInternal::ChannelStatusUpdate(_) => "ChannelStatusUpdate", - EventInternal::AsyncTrade(_) => "AsyncTrade", + EventInternal::BackgroundNotification(_) => "BackgroundNotification", } .fmt(f) } @@ -81,7 +94,7 @@ impl From for EventType { EventInternal::PaymentClaimed(_) => EventType::PaymentClaimed, EventInternal::ServiceHealthUpdate(_) => EventType::ServiceHealthUpdate, EventInternal::ChannelStatusUpdate(_) => EventType::ChannelStatusUpdate, - EventInternal::AsyncTrade(_) => EventType::AsyncTrade, + EventInternal::BackgroundNotification(_) => EventType::BackgroundNotification, } } } @@ -100,5 +113,5 @@ pub enum EventType { PaymentClaimed, ServiceHealthUpdate, ChannelStatusUpdate, - AsyncTrade, + BackgroundNotification, } diff --git a/mobile/native/src/ln_dlc/mod.rs b/mobile/native/src/ln_dlc/mod.rs index 6d38e3925..6970152c6 100644 --- a/mobile/native/src/ln_dlc/mod.rs +++ b/mobile/native/src/ln_dlc/mod.rs @@ -24,6 +24,7 @@ use bdk::bitcoin::Txid; use bdk::bitcoin::XOnlyPublicKey; use bdk::BlockTime; use bdk::FeeRate; +use bitcoin::hashes::hex::ToHex; use bitcoin::Amount; use coordinator_commons::LspConfig; use coordinator_commons::TradeParams; @@ -35,7 +36,9 @@ use lightning_invoice::Invoice; use ln_dlc_node::channel::JIT_FEE_INVOICE_DESCRIPTION_PREFIX; use ln_dlc_node::config::app_config; use ln_dlc_node::node::rust_dlc_manager::subchannel::LNChannelManager; +use ln_dlc_node::node::rust_dlc_manager::subchannel::SubChannelState; use ln_dlc_node::node::rust_dlc_manager::ChannelId; +use ln_dlc_node::node::rust_dlc_manager::Storage as DlcStorage; use ln_dlc_node::node::LnDlcNodeSettings; use ln_dlc_node::node::NodeInfo; use ln_dlc_node::scorer; @@ -693,3 +696,56 @@ pub async fn trade(trade_params: TradeParams) -> Result<(), (FailureReason, Erro Ok(()) } + +/// initiates the rollover protocol with the coordinator +pub async fn rollover() -> Result<()> { + let node = NODE.get(); + + let dlc_channels = node + .inner + .sub_channel_manager + .get_dlc_manager() + .get_store() + .get_sub_channels()?; + + let dlc_channel = dlc_channels + .into_iter() + .find(|chan| { + chan.counter_party == config::get_coordinator_info().pubkey + && matches!(chan.state, SubChannelState::Signed(_)) + }) + .context("Couldn't find dlc channel to rollover")?; + + let dlc_channel_id = dlc_channel + .get_dlc_channel_id(0) + .context("Couldn't get dlc channel id")?; + + let client = reqwest_client(); + let response = client + .post(format!( + "http://{}/api/rollover/{}", + config::get_http_endpoint(), + dlc_channel_id.to_hex() + )) + .send() + .await + .with_context(|| format!("Failed to rollover dlc with id {}", dlc_channel_id.to_hex()))?; + + if !response.status().is_success() { + let response_text = match response.text().await { + Ok(text) => text, + Err(err) => { + format!("could not decode response {err:#}") + } + }; + + bail!( + "Failed to rollover dlc with id {}. Error: {response_text}", + dlc_channel_id.to_hex() + ) + } + + tracing::info!("Sent rollover request to coordinator successfully"); + + Ok(()) +} diff --git a/mobile/native/src/ln_dlc/node.rs b/mobile/native/src/ln_dlc/node.rs index e11b8a25c..8ef2fe31a 100644 --- a/mobile/native/src/ln_dlc/node.rs +++ b/mobile/native/src/ln_dlc/node.rs @@ -1,4 +1,8 @@ use crate::db; +use crate::event; +use crate::event::BackgroundTask; +use crate::event::EventInternal; +use crate::event::TaskStatus; use crate::trade::order; use crate::trade::position; use crate::trade::position::PositionState; @@ -225,6 +229,10 @@ impl Node { // After handling the `RenewRevoke` message, we need to do some post-processing // based on the fact that the DLC channel has been updated. position::handler::set_position_state(PositionState::Open)?; + + event::publish(&EventInternal::BackgroundNotification( + BackgroundTask::Rollover(TaskStatus::Success), + )); } // ignoring all other channel events. _ => (), diff --git a/mobile/native/src/orderbook.rs b/mobile/native/src/orderbook.rs index 359224039..4637ad3c9 100644 --- a/mobile/native/src/orderbook.rs +++ b/mobile/native/src/orderbook.rs @@ -1,6 +1,8 @@ use crate::config; use crate::event; +use crate::event::BackgroundTask; use crate::event::EventInternal; +use crate::event::TaskStatus; use crate::health::ServiceStatus; use crate::trade::position; use anyhow::Result; @@ -103,9 +105,19 @@ pub fn subscribe( }; match msg { + OrderbookMsg::Rollover => { + tracing::info!("Received a rollover request from orderbook."); + event::publish(&EventInternal::BackgroundNotification(BackgroundTask::Rollover(TaskStatus::Pending))); + + if let Err(e) = position::handler::rollover().await { + tracing::error!("Failed to rollover dlc. Error: {e:#}"); + event::publish(&EventInternal::BackgroundNotification(BackgroundTask::Rollover(TaskStatus::Failed))); + } + }, OrderbookMsg::AsyncMatch { order, filled_with } => { - tracing::info!(order_id = %order.id, "Received an async match from orderbook. Reason: {:?}", order.order_reason); - event::publish(&EventInternal::AsyncTrade(order.clone().order_reason.into())); + let order_reason = order.clone().order_reason.into(); + tracing::info!(order_id = %order.id, "Received an async match from orderbook. Reason: {order_reason:?}"); + event::publish(&EventInternal::BackgroundNotification(BackgroundTask::AsyncTrade(order_reason))); if let Err(e) = position::handler::async_trade(order.clone(), filled_with).await { tracing::error!(order_id = %order.id, "Failed to process async trade. Error: {e:#}"); diff --git a/mobile/native/src/trade/position/handler.rs b/mobile/native/src/trade/position/handler.rs index fce135380..f15f388b3 100644 --- a/mobile/native/src/trade/position/handler.rs +++ b/mobile/native/src/trade/position/handler.rs @@ -102,6 +102,11 @@ pub async fn async_trade(order: orderbook_commons::Order, filled_with: FilledWit Ok(()) } +/// Rollover dlc to new expiry timestamp +pub async fn rollover() -> Result<()> { + ln_dlc::rollover().await +} + /// Fetch the positions from the database pub fn get_positions() -> Result> { db::get_positions() From 63932dc15304ad4d630f08c4e02c5b6c6e5b0f2a Mon Sep 17 00:00:00 2001 From: Richard Holzeis Date: Wed, 13 Sep 2023 10:03:10 +0200 Subject: [PATCH 4/6] fix: Only update open or rollover positions Without that fix we were updating all positions of the user! --- coordinator/src/db/positions.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/coordinator/src/db/positions.rs b/coordinator/src/db/positions.rs index 93914b561..7b4fadcdc 100644 --- a/coordinator/src/db/positions.rs +++ b/coordinator/src/db/positions.rs @@ -134,6 +134,7 @@ impl Position { ) -> Result<()> { let affected_rows = diesel::update(positions::table) .filter(positions::trader_pubkey.eq(trader_pubkey)) + .filter(positions::position_state.eq(PositionState::Rollover)) .set(( positions::position_state.eq(PositionState::Open), positions::temporary_contract_id.eq(temporary_contract_id.to_hex()), @@ -169,6 +170,7 @@ impl Position { ) -> Result<()> { let affected_rows = diesel::update(positions::table) .filter(positions::trader_pubkey.eq(trader_pubkey)) + .filter(positions::position_state.eq(PositionState::Open)) .set(( positions::expiry_timestamp.eq(expiry_timestamp), positions::position_state.eq(PositionState::Rollover), From f3fa8db5d7b31cc2466867e8845ca63ca8596f66 Mon Sep 17 00:00:00 2001 From: Richard Holzeis Date: Sun, 17 Sep 2023 12:03:46 +0200 Subject: [PATCH 5/6] chore: Separate components into separate modules Before everything was handled by the trading module. Now the following responsibilities have been split up. - Rollover: Moved to the coordinator component and responsible for proposing a rollover. - Async Match: Moved into a dedicated component to check if an async match needs to be executed by the app. - Notification: Responsible for the users and sending messages to them. Note 1, the websocket communication is still in one component and not separated between coordinator and orderbook. It would have been correct to split that up as well, but the effort and additional complexity was IMHO not worth it. Note 2, we have multiple commons crates, which are very much related to each ohter. I think we should combine all of those into a single one, to simplify things. --- coordinator/src/bin/coordinator.rs | 15 +- coordinator/src/lib.rs | 3 +- coordinator/src/node.rs | 1 + coordinator/src/node/expired_positions.rs | 7 +- coordinator/src/{ => node}/rollover.rs | 74 +++++- coordinator/src/notification/mod.rs | 84 +++++++ coordinator/src/orderbook/async_match.rs | 110 +++++++++ coordinator/src/orderbook/mod.rs | 1 + coordinator/src/orderbook/routes.rs | 13 +- coordinator/src/orderbook/trading.rs | 259 +++----------------- coordinator/src/orderbook/websocket.rs | 31 ++- coordinator/src/routes.rs | 16 +- crates/coordinator-commons/src/lib.rs | 142 +++++++++++ crates/orderbook-commons/src/lib.rs | 183 +------------- crates/tests-e2e/tests/rollover_position.rs | 2 +- mobile/native/src/api.rs | 4 +- mobile/native/src/orderbook.rs | 18 +- 17 files changed, 517 insertions(+), 446 deletions(-) rename coordinator/src/{ => node}/rollover.rs (83%) create mode 100644 coordinator/src/notification/mod.rs create mode 100644 coordinator/src/orderbook/async_match.rs diff --git a/coordinator/src/bin/coordinator.rs b/coordinator/src/bin/coordinator.rs index 68cce2c7f..a2e290ad5 100644 --- a/coordinator/src/bin/coordinator.rs +++ b/coordinator/src/bin/coordinator.rs @@ -8,10 +8,14 @@ use coordinator::node; use coordinator::node::closed_positions; use coordinator::node::connection; use coordinator::node::expired_positions; +use coordinator::node::rollover; use coordinator::node::storage::NodeStorage; use coordinator::node::unrealized_pnl; use coordinator::node::Node; +use coordinator::notification; +use coordinator::notification::NewUserMessage; use coordinator::notification_service::NotificationService; +use coordinator::orderbook::async_match; use coordinator::orderbook::trading; use coordinator::routes::router; use coordinator::run_migration; @@ -192,8 +196,16 @@ async fn main() -> Result<()> { } }); + let (tx_user_feed, _rx) = broadcast::channel::(100); let (tx_price_feed, _rx) = broadcast::channel(100); - let (_handle, trading_sender) = trading::start(pool.clone(), tx_price_feed.clone()); + + let (_handle, notifier) = notification::start(tx_user_feed.clone()); + + let (_handle, trading_sender) = + trading::start(pool.clone(), tx_price_feed.clone(), notifier.clone()); + + let _handle = async_match::monitor(pool.clone(), tx_user_feed.clone(), notifier.clone()); + let _handle = rollover::monitor(pool.clone(), tx_user_feed.clone(), notifier); tokio::spawn({ let node = node.clone(); @@ -235,6 +247,7 @@ async fn main() -> Result<()> { NODE_ALIAS, trading_sender, tx_price_feed, + tx_user_feed, ); let notification_service = NotificationService::new(opts.fcm_api_key); diff --git a/coordinator/src/lib.rs b/coordinator/src/lib.rs index 7e652db59..1fad8a83d 100644 --- a/coordinator/src/lib.rs +++ b/coordinator/src/lib.rs @@ -8,14 +8,13 @@ use diesel_migrations::EmbeddedMigrations; use diesel_migrations::MigrationHarness; use serde_json::json; -mod rollover; - pub mod admin; pub mod cli; pub mod db; pub mod logger; pub mod metrics; pub mod node; +pub mod notification; pub mod notification_service; pub mod orderbook; pub mod position; diff --git a/coordinator/src/node.rs b/coordinator/src/node.rs index bff81487d..6fd82c905 100644 --- a/coordinator/src/node.rs +++ b/coordinator/src/node.rs @@ -62,6 +62,7 @@ pub mod closed_positions; pub mod connection; pub mod expired_positions; pub mod order_matching_fee; +pub mod rollover; pub mod routing_fees; pub mod storage; pub mod unrealized_pnl; diff --git a/coordinator/src/node/expired_positions.rs b/coordinator/src/node/expired_positions.rs index 56762eb0d..e669e1fbc 100644 --- a/coordinator/src/node/expired_positions.rs +++ b/coordinator/src/node/expired_positions.rs @@ -2,7 +2,6 @@ use crate::db; use crate::node::Node; use crate::orderbook; use crate::orderbook::trading::NewOrderMessage; -use crate::orderbook::trading::TradingMessage; use crate::position::models::Position; use crate::position::models::PositionState; use anyhow::Context; @@ -22,7 +21,7 @@ use time::Duration; use time::OffsetDateTime; use tokio::sync::mpsc; -pub async fn close(node: Node, trading_sender: mpsc::Sender) -> Result<()> { +pub async fn close(node: Node, trading_sender: mpsc::Sender) -> Result<()> { let mut conn = node.pool.get()?; let positions = db::positions::Position::get_all_open_positions(&mut conn) @@ -93,11 +92,11 @@ pub async fn close(node: Node, trading_sender: mpsc::Sender) -> }; let (sender, mut receiver) = mpsc::channel::>(1); - let message = TradingMessage::NewOrder(NewOrderMessage { + let message = NewOrderMessage { new_order: new_order.clone(), order_reason: OrderReason::Expired, sender, - }); + }; if let Err(e) = trading_sender.send(message).await { tracing::error!(order_id=%new_order.id, trader_id=%new_order.trader_id, "Failed to submit new order for closing expired position. Error: {e:#}"); diff --git a/coordinator/src/rollover.rs b/coordinator/src/node/rollover.rs similarity index 83% rename from coordinator/src/rollover.rs rename to coordinator/src/node/rollover.rs index a95e6322b..42d1b2415 100644 --- a/coordinator/src/rollover.rs +++ b/coordinator/src/node/rollover.rs @@ -1,19 +1,30 @@ use crate::db; +use crate::db::positions; use crate::node::Node; +use crate::notification::NewUserMessage; +use crate::notification::Notification; use anyhow::bail; use anyhow::Context; use anyhow::Result; use bitcoin::hashes::hex::ToHex; use bitcoin::secp256k1::PublicKey; use bitcoin::XOnlyPublicKey; +use diesel::r2d2::ConnectionManager; +use diesel::r2d2::Pool; +use diesel::PgConnection; use dlc_manager::contract::contract_input::ContractInput; use dlc_manager::contract::contract_input::ContractInputInfo; use dlc_manager::contract::contract_input::OracleInput; use dlc_manager::contract::Contract; use dlc_manager::contract::ContractDescriptor; use dlc_manager::ChannelId; +use futures::future::RemoteHandle; +use futures::FutureExt; +use orderbook_commons::Message; use std::str::FromStr; use time::OffsetDateTime; +use tokio::sync::broadcast; +use tokio::sync::mpsc; use trade::ContractSymbol; #[derive(Debug, Clone)] @@ -28,6 +39,67 @@ struct Rollover { contract_tx_fee_rate: u64, } +pub fn monitor( + pool: Pool>, + tx_user_feed: broadcast::Sender, + notifier: mpsc::Sender, +) -> RemoteHandle> { + let mut user_feed = tx_user_feed.subscribe(); + let (fut, remote_handle) = async move { + while let Ok(new_user_msg) = user_feed.recv().await { + tokio::spawn({ + let mut conn = pool.get()?; + let notifier = notifier.clone(); + async move { + if let Err(e) = + check_if_eligible_for_rollover(&mut conn, notifier, new_user_msg.new_user) + .await + { + tracing::error!("Failed to check if eligible for rollover. Error: {e:#}"); + } + } + }); + } + Ok(()) + } + .remote_handle(); + + tokio::spawn(fut); + + remote_handle +} + +async fn check_if_eligible_for_rollover( + conn: &mut PgConnection, + notifier: mpsc::Sender, + trader_id: PublicKey, +) -> Result<()> { + tracing::debug!(%trader_id, "Checking if the users positions is eligible for rollover"); + if let Some(position) = + positions::Position::get_open_position_by_trader(conn, trader_id.to_string())? + { + if coordinator_commons::is_in_rollover_weekend(position.expiry_timestamp) { + let next_expiry = coordinator_commons::calculate_next_expiry(OffsetDateTime::now_utc()); + if position.expiry_timestamp == next_expiry { + tracing::trace!(%trader_id, position_id=position.id, "Position has already been rolled over"); + return Ok(()); + } + + tracing::debug!(%trader_id, position_id=position.id, "Proposing to rollover users position"); + + let message = Notification::Message { + trader_id, + message: Message::Rollover, + }; + if let Err(e) = notifier.send(message).await { + tracing::debug!("Failed to notify trader. Error: {e:#}"); + } + } + } + + Ok(()) +} + impl Rollover { pub fn new(contract: Contract) -> Result { let contract = match contract { @@ -81,7 +153,7 @@ impl Rollover { /// Calculates the maturity time based on the current expiry timestamp. pub fn maturity_time(&self) -> OffsetDateTime { - orderbook_commons::get_expiry_timestamp(self.expiry_timestamp) + coordinator_commons::calculate_next_expiry(self.expiry_timestamp) } } diff --git a/coordinator/src/notification/mod.rs b/coordinator/src/notification/mod.rs new file mode 100644 index 000000000..2e5092b4d --- /dev/null +++ b/coordinator/src/notification/mod.rs @@ -0,0 +1,84 @@ +use anyhow::Result; +use bitcoin::secp256k1::PublicKey; +use futures::future::RemoteHandle; +use futures::FutureExt; +use orderbook_commons::Message; +use std::collections::HashMap; +use std::sync::Arc; +use std::sync::RwLock; +use tokio::sync::broadcast; +use tokio::sync::mpsc; + +/// This value is arbitrarily set to 100 and defines the message accepted in the notification +/// channel buffer. +const NOTIFICATION_BUFFER_SIZE: usize = 100; + +// TODO(holzeis): This enum should be extended to allow for sending push notifications. +pub enum Notification { + Message { + trader_id: PublicKey, + message: Message, + }, +} + +#[derive(Clone)] +pub struct NewUserMessage { + pub new_user: PublicKey, + pub sender: mpsc::Sender, +} + +pub fn start( + tx_user_feed: broadcast::Sender, +) -> (RemoteHandle>, mpsc::Sender) { + let (sender, mut receiver) = mpsc::channel::(NOTIFICATION_BUFFER_SIZE); + + let authenticated_users = Arc::new(RwLock::new(HashMap::new())); + + tokio::task::spawn({ + let traders = authenticated_users.clone(); + async move { + let mut user_feed = tx_user_feed.subscribe(); + while let Ok(new_user_msg) = user_feed.recv().await { + traders + .write() + .expect("RwLock to not be poisoned") + .insert(new_user_msg.new_user, new_user_msg.sender); + } + } + }); + + let (fut, remote_handle) = { + async move { + while let Some(notification) = receiver.recv().await { + match notification { + Notification::Message { trader_id, message } => { + tracing::info!(%trader_id, "Sending message: {message:?}"); + + let trader = { + let traders = authenticated_users + .read() + .expect("RwLock to not be poisoned"); + traders.get(&trader_id).cloned() + }; + + match trader { + Some(sender) => { + if let Err(e) = sender.send(message).await { + tracing::warn!("Connection lost to trader {e:#}"); + } + } + None => tracing::warn!(%trader_id, "Trader is not connected"), + } + } + } + } + + Ok(()) + } + .remote_handle() + }; + + tokio::spawn(fut); + + (remote_handle, sender) +} diff --git a/coordinator/src/orderbook/async_match.rs b/coordinator/src/orderbook/async_match.rs new file mode 100644 index 000000000..9a04e1d54 --- /dev/null +++ b/coordinator/src/orderbook/async_match.rs @@ -0,0 +1,110 @@ +use crate::notification::NewUserMessage; +use crate::notification::Notification; +use crate::orderbook::db::matches; +use crate::orderbook::db::orders; +use anyhow::ensure; +use anyhow::Result; +use bitcoin::secp256k1::PublicKey; +use bitcoin::XOnlyPublicKey; +use diesel::r2d2::ConnectionManager; +use diesel::r2d2::Pool; +use diesel::PgConnection; +use futures::future::RemoteHandle; +use futures::FutureExt; +use orderbook_commons::FilledWith; +use orderbook_commons::Match; +use orderbook_commons::Matches; +use orderbook_commons::Message; +use orderbook_commons::OrderReason; +use orderbook_commons::OrderState; +use std::str::FromStr; +use time::OffsetDateTime; +use tokio::sync::broadcast; +use tokio::sync::mpsc; + +pub fn monitor( + pool: Pool>, + tx_user_feed: broadcast::Sender, + notifier: mpsc::Sender, +) -> RemoteHandle> { + let mut user_feed = tx_user_feed.subscribe(); + let (fut, remote_handle) = async move { + while let Ok(new_user_msg) = user_feed.recv().await { + tokio::spawn({ + let mut conn = pool.get()?; + let notifier = notifier.clone(); + async move { + tracing::debug!(trader_id=%new_user_msg.new_user, "Checking if the user needs to be notified about pending matches"); + if let Err(e) = process_pending_match(&mut conn, notifier, new_user_msg.new_user).await { + tracing::error!("Failed to process pending match. Error: {e:#}"); + } + } + }); + } + Ok(()) + }.remote_handle(); + + tokio::spawn(fut); + + remote_handle +} + +/// Checks if there are any pending matches +async fn process_pending_match( + conn: &mut PgConnection, + notifier: mpsc::Sender, + trader_id: PublicKey, +) -> Result<()> { + if let Some(order) = orders::get_by_trader_id_and_state(conn, trader_id, OrderState::Matched)? { + tracing::debug!(%trader_id, order_id=%order.id, "Notifying trader about pending match"); + + let matches = matches::get_matches_by_order_id(conn, order.id)?; + let filled_with = get_filled_with_from_matches(matches)?; + + let message = match order.order_reason { + OrderReason::Manual => Message::Match(filled_with), + OrderReason::Expired => Message::AsyncMatch { order, filled_with }, + }; + + let msg = Notification::Message { trader_id, message }; + if let Err(e) = notifier.send(msg).await { + tracing::error!("Failed to send notification. Error: {e:#}"); + } + } + + Ok(()) +} + +fn get_filled_with_from_matches(matches: Vec) -> Result { + ensure!( + !matches.is_empty(), + "Need at least one matches record to construct a FilledWith" + ); + + let order_id = matches + .first() + .expect("to have at least one match") + .order_id; + let oracle_pk = XOnlyPublicKey::from_str( + "16f88cf7d21e6c0f46bcbc983a4e3b19726c6c98858cc31c83551a88fde171c0", + ) + .expect("To be a valid pubkey"); + + let expiry_timestamp = coordinator_commons::calculate_next_expiry(OffsetDateTime::now_utc()); + + Ok(FilledWith { + order_id, + expiry_timestamp, + oracle_pk, + matches: matches + .iter() + .map(|m| Match { + id: m.id, + order_id: m.order_id, + quantity: m.quantity, + pubkey: m.match_trader_id, + execution_price: m.execution_price, + }) + .collect(), + }) +} diff --git a/coordinator/src/orderbook/mod.rs b/coordinator/src/orderbook/mod.rs index 7ab6837db..79026b3e6 100644 --- a/coordinator/src/orderbook/mod.rs +++ b/coordinator/src/orderbook/mod.rs @@ -1,3 +1,4 @@ +pub mod async_match; pub mod db; pub mod routes; pub mod trading; diff --git a/coordinator/src/orderbook/routes.rs b/coordinator/src/orderbook/routes.rs index 3e90247a6..83f636af5 100644 --- a/coordinator/src/orderbook/routes.rs +++ b/coordinator/src/orderbook/routes.rs @@ -1,7 +1,6 @@ use crate::orderbook; use crate::orderbook::trading::NewOrderMessage; use crate::orderbook::trading::TradingError; -use crate::orderbook::trading::TradingMessage; use crate::orderbook::websocket::websocket_connection; use crate::routes::AppState; use crate::AppError; @@ -16,10 +15,10 @@ use axum::Json; use diesel::r2d2::ConnectionManager; use diesel::r2d2::PooledConnection; use diesel::PgConnection; +use orderbook_commons::Message; use orderbook_commons::NewOrder; use orderbook_commons::Order; use orderbook_commons::OrderReason; -use orderbook_commons::OrderbookMsg; use serde::de; use serde::Deserialize; use serde::Deserializer; @@ -94,11 +93,11 @@ pub async fn post_order( ) -> Result, AppError> { let (sender, mut receiver) = mpsc::channel::>(1); - let message = TradingMessage::NewOrder(NewOrderMessage { + let message = NewOrderMessage { new_order, order_reason: OrderReason::Manual, sender, - }); + }; state.trading_sender.send(message).await.map_err(|e| { AppError::InternalServerError(format!("Failed to send new order message: {e:#}")) })?; @@ -118,7 +117,7 @@ pub async fn post_order( Ok(Json(order)) } -fn update_pricefeed(pricefeed_msg: OrderbookMsg, sender: Sender) { +fn update_pricefeed(pricefeed_msg: Message, sender: Sender) { match sender.send(pricefeed_msg) { Ok(_) => { tracing::trace!("Pricefeed updated") @@ -143,7 +142,7 @@ pub async fn put_order( let order = orderbook::db::orders::set_is_taken(&mut conn, order_id, updated_order.taken) .map_err(|e| AppError::InternalServerError(format!("Failed to update order: {e:#}")))?; let sender = state.tx_price_feed.clone(); - update_pricefeed(OrderbookMsg::Update(order.clone()), sender); + update_pricefeed(Message::Update(order.clone()), sender); Ok(Json(order)) } @@ -157,7 +156,7 @@ pub async fn delete_order( .map_err(|e| AppError::InternalServerError(format!("Failed to delete order: {e:#}")))?; if deleted > 0 { let sender = state.tx_price_feed.clone(); - update_pricefeed(OrderbookMsg::DeleteOrder(order_id), sender); + update_pricefeed(Message::DeleteOrder(order_id), sender); } Ok(Json(deleted)) diff --git a/coordinator/src/orderbook/trading.rs b/coordinator/src/orderbook/trading.rs index 5a573a35c..a54ce06a2 100644 --- a/coordinator/src/orderbook/trading.rs +++ b/coordinator/src/orderbook/trading.rs @@ -1,4 +1,4 @@ -use crate::db::positions; +use crate::notification::Notification; use crate::orderbook::db::matches; use crate::orderbook::db::orders; use anyhow::anyhow; @@ -15,15 +15,14 @@ use futures::future::RemoteHandle; use futures::FutureExt; use orderbook_commons::FilledWith; use orderbook_commons::Match; +use orderbook_commons::Message; use orderbook_commons::NewOrder; use orderbook_commons::Order; use orderbook_commons::OrderReason; use orderbook_commons::OrderState; use orderbook_commons::OrderType; -use orderbook_commons::OrderbookMsg; use rust_decimal::Decimal; use std::cmp::Ordering; -use std::collections::HashMap; use std::str::FromStr; use thiserror::Error; use time::OffsetDateTime; @@ -32,26 +31,15 @@ use tokio::sync::mpsc; use trade::Direction; use uuid::Uuid; -/// This value is arbitrarily set to 100 and defines the message accepted in the trading messages -/// channel buffer. -const TRADING_MESSAGES_BUFFER_SIZE: usize = 100; - -pub enum TradingMessage { - NewOrder(NewOrderMessage), - NewUser(NewUserMessage), -} - +/// This value is arbitrarily set to 100 and defines the number of new order messages buffered +/// in the channel. +const NEW_ORDERS_BUFFER_SIZE: usize = 100; pub struct NewOrderMessage { pub new_order: NewOrder, pub order_reason: OrderReason, pub sender: mpsc::Sender>, } -pub struct NewUserMessage { - pub new_user: PublicKey, - pub sender: mpsc::Sender, -} - #[derive(Error, Debug, PartialEq)] pub enum TradingError { #[error("Invalid order: {0}")] @@ -95,52 +83,37 @@ impl From<&TradeParams> for TraderMatchParams { /// the trading task by spawning a new tokio task that is handling messages pub fn start( pool: Pool>, - tx_price_feed: broadcast::Sender, -) -> (RemoteHandle>, mpsc::Sender) { - let (sender, mut receiver) = mpsc::channel::(TRADING_MESSAGES_BUFFER_SIZE); - - let mut authenticated_users = HashMap::new(); + tx_price_feed: broadcast::Sender, + notifier: mpsc::Sender, +) -> (RemoteHandle>, mpsc::Sender) { + let (sender, mut receiver) = mpsc::channel::(NEW_ORDERS_BUFFER_SIZE); let (fut, remote_handle) = async move { - - while let Some(trading_message) = receiver.recv().await { - match trading_message { - TradingMessage::NewOrder(new_order_msg) => { - tokio::spawn({ - let mut conn = pool.get()?; - let authenticated_users = authenticated_users.clone(); - let tx_price_feed = tx_price_feed.clone(); - async move { - let new_order = new_order_msg.new_order; - let result = process_new_order(&mut conn, tx_price_feed, new_order, new_order_msg.order_reason, &authenticated_users) - .await; - if let Err(e) = new_order_msg.sender.send(result).await { - tracing::error!("Failed to send new order message! Error: {e:#}"); - } - } - }); - } - TradingMessage::NewUser(new_user_msg) => { - tracing::info!(trader_id=%new_user_msg.new_user, "User logged in to 10101"); - - authenticated_users.insert(new_user_msg.new_user, new_user_msg.sender); - - tokio::spawn({ - let mut conn = pool.get()?; - let authenticated_users = authenticated_users.clone(); - async move { - tracing::debug!(trader_id=%new_user_msg.new_user, "Checking if the user needs to be notified about pending matches"); - if let Err(e) = process_pending_actions(&mut conn, &authenticated_users, new_user_msg.new_user).await { - tracing::error!("Failed to process pending match. Error: {e:#}"); - } - } - }); + while let Some(new_order_msg) = receiver.recv().await { + tokio::spawn({ + let mut conn = pool.get()?; + let tx_price_feed = tx_price_feed.clone(); + let notifier = notifier.clone(); + async move { + let new_order = new_order_msg.new_order; + let result = process_new_order( + &mut conn, + notifier, + tx_price_feed, + new_order, + new_order_msg.order_reason, + ) + .await; + if let Err(e) = new_order_msg.sender.send(result).await { + tracing::error!("Failed to send new order message! Error: {e:#}"); + } } - } + }); } Ok(()) - }.remote_handle(); + } + .remote_handle(); tokio::spawn(fut); @@ -156,10 +129,10 @@ pub fn start( /// Market order: find match and notify traders async fn process_new_order( conn: &mut PgConnection, - tx_price_feed: broadcast::Sender, + notifier: mpsc::Sender, + tx_price_feed: broadcast::Sender, new_order: NewOrder, order_reason: OrderReason, - authenticated_users: &HashMap>, ) -> Result { tracing::info!(trader_id=%new_order.trader_id, "Received a new {:?} order", new_order.order_type); @@ -181,7 +154,7 @@ async fn process_new_order( if new_order.order_type == OrderType::Limit { // we only tell everyone about new limit orders tx_price_feed - .send(OrderbookMsg::NewOrder(order.clone())) + .send(Message::NewOrder(order.clone())) .map_err(|error| anyhow!("Could not update price feed due to '{error}'"))?; } else { // reject new order if there is already a matched order waiting for execution. @@ -232,14 +205,15 @@ async fn process_new_order( tracing::info!(%trader_id, order_id, "Notifying trader about match"); let message = match &order.order_reason { - OrderReason::Manual => OrderbookMsg::Match(match_param.filled_with.clone()), - OrderReason::Expired => OrderbookMsg::AsyncMatch { + OrderReason::Manual => Message::Match(match_param.filled_with.clone()), + OrderReason::Expired => Message::AsyncMatch { order: order.clone(), filled_with: match_param.filled_with.clone(), }, }; - let order_state = match notify_trader(trader_id, message, authenticated_users).await { + let msg = Notification::Message { trader_id, message }; + let order_state = match notifier.send(msg).await { Ok(()) => { tracing::debug!(%trader_id, order_id, "Successfully notified trader"); OrderState::Matched @@ -274,54 +248,6 @@ async fn process_new_order( Ok(order) } -/// Checks if there are any immediate actions to be processed by the app -/// - Pending Match -/// - Rollover -async fn process_pending_actions( - conn: &mut PgConnection, - authenticated_users: &HashMap>, - trader_id: PublicKey, -) -> Result<()> { - if let Some(order) = orders::get_by_trader_id_and_state(conn, trader_id, OrderState::Matched)? { - tracing::debug!(%trader_id, order_id=%order.id, "Notifying trader about pending match"); - - let matches = matches::get_matches_by_order_id(conn, order.id)?; - let filled_with = orderbook_commons::get_filled_with_from_matches(matches)?; - - let message = match order.order_reason { - OrderReason::Manual => OrderbookMsg::Match(filled_with), - OrderReason::Expired => OrderbookMsg::AsyncMatch { order, filled_with }, - }; - - if let Err(e) = notify_trader(trader_id, message, authenticated_users).await { - tracing::warn!("Failed to notify trader. Error: {e:#}"); - } - } else if let Some(position) = - positions::Position::get_open_position_by_trader(conn, trader_id.to_string())? - { - tracing::debug!(%trader_id, position_id=position.id, "Checking if the users positions is eligible for rollover"); - - if orderbook_commons::is_in_rollover_weekend(position.expiry_timestamp) { - let next_expiry = orderbook_commons::get_expiry_timestamp(OffsetDateTime::now_utc()); - if position.expiry_timestamp == next_expiry { - tracing::trace!(%trader_id, position_id=position.id, "Position has already been rolled over"); - return Ok(()); - } - - tracing::debug!(%trader_id, position_id=position.id, "Proposing to rollover users position"); - - let message = OrderbookMsg::Rollover; - if let Err(e) = notify_trader(trader_id, message, authenticated_users).await { - // if that happens, it's most likely that the user already closed its app again - // and we can simply wait for the user to come online again to retry. - tracing::debug!("Failed to notify trader. Error: {e:#}"); - } - } - } - - Ok(()) -} - /// Matches a provided market order with limit orders from the DB /// /// If the order is a long order, we return the short orders sorted by price (highest first) @@ -368,7 +294,7 @@ fn match_order( return Ok(None); } - let expiry_timestamp = orderbook_commons::get_expiry_timestamp(OffsetDateTime::now_utc()); + let expiry_timestamp = coordinator_commons::calculate_next_expiry(OffsetDateTime::now_utc()); // For now we hardcode the oracle pubkey here let oracle_pk = XOnlyPublicKey::from_str( @@ -453,45 +379,20 @@ fn sort_orders(mut orders: Vec, is_long: bool) -> Vec { orders } -async fn notify_trader( - trader_id: PublicKey, - message: OrderbookMsg, - traders: &HashMap>, -) -> Result<()> { - match traders.get(&trader_id) { - None => bail!("Trader is not connected"), - Some(sender) => sender - .send(message) - .await - .map_err(|err| anyhow!("Connection lost to trader {err:#}")), - } -} - #[cfg(test)] pub mod tests { use crate::orderbook::trading::match_order; - use crate::orderbook::trading::notify_trader; use crate::orderbook::trading::sort_orders; - use crate::orderbook::trading::MatchParams; - use crate::orderbook::trading::TraderMatchParams; use bitcoin::secp256k1::PublicKey; - use bitcoin::secp256k1::SecretKey; - use bitcoin::secp256k1::SECP256K1; - use bitcoin::XOnlyPublicKey; - use orderbook_commons::FilledWith; - use orderbook_commons::Match; use orderbook_commons::Order; use orderbook_commons::OrderReason; use orderbook_commons::OrderState; use orderbook_commons::OrderType; - use orderbook_commons::OrderbookMsg; use rust_decimal::Decimal; use rust_decimal_macros::dec; - use std::collections::HashMap; use std::str::FromStr; use time::Duration; use time::OffsetDateTime; - use tokio::sync::mpsc; use trade::ContractSymbol; use trade::Direction; use uuid::Uuid; @@ -788,88 +689,4 @@ pub mod tests { assert!(matched_orders.is_none()); } - - #[tokio::test] - async fn given_matches_will_notify_all_traders() { - let trader_key = SecretKey::from_slice(&b"Me noob, don't lose money pleazz"[..]).unwrap(); - let trader_pub_key = trader_key.public_key(SECP256K1); - let maker_key = SecretKey::from_slice(&b"I am a king trader mate, right!?"[..]).unwrap(); - let maker_pub_key = maker_key.public_key(SECP256K1); - let trader_order_id = Uuid::new_v4(); - let maker_order_id = Uuid::new_v4(); - let oracle_pk = XOnlyPublicKey::from_str( - "16f88cf7d21e6c0f46bcbc983a4e3b19726c6c98858cc31c83551a88fde171c0", - ) - .unwrap(); - let maker_order_price = dec!(20_000); - let expiry_timestamp = OffsetDateTime::now_utc(); - let matched_orders = MatchParams { - taker_match: TraderMatchParams { - trader_id: trader_pub_key, - filled_with: FilledWith { - order_id: trader_order_id, - expiry_timestamp, - oracle_pk, - matches: vec![Match { - id: Uuid::new_v4(), - order_id: maker_order_id, - quantity: dec!(100), - pubkey: maker_pub_key, - execution_price: maker_order_price, - }], - }, - }, - makers_matches: vec![TraderMatchParams { - trader_id: maker_pub_key, - filled_with: FilledWith { - order_id: maker_order_id, - expiry_timestamp, - oracle_pk, - matches: vec![Match { - id: Uuid::new_v4(), - order_id: trader_order_id, - quantity: dec!(100), - pubkey: trader_pub_key, - execution_price: maker_order_price, - }], - }, - }], - }; - let mut traders = HashMap::new(); - let (maker_sender, mut maker_receiver) = mpsc::channel::(1); - let (trader_sender, mut trader_receiver) = mpsc::channel::(1); - traders.insert(maker_pub_key, maker_sender); - traders.insert(trader_pub_key, trader_sender); - - for match_param in matched_orders.matches() { - notify_trader( - match_param.trader_id, - OrderbookMsg::Match(match_param.filled_with.clone()), - &traders, - ) - .await - .unwrap(); - } - - let maker_msg = maker_receiver.recv().await.unwrap(); - let trader_msg = trader_receiver.recv().await.unwrap(); - - match maker_msg { - OrderbookMsg::Match(msg) => { - assert_eq!(msg.order_id, maker_order_id) - } - _ => { - panic!("Invalid message received") - } - } - - match trader_msg { - OrderbookMsg::Match(msg) => { - assert_eq!(msg.order_id, trader_order_id) - } - _ => { - panic!("Invalid message received") - } - } - } } diff --git a/coordinator/src/orderbook/websocket.rs b/coordinator/src/orderbook/websocket.rs index 1147b1221..52fe4efbd 100644 --- a/coordinator/src/orderbook/websocket.rs +++ b/coordinator/src/orderbook/websocket.rs @@ -1,13 +1,12 @@ +use crate::notification::NewUserMessage; use crate::orderbook; -use crate::orderbook::trading::NewUserMessage; -use crate::orderbook::trading::TradingMessage; use crate::routes::AppState; -use axum::extract::ws::Message; +use axum::extract::ws::Message as WebsocketMessage; use axum::extract::ws::WebSocket; use futures::SinkExt; use futures::StreamExt; use orderbook_commons::create_sign_message; -use orderbook_commons::OrderbookMsg; +use orderbook_commons::Message; use orderbook_commons::OrderbookRequest; use orderbook_commons::Signature; use std::sync::Arc; @@ -25,7 +24,7 @@ pub async fn websocket_connection(stream: WebSocket, state: Arc) { // We subscribe *before* sending the "joined" message, so that we will also // display it to our client. - let mut rx = state.tx_price_feed.subscribe(); + let mut price_feed = state.tx_price_feed.subscribe(); let mut conn = match state.pool.clone().get() { Ok(conn) => conn, @@ -44,11 +43,11 @@ pub async fn websocket_connection(stream: WebSocket, state: Arc) { }; // Now send the "all orders" to the new client. - if let Ok(msg) = serde_json::to_string(&OrderbookMsg::AllOrders(orders)) { - let _ = sender.send(Message::Text(msg)).await; + if let Ok(msg) = serde_json::to_string(&Message::AllOrders(orders)) { + let _ = sender.send(WebsocketMessage::Text(msg)).await; } - let (local_sender, mut local_receiver) = mpsc::channel::(100); + let (local_sender, mut local_receiver) = mpsc::channel::(100); let mut local_recv_task = tokio::spawn(async move { while let Some(local_msg) = local_receiver.recv().await { @@ -56,7 +55,7 @@ pub async fn websocket_connection(stream: WebSocket, state: Arc) { Ok(msg) => { if let Err(err) = tokio::time::timeout( WEBSOCKET_SEND_TIMEOUT, - sender.send(Message::Text(msg.clone())), + sender.send(WebsocketMessage::Text(msg.clone())), ) .await { @@ -76,7 +75,7 @@ pub async fn websocket_connection(stream: WebSocket, state: Arc) { let mut send_task = { let local_sender = local_sender.clone(); tokio::spawn(async move { - while let Ok(st) = rx.recv().await { + while let Ok(st) = price_feed.recv().await { if let Err(error) = local_sender.send(st).await { tracing::error!("Could not send message {error:#}"); return; @@ -88,28 +87,28 @@ pub async fn websocket_connection(stream: WebSocket, state: Arc) { // Spawn a task that takes messages from the websocket let local_sender = local_sender.clone(); let mut recv_task = tokio::spawn(async move { - while let Some(Ok(Message::Text(text))) = receiver.next().await { + while let Some(Ok(WebsocketMessage::Text(text))) = receiver.next().await { match serde_json::from_str(text.as_str()) { Ok(OrderbookRequest::Authenticate(Signature { signature, pubkey })) => { let msg = create_sign_message(); match signature.verify(&msg, &pubkey) { Ok(_) => { - if let Err(e) = local_sender.send(OrderbookMsg::Authenticated).await { + if let Err(e) = local_sender.send(Message::Authenticated).await { tracing::error!("Could not respond to user {e:#}"); return; } - let message = TradingMessage::NewUser(NewUserMessage { + let message = NewUserMessage { new_user: pubkey, sender: local_sender.clone(), - }); - if let Err(e) = state.trading_sender.send(message).await { + }; + if let Err(e) = state.tx_user_feed.send(message) { tracing::error!("Could not send trading message. Error: {e:#}"); } } Err(err) => { if let Err(er) = local_sender - .send(OrderbookMsg::InvalidAuthentication(format!( + .send(Message::InvalidAuthentication(format!( "Could not authenticate {err:#}" ))) .await diff --git a/coordinator/src/routes.rs b/coordinator/src/routes.rs index 1e9a3d572..5425c5537 100644 --- a/coordinator/src/routes.rs +++ b/coordinator/src/routes.rs @@ -11,13 +11,14 @@ use crate::admin::send_payment; use crate::admin::sign_message; use crate::db::user; use crate::node::Node; +use crate::notification::NewUserMessage; use crate::orderbook::routes::delete_order; use crate::orderbook::routes::get_order; use crate::orderbook::routes::get_orders; use crate::orderbook::routes::post_order; use crate::orderbook::routes::put_order; use crate::orderbook::routes::websocket_handler; -use crate::orderbook::trading::TradingMessage; +use crate::orderbook::trading::NewOrderMessage; use crate::settings::Settings; use crate::AppError; use autometrics::autometrics; @@ -48,7 +49,7 @@ use ln_dlc_node::node::peer_manager::alias_as_bytes; use ln_dlc_node::node::peer_manager::broadcast_node_announcement; use ln_dlc_node::node::NodeInfo; use opentelemetry_prometheus::PrometheusExporter; -use orderbook_commons::OrderbookMsg; +use orderbook_commons::Message; use orderbook_commons::RouteHintHop; use prometheus::Encoder; use prometheus::TextEncoder; @@ -65,8 +66,9 @@ use tracing::instrument; pub struct AppState { pub node: Node, // Channel used to send messages to all connected clients. - pub tx_price_feed: broadcast::Sender, - pub trading_sender: mpsc::Sender, + pub tx_price_feed: broadcast::Sender, + pub tx_user_feed: broadcast::Sender, + pub trading_sender: mpsc::Sender, pub pool: Pool>, pub settings: RwLock, pub exporter: PrometheusExporter, @@ -82,14 +84,16 @@ pub fn router( exporter: PrometheusExporter, announcement_addresses: Vec, node_alias: &str, - trading_sender: mpsc::Sender, - tx_price_feed: broadcast::Sender, + trading_sender: mpsc::Sender, + tx_price_feed: broadcast::Sender, + tx_user_feed: broadcast::Sender, ) -> Router { let app_state = Arc::new(AppState { node, pool, settings: RwLock::new(settings), tx_price_feed, + tx_user_feed, trading_sender, exporter, announcement_addresses, diff --git a/crates/coordinator-commons/src/lib.rs b/crates/coordinator-commons/src/lib.rs index e239f7b5a..dd23d4e2b 100644 --- a/crates/coordinator-commons/src/lib.rs +++ b/crates/coordinator-commons/src/lib.rs @@ -3,6 +3,10 @@ use orderbook_commons::FilledWith; use rust_decimal::Decimal; use serde::Deserialize; use serde::Serialize; +use time::macros::time; +use time::Duration; +use time::OffsetDateTime; +use time::Weekday; use trade::ContractSymbol; use trade::Direction; @@ -78,3 +82,141 @@ pub struct TokenUpdateParams { pub pubkey: String, pub fcm_token: String, } + +/// Calculates the expiry timestamp at the next Sunday at 3 pm UTC from a given offset date time. +/// If the argument falls in between Friday, 3 pm UTC and Sunday, 3pm UTC, the expiry will be +/// calculated to next weeks Sunday at 3 pm +pub fn calculate_next_expiry(time: OffsetDateTime) -> OffsetDateTime { + let days = if is_in_rollover_weekend(time) || time.weekday() == Weekday::Sunday { + // if the provided time is in the rollover weekend or on a sunday, we expire the sunday the + // week after. + 7 - time.weekday().number_from_monday() + 7 + } else { + 7 - time.weekday().number_from_monday() + }; + let time = time.date().with_hms(15, 0, 0).expect("to fit into time"); + + (time + Duration::days(days as i64)).assume_utc() +} + +/// Checks whether the provided expiry date is eligible for a rollover +/// +/// Returns true if the given date falls in between friday 15 pm UTC and sunday 15 pm UTC +pub fn is_in_rollover_weekend(timestamp: OffsetDateTime) -> bool { + match timestamp.weekday() { + Weekday::Friday => timestamp.time() >= time!(15:00), + Weekday::Saturday => true, + Weekday::Sunday => timestamp.time() < time!(15:00), + _ => false, + } +} + +#[cfg(test)] +mod test { + use crate::calculate_next_expiry; + use crate::is_in_rollover_weekend; + use time::OffsetDateTime; + + #[test] + fn test_is_not_in_rollover_weekend() { + // Wed Aug 09 2023 09:30:23 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691573423).unwrap(); + assert!(!is_in_rollover_weekend(expiry)); + } + + #[test] + fn test_is_just_in_rollover_weekend_friday() { + // Fri Aug 11 2023 15:00:00 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691766000).unwrap(); + assert!(is_in_rollover_weekend(expiry)); + + // Fri Aug 11 2023 15:00:01 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691766001).unwrap(); + assert!(is_in_rollover_weekend(expiry)); + } + + #[test] + fn test_is_in_rollover_weekend_saturday() { + // Sat Aug 12 2023 16:00:00 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691856000).unwrap(); + assert!(is_in_rollover_weekend(expiry)); + } + + #[test] + fn test_is_just_in_rollover_weekend_sunday() { + // Sun Aug 13 2023 14:59:59 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691938799).unwrap(); + assert!(is_in_rollover_weekend(expiry)); + } + + #[test] + fn test_is_just_not_in_rollover_weekend_sunday() { + // Sun Aug 13 2023 15:00:00 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691938800).unwrap(); + assert!(!is_in_rollover_weekend(expiry)); + + // Sun Aug 13 2023 15:00:01 GMT+0000 + let expiry = OffsetDateTime::from_unix_timestamp(1691938801).unwrap(); + assert!(!is_in_rollover_weekend(expiry)); + } + + #[test] + fn test_expiry_timestamp_before_friday_15pm() { + // Wed Aug 09 2023 09:30:23 GMT+0000 + let from = OffsetDateTime::from_unix_timestamp(1691573423).unwrap(); + let expiry = calculate_next_expiry(from); + + // Sun Aug 13 2023 15:00:00 GMT+0000 + assert_eq!(1691938800, expiry.unix_timestamp()); + } + + #[test] + fn test_expiry_timestamp_just_before_friday_15pm() { + // Fri Aug 11 2023 14:59:59 GMT+0000 + let from = OffsetDateTime::from_unix_timestamp(1691765999).unwrap(); + let expiry = calculate_next_expiry(from); + + // Sun Aug 13 2023 15:00:00 GMT+0000 + assert_eq!(1691938800, expiry.unix_timestamp()); + } + + #[test] + fn test_expiry_timestamp_just_after_friday_15pm() { + // Fri Aug 11 2023 15:00:01 GMT+0000 + let from = OffsetDateTime::from_unix_timestamp(1691766001).unwrap(); + let expiry = calculate_next_expiry(from); + + // Sun Aug 20 2023 15:00:00 GMT+0000 + assert_eq!(1692543600, expiry.unix_timestamp()); + } + + #[test] + fn test_expiry_timestamp_at_friday_15pm() { + // Fri Aug 11 2023 15:00:00 GMT+0000 + let from = OffsetDateTime::from_unix_timestamp(1691766000).unwrap(); + let expiry = calculate_next_expiry(from); + + // Sun Aug 20 2023 15:00:00 GMT+0000 + assert_eq!(1692543600, expiry.unix_timestamp()); + } + + #[test] + fn test_expiry_timestamp_after_sunday_15pm() { + // Sun Aug 06 2023 16:00:00 GMT+0000 + let from = OffsetDateTime::from_unix_timestamp(1691337600).unwrap(); + let expiry = calculate_next_expiry(from); + + // Sun Aug 13 2023 15:00:00 GMT+0000 + assert_eq!(1691938800, expiry.unix_timestamp()); + } + + #[test] + fn test_expiry_timestamp_on_saturday() { + // // Sat Aug 12 2023 16:00:00 GMT+0000 + let from = OffsetDateTime::from_unix_timestamp(1691856000).unwrap(); + let expiry = calculate_next_expiry(from); + + // Sun Aug 20 2023 15:00:00 GMT+0000 + assert_eq!(1692543600, expiry.unix_timestamp()); + } +} diff --git a/crates/orderbook-commons/src/lib.rs b/crates/orderbook-commons/src/lib.rs index 9e1ef7cd5..f571f6904 100644 --- a/crates/orderbook-commons/src/lib.rs +++ b/crates/orderbook-commons/src/lib.rs @@ -2,11 +2,9 @@ pub use crate::order_matching_fee::order_matching_fee_taker; pub use crate::price::best_current_price; pub use crate::price::Price; pub use crate::price::Prices; -use anyhow::ensure; -use anyhow::Result; use rust_decimal::prelude::ToPrimitive; use rust_decimal::Decimal; -use secp256k1::Message; +use secp256k1::Message as SecpMessage; use secp256k1::PublicKey; use secp256k1::XOnlyPublicKey; use serde::Deserialize; @@ -14,11 +12,7 @@ use serde::Serialize; use sha2::digest::FixedOutput; use sha2::Digest; use sha2::Sha256; -use std::str::FromStr; -use time::macros::time; -use time::Duration; use time::OffsetDateTime; -use time::Weekday; use trade::ContractSymbol; use trade::Direction; use uuid::Uuid; @@ -69,11 +63,11 @@ pub struct Signature { pub signature: secp256k1::ecdsa::Signature, } -pub fn create_sign_message() -> Message { +pub fn create_sign_message() -> SecpMessage { let sign_message = "Hello it's me Mario".to_string(); let hashed_message = Sha256::new().chain_update(sign_message).finalize_fixed(); - let msg = Message::from_slice(hashed_message.as_slice()) + let msg = SecpMessage::from_slice(hashed_message.as_slice()) .expect("The message is static, hence this should never happen"); msg } @@ -117,8 +111,10 @@ pub enum OrderbookRequest { Authenticate(Signature), } +// TODO(holzeis): The message enum should not be in the orderbook-commons crate as it also contains +// coordinator messages. We should move all common crates into a single one. #[derive(Serialize, Clone, Deserialize, Debug)] -pub enum OrderbookMsg { +pub enum Message { AllOrders(Vec), NewOrder(Order), DeleteOrder(Uuid), @@ -320,72 +316,8 @@ pub struct Matches { pub updated_at: OffsetDateTime, } -pub fn get_filled_with_from_matches(matches: Vec) -> Result { - ensure!( - !matches.is_empty(), - "Need at least one matches record to construct a FilledWith" - ); - - let order_id = matches - .first() - .expect("to have at least one match") - .order_id; - let oracle_pk = XOnlyPublicKey::from_str( - "16f88cf7d21e6c0f46bcbc983a4e3b19726c6c98858cc31c83551a88fde171c0", - ) - .expect("To be a valid pubkey"); - - let expiry_timestamp = get_expiry_timestamp(OffsetDateTime::now_utc()); - - Ok(FilledWith { - order_id, - expiry_timestamp, - oracle_pk, - matches: matches - .iter() - .map(|m| Match { - id: m.id, - order_id: m.order_id, - quantity: m.quantity, - pubkey: m.match_trader_id, - execution_price: m.execution_price, - }) - .collect(), - }) -} - -/// Calculates the expiry timestamp at the next Sunday at 3 pm UTC from a given offset date time. -/// If the argument falls in between Friday, 3 pm UTC and Sunday, 3pm UTC, the expiry will be -/// calculated to next weeks Sunday at 3 pm -pub fn get_expiry_timestamp(time: OffsetDateTime) -> OffsetDateTime { - let days = if is_in_rollover_weekend(time) || time.weekday() == Weekday::Sunday { - // if the provided time is in the rollover weekend or on a sunday, we expire the sunday the - // week after. - 7 - time.weekday().number_from_monday() + 7 - } else { - 7 - time.weekday().number_from_monday() - }; - let time = time.date().with_hms(15, 0, 0).expect("to fit into time"); - - (time + Duration::days(days as i64)).assume_utc() -} - -/// Checks whether the provided expiry date is eligible for a rollover -/// -/// Returns true if the given date falls in between friday 15 pm UTC and sunday 15 pm UTC -pub fn is_in_rollover_weekend(timestamp: OffsetDateTime) -> bool { - match timestamp.weekday() { - Weekday::Friday => timestamp.time() >= time!(15:00), - Weekday::Saturday => true, - Weekday::Sunday => timestamp.time() < time!(15:00), - _ => false, - } -} - #[cfg(test)] mod test { - use crate::get_expiry_timestamp; - use crate::is_in_rollover_weekend; use crate::FilledWith; use crate::Match; use crate::Signature; @@ -402,109 +334,6 @@ mod test { .unwrap() } - #[test] - fn test_is_not_in_rollover_weekend() { - // Wed Aug 09 2023 09:30:23 GMT+0000 - let expiry = OffsetDateTime::from_unix_timestamp(1691573423).unwrap(); - assert!(!is_in_rollover_weekend(expiry)); - } - - #[test] - fn test_is_just_in_rollover_weekend_friday() { - // Fri Aug 11 2023 15:00:00 GMT+0000 - let expiry = OffsetDateTime::from_unix_timestamp(1691766000).unwrap(); - assert!(is_in_rollover_weekend(expiry)); - - // Fri Aug 11 2023 15:00:01 GMT+0000 - let expiry = OffsetDateTime::from_unix_timestamp(1691766001).unwrap(); - assert!(is_in_rollover_weekend(expiry)); - } - - #[test] - fn test_is_in_rollover_weekend_saturday() { - // Sat Aug 12 2023 16:00:00 GMT+0000 - let expiry = OffsetDateTime::from_unix_timestamp(1691856000).unwrap(); - assert!(is_in_rollover_weekend(expiry)); - } - - #[test] - fn test_is_just_in_rollover_weekend_sunday() { - // Sun Aug 13 2023 14:59:59 GMT+0000 - let expiry = OffsetDateTime::from_unix_timestamp(1691938799).unwrap(); - assert!(is_in_rollover_weekend(expiry)); - } - - #[test] - fn test_is_just_not_in_rollover_weekend_sunday() { - // Sun Aug 13 2023 15:00:00 GMT+0000 - let expiry = OffsetDateTime::from_unix_timestamp(1691938800).unwrap(); - assert!(!is_in_rollover_weekend(expiry)); - - // Sun Aug 13 2023 15:00:01 GMT+0000 - let expiry = OffsetDateTime::from_unix_timestamp(1691938801).unwrap(); - assert!(!is_in_rollover_weekend(expiry)); - } - - #[test] - fn test_expiry_timestamp_before_friday_15pm() { - // Wed Aug 09 2023 09:30:23 GMT+0000 - let from = OffsetDateTime::from_unix_timestamp(1691573423).unwrap(); - let expiry = get_expiry_timestamp(from); - - // Sun Aug 13 2023 15:00:00 GMT+0000 - assert_eq!(1691938800, expiry.unix_timestamp()); - } - - #[test] - fn test_expiry_timestamp_just_before_friday_15pm() { - // Fri Aug 11 2023 14:59:59 GMT+0000 - let from = OffsetDateTime::from_unix_timestamp(1691765999).unwrap(); - let expiry = get_expiry_timestamp(from); - - // Sun Aug 13 2023 15:00:00 GMT+0000 - assert_eq!(1691938800, expiry.unix_timestamp()); - } - - #[test] - fn test_expiry_timestamp_just_after_friday_15pm() { - // Fri Aug 11 2023 15:00:01 GMT+0000 - let from = OffsetDateTime::from_unix_timestamp(1691766001).unwrap(); - let expiry = get_expiry_timestamp(from); - - // Sun Aug 20 2023 15:00:00 GMT+0000 - assert_eq!(1692543600, expiry.unix_timestamp()); - } - - #[test] - fn test_expiry_timestamp_at_friday_15pm() { - // Fri Aug 11 2023 15:00:00 GMT+0000 - let from = OffsetDateTime::from_unix_timestamp(1691766000).unwrap(); - let expiry = get_expiry_timestamp(from); - - // Sun Aug 20 2023 15:00:00 GMT+0000 - assert_eq!(1692543600, expiry.unix_timestamp()); - } - - #[test] - fn test_expiry_timestamp_after_sunday_15pm() { - // Sun Aug 06 2023 16:00:00 GMT+0000 - let from = OffsetDateTime::from_unix_timestamp(1691337600).unwrap(); - let expiry = get_expiry_timestamp(from); - - // Sun Aug 13 2023 15:00:00 GMT+0000 - assert_eq!(1691938800, expiry.unix_timestamp()); - } - - #[test] - fn test_expiry_timestamp_on_saturday() { - // // Sat Aug 12 2023 16:00:00 GMT+0000 - let from = OffsetDateTime::from_unix_timestamp(1691856000).unwrap(); - let expiry = get_expiry_timestamp(from); - - // Sun Aug 20 2023 15:00:00 GMT+0000 - assert_eq!(1692543600, expiry.unix_timestamp()); - } - #[test] fn test_serialize_signature() { let secret_key = SecretKey::from_slice(&[ diff --git a/crates/tests-e2e/tests/rollover_position.rs b/crates/tests-e2e/tests/rollover_position.rs index ee2d216eb..8e6af79dc 100644 --- a/crates/tests-e2e/tests/rollover_position.rs +++ b/crates/tests-e2e/tests/rollover_position.rs @@ -22,7 +22,7 @@ async fn can_rollover_position() { .unwrap(); let position = test.app.rx.position().expect("position to exist"); - let new_expiry = orderbook_commons::get_expiry_timestamp(position.expiry); + let new_expiry = coordinator_commons::calculate_next_expiry(position.expiry); coordinator .rollover(&dlc_channel.dlc_channel_id.unwrap()) diff --git a/mobile/native/src/api.rs b/mobile/native/src/api.rs index 6bfa679dc..0d049ee68 100644 --- a/mobile/native/src/api.rs +++ b/mobile/native/src/api.rs @@ -397,5 +397,7 @@ pub fn get_channel_open_fee_estimate_sat() -> Result { } pub fn get_expiry_timestamp() -> SyncReturn { - SyncReturn(orderbook_commons::get_expiry_timestamp(OffsetDateTime::now_utc()).unix_timestamp()) + SyncReturn( + coordinator_commons::calculate_next_expiry(OffsetDateTime::now_utc()).unix_timestamp(), + ) } diff --git a/mobile/native/src/orderbook.rs b/mobile/native/src/orderbook.rs index 4637ad3c9..91bee3de0 100644 --- a/mobile/native/src/orderbook.rs +++ b/mobile/native/src/orderbook.rs @@ -10,8 +10,8 @@ use bdk::bitcoin::secp256k1::SecretKey; use bdk::bitcoin::secp256k1::SECP256K1; use futures::TryStreamExt; use orderbook_commons::best_current_price; +use orderbook_commons::Message; use orderbook_commons::Order; -use orderbook_commons::OrderbookMsg; use orderbook_commons::Prices; use orderbook_commons::Signature; use parking_lot::Mutex; @@ -94,7 +94,7 @@ pub fn subscribe( Ok(Some(msg)) => { tracing::debug!(%msg, "New message from orderbook"); - let msg = match serde_json::from_str::(&msg) { + let msg = match serde_json::from_str::(&msg) { Ok(msg) => msg, Err(e) => { tracing::error!( @@ -105,7 +105,7 @@ pub fn subscribe( }; match msg { - OrderbookMsg::Rollover => { + Message::Rollover => { tracing::info!("Received a rollover request from orderbook."); event::publish(&EventInternal::BackgroundNotification(BackgroundTask::Rollover(TaskStatus::Pending))); @@ -114,7 +114,7 @@ pub fn subscribe( event::publish(&EventInternal::BackgroundNotification(BackgroundTask::Rollover(TaskStatus::Failed))); } }, - OrderbookMsg::AsyncMatch { order, filled_with } => { + Message::AsyncMatch { order, filled_with } => { let order_reason = order.clone().order_reason.into(); tracing::info!(order_id = %order.id, "Received an async match from orderbook. Reason: {order_reason:?}"); event::publish(&EventInternal::BackgroundNotification(BackgroundTask::AsyncTrade(order_reason))); @@ -123,14 +123,14 @@ pub fn subscribe( tracing::error!(order_id = %order.id, "Failed to process async trade. Error: {e:#}"); } }, - OrderbookMsg::Match(filled) => { + Message::Match(filled) => { tracing::info!(order_id = %filled.order_id, "Received match from orderbook"); if let Err(e) = position::handler::trade(filled.clone()).await { tracing::error!(order_id = %filled.order_id, "Trade request sent to coordinator failed. Error: {e:#}"); } }, - OrderbookMsg::AllOrders(initial_orders) => { + Message::AllOrders(initial_orders) => { let mut orders = orders.lock(); if !orders.is_empty() { tracing::debug!("Received new set of initial orders from orderbook, replacing the previously stored orders"); @@ -141,12 +141,12 @@ pub fn subscribe( *orders = initial_orders; update_prices_if_needed(&mut cached_best_price, &orders); }, - OrderbookMsg::NewOrder(order) => { + Message::NewOrder(order) => { let mut orders = orders.lock(); orders.push(order); update_prices_if_needed(&mut cached_best_price, &orders); } - OrderbookMsg::DeleteOrder(order_id) => { + Message::DeleteOrder(order_id) => { let mut orders = orders.lock(); let found = remove_order(&mut orders, order_id); if !found { @@ -154,7 +154,7 @@ pub fn subscribe( } update_prices_if_needed(&mut cached_best_price, &orders); }, - OrderbookMsg::Update(updated_order) => { + Message::Update(updated_order) => { let mut orders = orders.lock(); let found = remove_order(&mut orders, updated_order.id); if !found { From 322ab39521ab3b2717913b986caef09a3c8a3de3 Mon Sep 17 00:00:00 2001 From: Richard Holzeis Date: Tue, 19 Sep 2023 13:53:17 +0200 Subject: [PATCH 6/6] fix: Do not rollover an expired position We need to provide the current timestamp instead of the position expiry, as it would have been otherwise never eligible for rollover. --- coordinator/src/node/rollover.rs | 4 +++- coordinator/src/position/models.rs | 5 +++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/coordinator/src/node/rollover.rs b/coordinator/src/node/rollover.rs index 42d1b2415..63a27a3ea 100644 --- a/coordinator/src/node/rollover.rs +++ b/coordinator/src/node/rollover.rs @@ -78,7 +78,9 @@ async fn check_if_eligible_for_rollover( if let Some(position) = positions::Position::get_open_position_by_trader(conn, trader_id.to_string())? { - if coordinator_commons::is_in_rollover_weekend(position.expiry_timestamp) { + if coordinator_commons::is_in_rollover_weekend(OffsetDateTime::now_utc()) + && !position.is_expired() + { let next_expiry = coordinator_commons::calculate_next_expiry(OffsetDateTime::now_utc()); if position.expiry_timestamp == next_expiry { tracing::trace!(%trader_id, position_id=position.id, "Position has already been rolled over"); diff --git a/coordinator/src/position/models.rs b/coordinator/src/position/models.rs index 5b0750500..2e9ede8ee 100644 --- a/coordinator/src/position/models.rs +++ b/coordinator/src/position/models.rs @@ -74,6 +74,11 @@ pub struct Position { } impl Position { + // Returns true if the position is expired + pub fn is_expired(&self) -> bool { + OffsetDateTime::now_utc() >= self.expiry_timestamp + } + /// Calculates the profit and loss for the coordinator in satoshis pub fn calculate_coordinator_pnl(&self, quote: Quote) -> Result { let closing_price = match self.closing_price {