Skip to content

Commit

Permalink
Merge pull request #1287 from get10101/feat/display-rollover-option-t…
Browse files Browse the repository at this point in the history
…o-user

feat: Automatically rollover in rollover weekend
  • Loading branch information
holzeis authored Sep 19, 2023
2 parents 096c1d0 + 322ab39 commit 0b30115
Show file tree
Hide file tree
Showing 36 changed files with 856 additions and 344 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Show loading screen when app starts with an expired position
- Fix: Prevent crashing the app when there's no Internet connection
- feat: Allow exporting the seed phrase even if the Node is offline
- Changed expiry to next Sunday 3 pm UTC
- Automatically rollover if user opens app during rollover weekend

## [1.2.6] - 2023-09-06

Expand Down
15 changes: 14 additions & 1 deletion coordinator/src/bin/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ use coordinator::node;
use coordinator::node::closed_positions;
use coordinator::node::connection;
use coordinator::node::expired_positions;
use coordinator::node::rollover;
use coordinator::node::storage::NodeStorage;
use coordinator::node::unrealized_pnl;
use coordinator::node::Node;
use coordinator::notification;
use coordinator::notification::NewUserMessage;
use coordinator::notification_service::NotificationService;
use coordinator::orderbook::async_match;
use coordinator::orderbook::trading;
use coordinator::routes::router;
use coordinator::run_migration;
Expand Down Expand Up @@ -192,8 +196,16 @@ async fn main() -> Result<()> {
}
});

let (tx_user_feed, _rx) = broadcast::channel::<NewUserMessage>(100);
let (tx_price_feed, _rx) = broadcast::channel(100);
let (_handle, trading_sender) = trading::start(pool.clone(), tx_price_feed.clone());

let (_handle, notifier) = notification::start(tx_user_feed.clone());

let (_handle, trading_sender) =
trading::start(pool.clone(), tx_price_feed.clone(), notifier.clone());

let _handle = async_match::monitor(pool.clone(), tx_user_feed.clone(), notifier.clone());
let _handle = rollover::monitor(pool.clone(), tx_user_feed.clone(), notifier);

tokio::spawn({
let node = node.clone();
Expand Down Expand Up @@ -235,6 +247,7 @@ async fn main() -> Result<()> {
NODE_ALIAS,
trading_sender,
tx_price_feed,
tx_user_feed,
);

let notification_service = NotificationService::new(opts.fcm_api_key);
Expand Down
2 changes: 2 additions & 0 deletions coordinator/src/db/positions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ impl Position {
) -> Result<()> {
let affected_rows = diesel::update(positions::table)
.filter(positions::trader_pubkey.eq(trader_pubkey))
.filter(positions::position_state.eq(PositionState::Rollover))
.set((
positions::position_state.eq(PositionState::Open),
positions::temporary_contract_id.eq(temporary_contract_id.to_hex()),
Expand Down Expand Up @@ -169,6 +170,7 @@ impl Position {
) -> Result<()> {
let affected_rows = diesel::update(positions::table)
.filter(positions::trader_pubkey.eq(trader_pubkey))
.filter(positions::position_state.eq(PositionState::Open))
.set((
positions::expiry_timestamp.eq(expiry_timestamp),
positions::position_state.eq(PositionState::Rollover),
Expand Down
3 changes: 1 addition & 2 deletions coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@ use diesel_migrations::EmbeddedMigrations;
use diesel_migrations::MigrationHarness;
use serde_json::json;

mod rollover;

pub mod admin;
pub mod cli;
pub mod db;
pub mod logger;
pub mod metrics;
pub mod node;
pub mod notification;
pub mod notification_service;
pub mod orderbook;
pub mod position;
Expand Down
1 change: 1 addition & 0 deletions coordinator/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub mod closed_positions;
pub mod connection;
pub mod expired_positions;
pub mod order_matching_fee;
pub mod rollover;
pub mod routing_fees;
pub mod storage;
pub mod unrealized_pnl;
Expand Down
7 changes: 3 additions & 4 deletions coordinator/src/node/expired_positions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::db;
use crate::node::Node;
use crate::orderbook;
use crate::orderbook::trading::NewOrderMessage;
use crate::orderbook::trading::TradingMessage;
use crate::position::models::Position;
use crate::position::models::PositionState;
use anyhow::Context;
Expand All @@ -22,7 +21,7 @@ use time::Duration;
use time::OffsetDateTime;
use tokio::sync::mpsc;

pub async fn close(node: Node, trading_sender: mpsc::Sender<TradingMessage>) -> Result<()> {
pub async fn close(node: Node, trading_sender: mpsc::Sender<NewOrderMessage>) -> Result<()> {
let mut conn = node.pool.get()?;

let positions = db::positions::Position::get_all_open_positions(&mut conn)
Expand Down Expand Up @@ -93,11 +92,11 @@ pub async fn close(node: Node, trading_sender: mpsc::Sender<TradingMessage>) ->
};

let (sender, mut receiver) = mpsc::channel::<Result<Order>>(1);
let message = TradingMessage::NewOrder(NewOrderMessage {
let message = NewOrderMessage {
new_order: new_order.clone(),
order_reason: OrderReason::Expired,
sender,
});
};

if let Err(e) = trading_sender.send(message).await {
tracing::error!(order_id=%new_order.id, trader_id=%new_order.trader_id, "Failed to submit new order for closing expired position. Error: {e:#}");
Expand Down
84 changes: 77 additions & 7 deletions coordinator/src/rollover.rs → coordinator/src/node/rollover.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,30 @@
use crate::db;
use crate::db::positions;
use crate::node::Node;
use crate::notification::NewUserMessage;
use crate::notification::Notification;
use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use bitcoin::hashes::hex::ToHex;
use bitcoin::secp256k1::PublicKey;
use bitcoin::XOnlyPublicKey;
use diesel::r2d2::ConnectionManager;
use diesel::r2d2::Pool;
use diesel::PgConnection;
use dlc_manager::contract::contract_input::ContractInput;
use dlc_manager::contract::contract_input::ContractInputInfo;
use dlc_manager::contract::contract_input::OracleInput;
use dlc_manager::contract::Contract;
use dlc_manager::contract::ContractDescriptor;
use dlc_manager::ChannelId;
use futures::future::RemoteHandle;
use futures::FutureExt;
use orderbook_commons::Message;
use std::str::FromStr;
use time::Duration;
use time::OffsetDateTime;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use trade::ContractSymbol;

#[derive(Debug, Clone)]
Expand All @@ -29,6 +39,69 @@ struct Rollover {
contract_tx_fee_rate: u64,
}

pub fn monitor(
pool: Pool<ConnectionManager<PgConnection>>,
tx_user_feed: broadcast::Sender<NewUserMessage>,
notifier: mpsc::Sender<Notification>,
) -> RemoteHandle<Result<()>> {
let mut user_feed = tx_user_feed.subscribe();
let (fut, remote_handle) = async move {
while let Ok(new_user_msg) = user_feed.recv().await {
tokio::spawn({
let mut conn = pool.get()?;
let notifier = notifier.clone();
async move {
if let Err(e) =
check_if_eligible_for_rollover(&mut conn, notifier, new_user_msg.new_user)
.await
{
tracing::error!("Failed to check if eligible for rollover. Error: {e:#}");
}
}
});
}
Ok(())
}
.remote_handle();

tokio::spawn(fut);

remote_handle
}

async fn check_if_eligible_for_rollover(
conn: &mut PgConnection,
notifier: mpsc::Sender<Notification>,
trader_id: PublicKey,
) -> Result<()> {
tracing::debug!(%trader_id, "Checking if the users positions is eligible for rollover");
if let Some(position) =
positions::Position::get_open_position_by_trader(conn, trader_id.to_string())?
{
if coordinator_commons::is_in_rollover_weekend(OffsetDateTime::now_utc())
&& !position.is_expired()
{
let next_expiry = coordinator_commons::calculate_next_expiry(OffsetDateTime::now_utc());
if position.expiry_timestamp == next_expiry {
tracing::trace!(%trader_id, position_id=position.id, "Position has already been rolled over");
return Ok(());
}

tracing::debug!(%trader_id, position_id=position.id, "Proposing to rollover users position");

let message = Notification::Message {
trader_id,
message: Message::Rollover,
};
if let Err(e) = notifier.send(message).await {
tracing::debug!("Failed to notify trader. Error: {e:#}");
}
}
}

Ok(())
}

impl Rollover {
pub fn new(contract: Contract) -> Result<Self> {
let contract = match contract {
Expand Down Expand Up @@ -81,11 +154,8 @@ impl Rollover {
}

/// Calculates the maturity time based on the current expiry timestamp.
///
/// todo(holzeis): this should come from a configuration https://github.com/get10101/10101/issues/1029
pub fn maturity_time(&self) -> OffsetDateTime {
let tomorrow = self.expiry_timestamp.date() + Duration::days(7);
tomorrow.midnight().assume_utc()
coordinator_commons::calculate_next_expiry(self.expiry_timestamp)
}
}

Expand Down Expand Up @@ -209,8 +279,8 @@ pub mod tests {
let event_id = rollover.event_id();

// expect expiry in seven days at midnight.
// Thu Aug 24 2023 00:00:00 GMT+0000
assert_eq!(event_id, format!("btcusd1692835200"))
// Sun Aug 20 2023 15:00:00 GMT+0000
assert_eq!(event_id, format!("btcusd1692543600"))
}

#[test]
Expand Down
84 changes: 84 additions & 0 deletions coordinator/src/notification/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use anyhow::Result;
use bitcoin::secp256k1::PublicKey;
use futures::future::RemoteHandle;
use futures::FutureExt;
use orderbook_commons::Message;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;
use tokio::sync::broadcast;
use tokio::sync::mpsc;

/// This value is arbitrarily set to 100 and defines the message accepted in the notification
/// channel buffer.
const NOTIFICATION_BUFFER_SIZE: usize = 100;

// TODO(holzeis): This enum should be extended to allow for sending push notifications.
pub enum Notification {
Message {
trader_id: PublicKey,
message: Message,
},
}

#[derive(Clone)]
pub struct NewUserMessage {
pub new_user: PublicKey,
pub sender: mpsc::Sender<Message>,
}

pub fn start(
tx_user_feed: broadcast::Sender<NewUserMessage>,
) -> (RemoteHandle<Result<()>>, mpsc::Sender<Notification>) {
let (sender, mut receiver) = mpsc::channel::<Notification>(NOTIFICATION_BUFFER_SIZE);

let authenticated_users = Arc::new(RwLock::new(HashMap::new()));

tokio::task::spawn({
let traders = authenticated_users.clone();
async move {
let mut user_feed = tx_user_feed.subscribe();
while let Ok(new_user_msg) = user_feed.recv().await {
traders
.write()
.expect("RwLock to not be poisoned")
.insert(new_user_msg.new_user, new_user_msg.sender);
}
}
});

let (fut, remote_handle) = {
async move {
while let Some(notification) = receiver.recv().await {
match notification {
Notification::Message { trader_id, message } => {
tracing::info!(%trader_id, "Sending message: {message:?}");

let trader = {
let traders = authenticated_users
.read()
.expect("RwLock to not be poisoned");
traders.get(&trader_id).cloned()
};

match trader {
Some(sender) => {
if let Err(e) = sender.send(message).await {
tracing::warn!("Connection lost to trader {e:#}");
}
}
None => tracing::warn!(%trader_id, "Trader is not connected"),
}
}
}
}

Ok(())
}
.remote_handle()
};

tokio::spawn(fut);

(remote_handle, sender)
}
Loading

0 comments on commit 0b30115

Please sign in to comment.