Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Automatically rollover in rollover weekend #1287

Merged
merged 6 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Show loading screen when app starts with an expired position
- Fix: Prevent crashing the app when there's no Internet connection
- feat: Allow exporting the seed phrase even if the Node is offline
- Changed expiry to next Sunday 3 pm UTC
- Automatically rollover if user opens app during rollover weekend

## [1.2.6] - 2023-09-06

Expand Down
15 changes: 14 additions & 1 deletion coordinator/src/bin/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ use coordinator::node;
use coordinator::node::closed_positions;
use coordinator::node::connection;
use coordinator::node::expired_positions;
use coordinator::node::rollover;
use coordinator::node::storage::NodeStorage;
use coordinator::node::unrealized_pnl;
use coordinator::node::Node;
use coordinator::notification;
use coordinator::notification::NewUserMessage;
use coordinator::notification_service::NotificationService;
use coordinator::orderbook::async_match;
use coordinator::orderbook::trading;
use coordinator::routes::router;
use coordinator::run_migration;
Expand Down Expand Up @@ -192,8 +196,16 @@ async fn main() -> Result<()> {
}
});

let (tx_user_feed, _rx) = broadcast::channel::<NewUserMessage>(100);
let (tx_price_feed, _rx) = broadcast::channel(100);
let (_handle, trading_sender) = trading::start(pool.clone(), tx_price_feed.clone());

let (_handle, notifier) = notification::start(tx_user_feed.clone());

let (_handle, trading_sender) =
trading::start(pool.clone(), tx_price_feed.clone(), notifier.clone());

let _handle = async_match::monitor(pool.clone(), tx_user_feed.clone(), notifier.clone());
let _handle = rollover::monitor(pool.clone(), tx_user_feed.clone(), notifier);
Comment on lines +202 to +208
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❓ Doesn't the fact that you shadow _handle cause the previous RemoteHandles to be dropped stopping the corresponding tasks?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, I think there is not much benefit to just returning the RemoteHandle to "ignore" it. What you usually do with a RemoteHandle is store in some other struct so that when you drop that one the corresponding task is cancelled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❓ Doesn't the fact that you shadow _handle cause the previous RemoteHandles to be dropped stopping the corresponding tasks?

Doesn't look like it. At least it worked during testing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, I think there is not much benefit to just returning the RemoteHandle to "ignore" it. What you usually do with a RemoteHandle is store in some other struct so that when you drop that one the corresponding task is cancelled.

Well the reason I return it to the top level is so that the tasks live as long as the coordinator. That's why I am doing this, do you think that is not needed? Happy to remove it, but I think when I did, the compiler warned me about it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❓ Doesn't the fact that you shadow _handle cause the previous RemoteHandles to be dropped stopping the corresponding tasks?

Doesn't look like it. At least it worked during testing.

Confirmed it does not drop it: https://www.reddit.com/r/rust/comments/qpekne/what_actually_happens_when_ypu_shadow_a_variable/.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, I think there is not much benefit to just returning the RemoteHandle to "ignore" it. What you usually do with a RemoteHandle is store in some other struct so that when you drop that one the corresponding task is cancelled.

Well the reason I return it to the top level is so that the tasks live as long as the coordinator. That's why I am doing this, do you think that is not needed?

I guess because the coordinator is the whole program it isn't actually useful. It would be handy if we could remake the coordinator in case of catastrophic failure (without restarting the binary). Then you would want to cancel all the old tasks and restart them with a fresh coordinator struct.

Happy to remove it, but I think when I did, the compiler warned me about it.

That's probably because you called remote_handle on the future and returned the RemoteHandle which is marked as must_use. You can obviously just spawn the future without doing any of that, but I don't mind if you wanna keep the current version.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got ya. Since this PR is now open for quite some time. I prefer to go ahead with the current change, and adapt the remote_handle stuff in a follow up PR.


tokio::spawn({
let node = node.clone();
Expand Down Expand Up @@ -235,6 +247,7 @@ async fn main() -> Result<()> {
NODE_ALIAS,
trading_sender,
tx_price_feed,
tx_user_feed,
);

let notification_service = NotificationService::new(opts.fcm_api_key);
Expand Down
2 changes: 2 additions & 0 deletions coordinator/src/db/positions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ impl Position {
) -> Result<()> {
let affected_rows = diesel::update(positions::table)
.filter(positions::trader_pubkey.eq(trader_pubkey))
.filter(positions::position_state.eq(PositionState::Rollover))
.set((
positions::position_state.eq(PositionState::Open),
positions::temporary_contract_id.eq(temporary_contract_id.to_hex()),
Expand Down Expand Up @@ -169,6 +170,7 @@ impl Position {
) -> Result<()> {
let affected_rows = diesel::update(positions::table)
.filter(positions::trader_pubkey.eq(trader_pubkey))
.filter(positions::position_state.eq(PositionState::Open))
.set((
positions::expiry_timestamp.eq(expiry_timestamp),
positions::position_state.eq(PositionState::Rollover),
Expand Down
3 changes: 1 addition & 2 deletions coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@ use diesel_migrations::EmbeddedMigrations;
use diesel_migrations::MigrationHarness;
use serde_json::json;

mod rollover;

pub mod admin;
pub mod cli;
pub mod db;
pub mod logger;
pub mod metrics;
pub mod node;
pub mod notification;
pub mod notification_service;
pub mod orderbook;
pub mod position;
Expand Down
1 change: 1 addition & 0 deletions coordinator/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub mod closed_positions;
pub mod connection;
pub mod expired_positions;
pub mod order_matching_fee;
pub mod rollover;
pub mod routing_fees;
pub mod storage;
pub mod unrealized_pnl;
Expand Down
7 changes: 3 additions & 4 deletions coordinator/src/node/expired_positions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::db;
use crate::node::Node;
use crate::orderbook;
use crate::orderbook::trading::NewOrderMessage;
use crate::orderbook::trading::TradingMessage;
use crate::position::models::Position;
use crate::position::models::PositionState;
use anyhow::Context;
Expand All @@ -22,7 +21,7 @@ use time::Duration;
use time::OffsetDateTime;
use tokio::sync::mpsc;

pub async fn close(node: Node, trading_sender: mpsc::Sender<TradingMessage>) -> Result<()> {
pub async fn close(node: Node, trading_sender: mpsc::Sender<NewOrderMessage>) -> Result<()> {
let mut conn = node.pool.get()?;

let positions = db::positions::Position::get_all_open_positions(&mut conn)
Expand Down Expand Up @@ -93,11 +92,11 @@ pub async fn close(node: Node, trading_sender: mpsc::Sender<TradingMessage>) ->
};

let (sender, mut receiver) = mpsc::channel::<Result<Order>>(1);
let message = TradingMessage::NewOrder(NewOrderMessage {
let message = NewOrderMessage {
new_order: new_order.clone(),
order_reason: OrderReason::Expired,
sender,
});
};

if let Err(e) = trading_sender.send(message).await {
tracing::error!(order_id=%new_order.id, trader_id=%new_order.trader_id, "Failed to submit new order for closing expired position. Error: {e:#}");
Expand Down
82 changes: 75 additions & 7 deletions coordinator/src/rollover.rs → coordinator/src/node/rollover.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,30 @@
use crate::db;
use crate::db::positions;
use crate::node::Node;
use crate::notification::NewUserMessage;
use crate::notification::Notification;
use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use bitcoin::hashes::hex::ToHex;
use bitcoin::secp256k1::PublicKey;
use bitcoin::XOnlyPublicKey;
use diesel::r2d2::ConnectionManager;
use diesel::r2d2::Pool;
use diesel::PgConnection;
use dlc_manager::contract::contract_input::ContractInput;
use dlc_manager::contract::contract_input::ContractInputInfo;
use dlc_manager::contract::contract_input::OracleInput;
use dlc_manager::contract::Contract;
use dlc_manager::contract::ContractDescriptor;
use dlc_manager::ChannelId;
use futures::future::RemoteHandle;
use futures::FutureExt;
use orderbook_commons::Message;
use std::str::FromStr;
use time::Duration;
use time::OffsetDateTime;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use trade::ContractSymbol;

#[derive(Debug, Clone)]
Expand All @@ -29,6 +39,67 @@ struct Rollover {
contract_tx_fee_rate: u64,
}

pub fn monitor(
pool: Pool<ConnectionManager<PgConnection>>,
tx_user_feed: broadcast::Sender<NewUserMessage>,
notifier: mpsc::Sender<Notification>,
) -> RemoteHandle<Result<()>> {
let mut user_feed = tx_user_feed.subscribe();
let (fut, remote_handle) = async move {
while let Ok(new_user_msg) = user_feed.recv().await {
tokio::spawn({
let mut conn = pool.get()?;
let notifier = notifier.clone();
async move {
if let Err(e) =
check_if_eligible_for_rollover(&mut conn, notifier, new_user_msg.new_user)
.await
{
tracing::error!("Failed to check if eligible for rollover. Error: {e:#}");
}
}
});
}
Ok(())
}
.remote_handle();

tokio::spawn(fut);

remote_handle
}

async fn check_if_eligible_for_rollover(
conn: &mut PgConnection,
notifier: mpsc::Sender<Notification>,
trader_id: PublicKey,
) -> Result<()> {
tracing::debug!(%trader_id, "Checking if the users positions is eligible for rollover");
if let Some(position) =
positions::Position::get_open_position_by_trader(conn, trader_id.to_string())?
{
if coordinator_commons::is_in_rollover_weekend(position.expiry_timestamp) {
let next_expiry = coordinator_commons::calculate_next_expiry(OffsetDateTime::now_utc());
if position.expiry_timestamp == next_expiry {
tracing::trace!(%trader_id, position_id=position.id, "Position has already been rolled over");
return Ok(());
}

tracing::debug!(%trader_id, position_id=position.id, "Proposing to rollover users position");

let message = Notification::Message {
trader_id,
message: Message::Rollover,
};
if let Err(e) = notifier.send(message).await {
tracing::debug!("Failed to notify trader. Error: {e:#}");
}
}
}

Ok(())
}

impl Rollover {
pub fn new(contract: Contract) -> Result<Self> {
let contract = match contract {
Expand Down Expand Up @@ -81,11 +152,8 @@ impl Rollover {
}

/// Calculates the maturity time based on the current expiry timestamp.
///
/// todo(holzeis): this should come from a configuration https://github.com/get10101/10101/issues/1029
pub fn maturity_time(&self) -> OffsetDateTime {
let tomorrow = self.expiry_timestamp.date() + Duration::days(7);
tomorrow.midnight().assume_utc()
coordinator_commons::calculate_next_expiry(self.expiry_timestamp)
}
}

Expand Down Expand Up @@ -209,8 +277,8 @@ pub mod tests {
let event_id = rollover.event_id();

// expect expiry in seven days at midnight.
// Thu Aug 24 2023 00:00:00 GMT+0000
assert_eq!(event_id, format!("btcusd1692835200"))
// Sun Aug 20 2023 15:00:00 GMT+0000
assert_eq!(event_id, format!("btcusd1692543600"))
}

#[test]
Expand Down
84 changes: 84 additions & 0 deletions coordinator/src/notification/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use anyhow::Result;
use bitcoin::secp256k1::PublicKey;
use futures::future::RemoteHandle;
use futures::FutureExt;
use orderbook_commons::Message;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;
use tokio::sync::broadcast;
use tokio::sync::mpsc;

/// This value is arbitrarily set to 100 and defines the message accepted in the notification
/// channel buffer.
const NOTIFICATION_BUFFER_SIZE: usize = 100;

// TODO(holzeis): This enum should be extended to allow for sending push notifications.
pub enum Notification {
Message {
trader_id: PublicKey,
message: Message,
},
}

#[derive(Clone)]
pub struct NewUserMessage {
pub new_user: PublicKey,
pub sender: mpsc::Sender<Message>,
}

pub fn start(
tx_user_feed: broadcast::Sender<NewUserMessage>,
) -> (RemoteHandle<Result<()>>, mpsc::Sender<Notification>) {
let (sender, mut receiver) = mpsc::channel::<Notification>(NOTIFICATION_BUFFER_SIZE);

let authenticated_users = Arc::new(RwLock::new(HashMap::new()));

tokio::task::spawn({
let traders = authenticated_users.clone();
async move {
let mut user_feed = tx_user_feed.subscribe();
while let Ok(new_user_msg) = user_feed.recv().await {
traders
.write()
.expect("RwLock to not be poisoned")
.insert(new_user_msg.new_user, new_user_msg.sender);
}
}
});

let (fut, remote_handle) = {
async move {
while let Some(notification) = receiver.recv().await {
match notification {
Notification::Message { trader_id, message } => {
tracing::info!(%trader_id, "Sending message: {message:?}");

let trader = {
let traders = authenticated_users
.read()
.expect("RwLock to not be poisoned");
traders.get(&trader_id).cloned()
};

match trader {
Some(sender) => {
if let Err(e) = sender.send(message).await {
tracing::warn!("Connection lost to trader {e:#}");
}
}
None => tracing::warn!(%trader_id, "Trader is not connected"),
}
}
}
}

Ok(())
}
.remote_handle()
};

tokio::spawn(fut);

(remote_handle, sender)
}
Loading