From cc8a37dedbef89bc7ed8ac1d8c8627c6afaf7787 Mon Sep 17 00:00:00 2001 From: Martin Stefcek Date: Thu, 22 Aug 2024 13:44:44 +0400 Subject: [PATCH] change discard to forward --- atoma-event-subscribe/sui/src/main.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/atoma-event-subscribe/sui/src/main.rs b/atoma-event-subscribe/sui/src/main.rs index f3e14cba..f6b9ad2c 100644 --- a/atoma-event-subscribe/sui/src/main.rs +++ b/atoma-event-subscribe/sui/src/main.rs @@ -1,8 +1,10 @@ use std::time::Duration; use atoma_sui::subscriber::{SuiSubscriber, SuiSubscriberError}; +use atoma_types::InputSource; use clap::Parser; use sui_sdk::types::base_types::ObjectID; +use tokio::sync::oneshot; use tracing::{error, info}; #[derive(Debug, Parser)] @@ -28,12 +30,20 @@ async fn main() -> Result<(), SuiSubscriberError> { let ws_url = args.ws_addr; let (event_sender, mut event_receiver) = tokio::sync::mpsc::channel(32); - let (input_manager_tx, mut input_manager_rx) = tokio::sync::mpsc::channel(32); + let (input_manager_tx, mut input_manager_rx) = + tokio::sync::mpsc::channel::<(InputSource, oneshot::Sender)>(32); // Spawn a task to discard messages tokio::spawn(async move { - while let Some(_msg) = input_manager_rx.recv().await { - // Discard the message + while let Some((input_source, oneshot)) = input_manager_rx.recv().await { + info!("Received input from source: {:?}", input_source); + let data = match input_source { + InputSource::Firebase { request_id } => request_id, + InputSource::Raw { prompt } => prompt, + }; + if let Err(err) = oneshot.send(data) { + error!("Failed to send response: {:?}", err); + } } });