Skip to content

Commit

Permalink
Merge pull request #1297 from get10101/feat/send-a-notification-when-the-order-is-about-to-expire
Browse files Browse the repository at this point in the history

feat: Send push notifications for expired and expiring positions
  • Loading branch information
klochowicz authored Sep 20, 2023
2 parents 31bba17 + 8f887fc commit 033ff5e
Show file tree
Hide file tree
Showing 16 changed files with 515 additions and 202 deletions.
61 changes: 48 additions & 13 deletions coordinator/src/bin/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use anyhow::Context;
use anyhow::Result;
use coordinator::cli::Opts;
use coordinator::logger;
use coordinator::message::spawn_delivering_messages_to_authenticated_users;
use coordinator::message::NewUserMessage;
use coordinator::metrics;
use coordinator::metrics::init_meter;
use coordinator::node;
Expand All @@ -12,9 +14,8 @@ use coordinator::node::rollover;
use coordinator::node::storage::NodeStorage;
use coordinator::node::unrealized_pnl;
use coordinator::node::Node;
use coordinator::notification;
use coordinator::notification::NewUserMessage;
use coordinator::notification_service::NotificationService;
use coordinator::notifications::query_and_send_position_notifications;
use coordinator::notifications::NotificationService;
use coordinator::orderbook::async_match;
use coordinator::orderbook::trading;
use coordinator::routes::router;
Expand Down Expand Up @@ -42,10 +43,15 @@ use tracing::metadata::LevelFilter;

const PROCESS_PROMETHEUS_METRICS: Duration = Duration::from_secs(10);
const PROCESS_INCOMING_DLC_MESSAGES_INTERVAL: Duration = Duration::from_millis(200);
const EXPIRED_POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(300);
const EXPIRED_POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(5 * 60);
const CLOSED_POSITION_SYNC_INTERVAL: Duration = Duration::from_secs(30);
const UNREALIZED_PNL_SYNC_INTERVAL: Duration = Duration::from_secs(600);
const UNREALIZED_PNL_SYNC_INTERVAL: Duration = Duration::from_secs(10 * 60);
const CONNECTION_CHECK_INTERVAL: Duration = Duration::from_secs(30);
/// How often to check for expiring/expired positions to send push notifications for.
/// This should be configured in conjunction with the time windows of
/// expiring/expired notifications, ideally a bit less than the time window
/// (e.g. 58min for a 1h time window).
const POSITION_PUSH_NOTIFICATION_INTERVAL: Duration = Duration::from_secs(58 * 60);

const NODE_ALIAS: &str = "10101.finance";

Expand Down Expand Up @@ -199,24 +205,33 @@ async fn main() -> Result<()> {
});

let (tx_user_feed, _rx) = broadcast::channel::<NewUserMessage>(100);

let (tx_price_feed, _rx) = broadcast::channel(100);

let (_handle, notifier) = notification::start(tx_user_feed.clone());
let (_handle, auth_users_notifier) =
spawn_delivering_messages_to_authenticated_users(tx_user_feed.clone());

let notification_service = NotificationService::new(opts.fcm_api_key.clone());

let (_handle, trading_sender) = trading::start(
pool.clone(),
tx_price_feed.clone(),
notifier.clone(),
auth_users_notifier.clone(),
notification_service.get_sender(),
network,
);

let _handle = async_match::monitor(
pool.clone(),
tx_user_feed.clone(),
notifier.clone(),
auth_users_notifier.clone(),
network,
);
let _handle = rollover::monitor(
pool.clone(),
tx_user_feed.clone(),
auth_users_notifier,
network,
);
let _handle = rollover::monitor(pool.clone(), tx_user_feed.clone(), notifier, network);

tokio::spawn({
let node = node.clone();
Expand Down Expand Up @@ -251,7 +266,7 @@ async fn main() -> Result<()> {

let app = router(
node,
pool,
pool.clone(),
settings,
exporter,
opts.p2p_announcement_addresses(),
Expand All @@ -261,9 +276,29 @@ async fn main() -> Result<()> {
tx_user_feed,
);

let notification_service = NotificationService::new(opts.fcm_api_key);
tokio::spawn({
let sender = notification_service.get_sender();
let pool = pool.clone();
async move {
loop {
tracing::debug!("Running expiring/expired position push notification task");
match pool.get() {
Ok(mut conn) => {
if let Err(e) =
query_and_send_position_notifications(&mut conn, &sender).await
{
tracing::error!("Failed to send notifications: {e:#}");
}
}
Err(e) => {
tracing::error!("Failed to get pool connection. Error: {e:?}");
}
}

let _sender = notification_service.get_sender();
tokio::time::sleep(POSITION_PUSH_NOTIFICATION_INTERVAL).await;
}
}
});

// Start the metrics exporter
autometrics::prometheus_exporter::init();
Expand Down
18 changes: 18 additions & 0 deletions coordinator/src/db/positions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,24 @@ impl Position {
Ok(positions)
}

/// Load all positions whose expiry timestamp lies strictly between `start`
/// and `end` (both bounds exclusive), converted into domain models.
pub fn get_all_positions_with_expiry_within(
    conn: &mut PgConnection,
    start: OffsetDateTime,
    end: OffsetDateTime,
) -> QueryResult<Vec<crate::position::models::Position>> {
    // Fetch the raw database rows matching the expiry window.
    let rows: Vec<Position> = positions::table
        .filter(positions::expiry_timestamp.gt(start))
        .filter(positions::expiry_timestamp.lt(end))
        .load(conn)?;

    // Map each row into the application-level position model.
    Ok(rows
        .into_iter()
        .map(crate::position::models::Position::from)
        .collect())
}

pub fn get_all_open_or_closing_positions(
conn: &mut PgConnection,
) -> QueryResult<Vec<crate::position::models::Position>> {
Expand Down
4 changes: 2 additions & 2 deletions coordinator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ pub mod admin;
pub mod cli;
pub mod db;
pub mod logger;
pub mod message;
pub mod metrics;
pub mod node;
pub mod notification;
pub mod notification_service;
pub mod notifications;
pub mod orderbook;
pub mod position;
pub mod routes;
Expand Down
13 changes: 13 additions & 0 deletions coordinator/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,16 @@ pub fn init_tracing(level: LevelFilter, json_format: bool, tokio_console: bool)

Ok(())
}

/// Initialise tracing for tests.
///
/// Safe to call from every test: the subscriber is installed at most once,
/// because calling `init()` a second time would panic.
#[cfg(test)]
pub(crate) fn init_tracing_for_test() {
    use std::sync::Once;

    static INIT: Once = Once::new();

    INIT.call_once(|| {
        tracing_subscriber::fmt()
            .with_env_filter("debug")
            .with_test_writer()
            .init();
    });
}
26 changes: 10 additions & 16 deletions coordinator/src/notification/mod.rs → coordinator/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@ use bitcoin::secp256k1::PublicKey;
use futures::future::RemoteHandle;
use futures::FutureExt;
use orderbook_commons::Message;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;
use tokio::sync::broadcast;
use tokio::sync::mpsc;

/// This value is arbitrarily set to 100 and defines the message accepted in the notification
/// This value is arbitrarily set to 100 and defines the message accepted in the message
/// channel buffer.
const NOTIFICATION_BUFFER_SIZE: usize = 100;

// TODO(holzeis): This enum should be extended to allow for sending push notifications.
pub enum Notification {
Message {
/// Message sent to users via the websocket.
pub enum OrderbookMessage {
TraderMessage {
trader_id: PublicKey,
message: Message,
},
Expand All @@ -27,10 +27,10 @@ pub struct NewUserMessage {
pub sender: mpsc::Sender<Message>,
}

pub fn start(
pub fn spawn_delivering_messages_to_authenticated_users(
tx_user_feed: broadcast::Sender<NewUserMessage>,
) -> (RemoteHandle<Result<()>>, mpsc::Sender<Notification>) {
let (sender, mut receiver) = mpsc::channel::<Notification>(NOTIFICATION_BUFFER_SIZE);
) -> (RemoteHandle<Result<()>>, mpsc::Sender<OrderbookMessage>) {
let (sender, mut receiver) = mpsc::channel::<OrderbookMessage>(NOTIFICATION_BUFFER_SIZE);

let authenticated_users = Arc::new(RwLock::new(HashMap::new()));

Expand All @@ -41,7 +41,6 @@ pub fn start(
while let Ok(new_user_msg) = user_feed.recv().await {
traders
.write()
.expect("RwLock to not be poisoned")
.insert(new_user_msg.new_user, new_user_msg.sender);
}
}
Expand All @@ -51,15 +50,10 @@ pub fn start(
async move {
while let Some(notification) = receiver.recv().await {
match notification {
Notification::Message { trader_id, message } => {
OrderbookMessage::TraderMessage { trader_id, message } => {
tracing::info!(%trader_id, "Sending message: {message:?}");

let trader = {
let traders = authenticated_users
.read()
.expect("RwLock to not be poisoned");
traders.get(&trader_id).cloned()
};
let trader = authenticated_users.read().get(&trader_id).cloned();

match trader {
Some(sender) => {
Expand Down
10 changes: 5 additions & 5 deletions coordinator/src/node/rollover.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::db;
use crate::db::positions;
use crate::message::NewUserMessage;
use crate::message::OrderbookMessage;
use crate::node::Node;
use crate::notification::NewUserMessage;
use crate::notification::Notification;
use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
Expand Down Expand Up @@ -44,7 +44,7 @@ struct Rollover {
pub fn monitor(
pool: Pool<ConnectionManager<PgConnection>>,
tx_user_feed: broadcast::Sender<NewUserMessage>,
notifier: mpsc::Sender<Notification>,
notifier: mpsc::Sender<OrderbookMessage>,
network: Network,
) -> RemoteHandle<Result<()>> {
let mut user_feed = tx_user_feed.subscribe();
Expand Down Expand Up @@ -78,7 +78,7 @@ pub fn monitor(

async fn check_if_eligible_for_rollover(
conn: &mut PgConnection,
notifier: mpsc::Sender<Notification>,
notifier: mpsc::Sender<OrderbookMessage>,
trader_id: PublicKey,
network: Network,
) -> Result<()> {
Expand All @@ -98,7 +98,7 @@ async fn check_if_eligible_for_rollover(

tracing::debug!(%trader_id, position_id=position.id, "Proposing to rollover users position");

let message = Notification::Message {
let message = OrderbookMessage::TraderMessage {
trader_id,
message: Message::Rollover,
};
Expand Down
Loading

0 comments on commit 033ff5e

Please sign in to comment.