Skip to content

Commit

Permalink
feat: Send a notification prompt when executing AsyncTrade
Browse files Browse the repository at this point in the history
When the other party is not online, send a notification that a position has expired.
  • Loading branch information
klochowicz committed Sep 18, 2023
1 parent db313e8 commit 33a7016
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 8 deletions.
10 changes: 7 additions & 3 deletions coordinator/src/bin/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,14 @@ async fn main() -> Result<()> {
}
});

let notification_service = NotificationService::new(opts.fcm_api_key);

let (tx_price_feed, _rx) = broadcast::channel(100);
let (_handle, trading_sender) = trading::start(pool.clone(), tx_price_feed.clone());
let (_handle, trading_sender) = trading::start(
pool.clone(),
tx_price_feed.clone(),
notification_service.get_sender(),
);

tokio::spawn({
let node = node.clone();
Expand Down Expand Up @@ -239,8 +245,6 @@ async fn main() -> Result<()> {
tx_price_feed,
);

let notification_service = NotificationService::new(opts.fcm_api_key);

tokio::spawn({
let sender = notification_service.get_sender();
let pool = pool.clone();
Expand Down
16 changes: 13 additions & 3 deletions coordinator/src/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,18 @@ fn build_notification<'a>(kind: NotificationKind) -> fcm::Notification<'a> {
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FcmToken(pub String);
pub struct FcmToken(String);

impl FcmToken {
pub fn new(token: String) -> Result<Self> {
anyhow::ensure!(!token.is_empty(), "FCM token cannot be empty");
Ok(Self(token))
}

pub fn get(&self) -> &str {
&self.0
}
}

impl Display for FcmToken {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand All @@ -140,7 +151,7 @@ async fn send_notification<'a>(
) -> Result<()> {
anyhow::ensure!(!api_key.is_empty(), "FCM API key is empty");

let mut message_builder = fcm::MessageBuilder::new(api_key, &fcm_token.0);
let mut message_builder = fcm::MessageBuilder::new(api_key, fcm_token.get());
message_builder.notification(notification);
let message = message_builder.finalize();
let response = client
Expand Down Expand Up @@ -353,4 +364,3 @@ pub mod tests {
assert_eq!(notification.user_fcm_token, position_just_expired.1);
}
}

25 changes: 23 additions & 2 deletions coordinator/src/orderbook/trading.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
use crate::db::user;
use crate::notifications::FcmToken;
use crate::notifications::Notification;
use crate::notifications::NotificationKind;
use crate::orderbook::db::matches;
use crate::orderbook::db::orders;
use anyhow::anyhow;
Expand Down Expand Up @@ -96,6 +100,7 @@ impl From<&TradeParams> for TraderMatchParams {
pub fn start(
pool: Pool<ConnectionManager<PgConnection>>,
tx_price_feed: broadcast::Sender<OrderbookMsg>,
notification_sender: mpsc::Sender<Notification>,
) -> (RemoteHandle<Result<()>>, mpsc::Sender<TradingMessage>) {
let (sender, mut receiver) = mpsc::channel::<TradingMessage>(TRADING_MESSAGES_BUFFER_SIZE);

Expand All @@ -110,9 +115,10 @@ pub fn start(
let mut conn = pool.get()?;
let authenticated_users = authenticated_users.clone();
let tx_price_feed = tx_price_feed.clone();
let notification_sender = notification_sender.clone();
async move {
let new_order = new_order_msg.new_order;
let result = process_new_order(&mut conn, tx_price_feed, new_order, new_order_msg.order_reason, &authenticated_users)
let result = process_new_order(&mut conn, tx_price_feed, new_order, new_order_msg.order_reason, &authenticated_users, notification_sender)
.await;
if let Err(e) = new_order_msg.sender.send(result).await {
tracing::error!("Failed to send new order message! Error: {e:#}");
Expand Down Expand Up @@ -160,6 +166,7 @@ async fn process_new_order(
new_order: NewOrder,
order_reason: OrderReason,
authenticated_users: &HashMap<PublicKey, mpsc::Sender<OrderbookMsg>>,
notification_sender: mpsc::Sender<Notification>,
) -> Result<Order> {
tracing::info!(trader_id=%new_order.trader_id, "Received a new {:?} order", new_order.order_type);

Expand Down Expand Up @@ -246,7 +253,21 @@ async fn process_new_order(
}
Err(e) => {
tracing::warn!(%trader_id, order_id, "{e:#}");
// todo(holzeis): send push notification to user

if let Some(user) = user::by_id(conn, trader_id.to_string())? {
tracing::debug!(%trader_id, order_id, "Sending push notification to user");

if let Ok(fcm_token) = FcmToken::new(user.fcm_token) {
notification_sender
.send(Notification {
user_fcm_token: fcm_token,
notification_kind: NotificationKind::PositionExpired,
})
.await?;
}
} else {
tracing::warn!(%trader_id, order_id, "User has no FCM token");
}

if order.order_type == OrderType::Limit {
// FIXME: The maker is currently not connected to the web socket so we
Expand Down

0 comments on commit 33a7016

Please sign in to comment.