From 33a7016d65394622ad0f9f751468867c69e90a9d Mon Sep 17 00:00:00 2001 From: Mariusz Klochowicz Date: Mon, 18 Sep 2023 15:33:19 -0700 Subject: [PATCH] feat: Send a notification prompt when executing AsyncTrade When the other party is not online, send a notification that a position has expired. --- coordinator/src/bin/coordinator.rs | 10 +++++++--- coordinator/src/notifications.rs | 16 +++++++++++++--- coordinator/src/orderbook/trading.rs | 25 +++++++++++++++++++++++-- 3 files changed, 43 insertions(+), 8 deletions(-) diff --git a/coordinator/src/bin/coordinator.rs b/coordinator/src/bin/coordinator.rs index 87cf6a950..6eef98141 100644 --- a/coordinator/src/bin/coordinator.rs +++ b/coordinator/src/bin/coordinator.rs @@ -194,8 +194,14 @@ async fn main() -> Result<()> { } }); + let notification_service = NotificationService::new(opts.fcm_api_key); + let (tx_price_feed, _rx) = broadcast::channel(100); - let (_handle, trading_sender) = trading::start(pool.clone(), tx_price_feed.clone()); + let (_handle, trading_sender) = trading::start( + pool.clone(), + tx_price_feed.clone(), + notification_service.get_sender(), + ); tokio::spawn({ let node = node.clone(); @@ -239,8 +245,6 @@ async fn main() -> Result<()> { tx_price_feed, ); - let notification_service = NotificationService::new(opts.fcm_api_key); - tokio::spawn({ let sender = notification_service.get_sender(); let pool = pool.clone(); diff --git a/coordinator/src/notifications.rs b/coordinator/src/notifications.rs index c5e283a07..35416e8e5 100644 --- a/coordinator/src/notifications.rs +++ b/coordinator/src/notifications.rs @@ -124,7 +124,18 @@ fn build_notification<'a>(kind: NotificationKind) -> fcm::Notification<'a> { } #[derive(Clone, Debug, PartialEq, Eq)] -pub struct FcmToken(pub String); +pub struct FcmToken(String); + +impl FcmToken { + pub fn new(token: String) -> Result { + anyhow::ensure!(!token.is_empty(), "FCM token cannot be empty"); + Ok(Self(token)) + } + + pub fn get(&self) -> &str { + &self.0 + } +} impl Display for FcmToken { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -140,7 +151,7 @@ async fn send_notification<'a>( ) -> Result<()> { anyhow::ensure!(!api_key.is_empty(), "FCM API key is empty"); - let mut message_builder = fcm::MessageBuilder::new(api_key, &fcm_token.0); + let mut message_builder = fcm::MessageBuilder::new(api_key, fcm_token.get()); message_builder.notification(notification); let message = message_builder.finalize(); let response = client @@ -353,4 +364,3 @@ pub mod tests { assert_eq!(notification.user_fcm_token, position_just_expired.1); } } - diff --git a/coordinator/src/orderbook/trading.rs b/coordinator/src/orderbook/trading.rs index 471fa99f1..8ca9dfc35 100644 --- a/coordinator/src/orderbook/trading.rs +++ b/coordinator/src/orderbook/trading.rs @@ -1,3 +1,7 @@ +use crate::db::user; +use crate::notifications::FcmToken; +use crate::notifications::Notification; +use crate::notifications::NotificationKind; use crate::orderbook::db::matches; use crate::orderbook::db::orders; use anyhow::anyhow; @@ -96,6 +100,7 @@ impl From<&TradeParams> for TraderMatchParams { pub fn start( pool: Pool>, tx_price_feed: broadcast::Sender, + notification_sender: mpsc::Sender, ) -> (RemoteHandle>, mpsc::Sender) { let (sender, mut receiver) = mpsc::channel::(TRADING_MESSAGES_BUFFER_SIZE); @@ -110,9 +115,10 @@ pub fn start( let mut conn = pool.get()?; let authenticated_users = authenticated_users.clone(); let tx_price_feed = tx_price_feed.clone(); + let notification_sender = notification_sender.clone(); async move { let new_order = new_order_msg.new_order; - let result = process_new_order(&mut conn, tx_price_feed, new_order, new_order_msg.order_reason, &authenticated_users) + let result = process_new_order(&mut conn, tx_price_feed, new_order, new_order_msg.order_reason, &authenticated_users, notification_sender) .await; if let Err(e) = new_order_msg.sender.send(result).await { tracing::error!("Failed to send new order message! Error: {e:#}"); @@ -160,6 +166,7 @@ async fn process_new_order( new_order: NewOrder, order_reason: OrderReason, authenticated_users: &HashMap>, + notification_sender: mpsc::Sender, ) -> Result { tracing::info!(trader_id=%new_order.trader_id, "Received a new {:?} order", new_order.order_type); @@ -246,7 +253,21 @@ async fn process_new_order( } Err(e) => { tracing::warn!(%trader_id, order_id, "{e:#}"); - // todo(holzeis): send push notification to user + + if let Some(user) = user::by_id(conn, trader_id.to_string())? { + tracing::debug!(%trader_id, order_id, "Sending push notification to user"); + + if let Ok(fcm_token) = FcmToken::new(user.fcm_token) { + notification_sender + .send(Notification { + user_fcm_token: fcm_token, + notification_kind: NotificationKind::PositionExpired, + }) + .await?; + } + } else { + tracing::warn!(%trader_id, order_id, "User has no FCM token"); + } if order.order_type == OrderType::Limit { // FIXME: The maker is currently not connected to the web socket so we