Skip to content

Commit

Permalink
add logic for pagination retrieval of events
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgeantonio21 committed Oct 16, 2024
1 parent f3083cd commit 225b4bd
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 23 deletions.
2 changes: 2 additions & 0 deletions atoma-sui/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
sui-sdk.workspace = true
thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true

[dev-dependencies]
toml.workspace = true
28 changes: 25 additions & 3 deletions atoma-sui/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,28 @@ use sui_sdk::types::base_types::ObjectID;
#[derive(Debug, Deserialize, Serialize)]
pub struct SuiEventSubscriberConfig {
/// The HTTP URL for a Sui RPC node, to which the subscriber will connect
/// This is used for making HTTP requests to the Sui RPC node
http_rpc_node_addr: String,

/// The WebSocket URL for a Sui RPC node, to which the subscriber will connect
/// This is used for establishing WebSocket connections for real-time events
ws_rpc_node_addr: String,
/// The package ID on the Sui network

/// The Atoma's package ID on the Sui network
/// This identifies the specific package (smart contract) to interact with
package_id: ObjectID,

/// The timeout duration for requests
/// This sets the maximum time to wait for a response from the Sui network
request_timeout: Duration,
/// A list of small IDs (purpose may vary based on implementation)

/// Optional value to limit the number of dynamic fields to be retrieved for each iteration
/// of the event subscriber loop
limit: Option<usize>,

/// A list of node small IDs
/// These are values used to identify the Atoma's nodes that are under control by
/// current Sui wallet
small_ids: Vec<u64>,
}

Expand All @@ -29,13 +43,15 @@ impl SuiEventSubscriberConfig {
ws_rpc_node_addr: String,
package_id: ObjectID,
request_timeout: Duration,
limit: Option<usize>,
small_ids: Vec<u64>,
) -> Self {
Self {
http_rpc_node_addr,
ws_rpc_node_addr,
package_id,
request_timeout,
limit,
small_ids,
}
}
Expand All @@ -50,6 +66,11 @@ impl SuiEventSubscriberConfig {
self.ws_rpc_node_addr.clone()
}

/// Getter for `limit`
pub fn limit(&self) -> Option<usize> {
self.limit
}

/// Getter for `package_id`
pub fn package_id(&self) -> ObjectID {
self.package_id
Expand Down Expand Up @@ -116,11 +137,12 @@ pub mod tests {
.parse()
.unwrap(),
Duration::from_secs(5 * 60),
Some(10),
vec![0, 1, 2],
);

let toml_str = toml::to_string(&config).unwrap();
let should_be_toml_str = "http_url = \"\"\nws_url = \"\"\npackage_id = \"0x8d97f1cd6ac663735be08d1d2b6d02a159e711586461306ce60a2b7a6a565a9e\"\nsmall_ids = [0, 1, 2]\n\n[request_timeout]\nsecs = 300\nnanos = 0\n";
let should_be_toml_str = "http_url = \"\"\nws_url = \"\"\npackage_id = \"0x8d97f1cd6ac663735be08d1d2b6d02a159e711586461306ce60a2b7a6a565a9e\"\nlimit = 10\nsmall_ids = [0, 1, 2]\n\n[request_timeout]\nsecs = 300\nnanos = 0\n";
assert_eq!(toml_str, should_be_toml_str);
}
}
3 changes: 3 additions & 0 deletions atoma-sui/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,12 +642,15 @@ pub struct TimeoutInfo {
/// Once this reaches a threshold `MaxTicketTimeouts`, the ticket
/// will be disputed.
pub timed_out_count: u64,

/// If the settlement takes more than this, the settlement can be cut
/// short.
/// See the `try_to_settle` endpoint.
pub timeout_ms: u64,

/// Will be relevant for timeouting.
pub started_in_epoch: u64,

/// Will be relevant for timeouting.
pub started_at_epoch_timestamp_ms: u64,
}
Expand Down
72 changes: 72 additions & 0 deletions atoma-sui/src/handlers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use crate::{events::AtomaEvent, subscriber::SuiEventSubscriberError};

pub type Result<T> = std::result::Result<T, SuiEventSubscriberError>;

pub(crate) fn handle_atoma_event(event: AtomaEvent) -> Result<()> {
match event {
AtomaEvent::DisputeEvent => {
println!("DisputeEvent");
}
AtomaEvent::SettledEvent => {
println!("SettledEvent");
}
AtomaEvent::PublishedEvent => {
println!("PublishedEvent");
}
AtomaEvent::NewlySampledNodesEvent => {
println!("NewlySampledNodesEvent");
}
AtomaEvent::NodeRegisteredEvent => {
println!("NodeRegisteredEvent");
}
AtomaEvent::NodeSubscribedToModelEvent => {
println!("NodeSubscribedToModelEvent");
}
AtomaEvent::NodeSubscribedToTaskEvent => {
println!("NodeSubscribedToTaskEvent");
}
AtomaEvent::NodeUnsubscribedFromTaskEvent => {
println!("NodeUnsubscribedFromTaskEvent");
}
AtomaEvent::TaskRegisteredEvent => {
println!("TaskRegisteredEvent");
}
AtomaEvent::TaskDeprecationEvent => {
println!("TaskDeprecationEvent");
}
AtomaEvent::FirstSubmissionEvent => {
println!("FirstSubmissionEvent");
}
AtomaEvent::StackCreatedEvent => {
println!("StackCreatedEvent");
}
AtomaEvent::StackTrySettleEvent => {
println!("StackTrySettleEvent");
}
AtomaEvent::NewStackSettlementAttestationEvent => {
println!("NewStackSettlementAttestationEvent");
}
AtomaEvent::StackSettlementTicketEvent => {
println!("StackSettlementTicketEvent");
}
AtomaEvent::StackSettlementTicketClaimedEvent => {
println!("StackSettlementTicketClaimedEvent");
}
AtomaEvent::StackAttestationDisputeEvent => {
println!("StackAttestationDisputeEvent");
}
AtomaEvent::TaskRemovedEvent => {
println!("TaskRemovedEvent");
}
AtomaEvent::RetrySettlementEvent => {
println!("RetrySettlementEvent");
}
AtomaEvent::Text2ImagePromptEvent => {
println!("Text2ImagePromptEvent");
}
AtomaEvent::Text2TextPromptEvent => {
println!("Text2TextPromptEvent");
}
}
Ok(())
}
1 change: 1 addition & 0 deletions atoma-sui/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod config;
pub mod events;
pub mod subscriber;
pub mod handlers;
143 changes: 123 additions & 20 deletions atoma-sui/src/subscriber.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
use sui_sdk::{rpc_types::EventFilter, types::event::EventID, SuiClient};
use crate::config::SuiEventSubscriberConfig;
use crate::{
config::SuiEventSubscriberConfig,
events::{AtomaEvent, SuiEventParseError}, handlers::handle_atoma_event,
};
use std::{path::Path, str::FromStr, time::Duration};
use sui_sdk::{
rpc_types::{EventFilter, EventPage},
types::event::EventID,
SuiClient, SuiClientBuilder,
};
use thiserror::Error;
use tracing::{error, info, info_span, instrument, trace, Span};

/// The duration to wait for new events in seconds, if there are no new events.
const DURATION_TO_WAIT_FOR_NEW_EVENTS_IN_MILLIS: u64 = 100;

type Result<T> = std::result::Result<T, SuiEventSubscriberError>;

Expand All @@ -8,44 +21,134 @@ type Result<T> = std::result::Result<T, SuiEventSubscriberError>;
/// This struct provides functionality to subscribe to and process events
/// from the Sui blockchain based on specified filters.
pub struct SuiEventSubscriber {
/// The Sui client used for interacting with the blockchain.
client: SuiClient,
/// The configuration values for the subscriber.
config: SuiEventSubscriberConfig,
/// The event filter used to specify which events to subscribe to.
filter: EventFilter,
/// The HTTP address of the RPC node.
http_rpc_node_addr: String,
/// The ID of the last processed event, used for pagination.
last_event_id: Option<EventID>,
cursor: Option<EventID>,
/// The span used to trace the events subscriber.
span: Span,
}

impl SuiEventSubscriber {
/// Constructor
pub fn new(client: SuiClient, filter: EventFilter, http_rpc_node_addr: String) -> Self {
pub fn new(config: SuiEventSubscriberConfig) -> Self {
let filter = EventFilter::Package(config.package_id());
Self {
client,
config,
filter,
http_rpc_node_addr,
last_event_id: None,
cursor: None,
span: info_span!("events-subscriber"),
}
}

/// Creates a new `SuiEventSubscriber` instance from a configuration file.
///
/// This method reads the configuration from the specified file path and initializes
/// a new `SuiEventSubscriber` with the loaded configuration.
///
/// # Arguments
///
/// * `config_path` - A path-like type that represents the location of the configuration file.
///
/// # Returns
///
/// * `Result<Self>` - A Result containing the new `SuiEventSubscriber` instance if successful,
/// or an error if the configuration couldn't be read.
///
/// # Errors
///
/// This function will return an error if:
/// * The configuration file cannot be read or parsed.
pub fn new_from_config<P: AsRef<Path>>(config_path: P) -> Result<Self> {
let config = SuiEventSubscriberConfig::from_file_path(config_path);
let filter = EventFilter::new(config.package_id());
Ok(Self::new(client, filter, config.http_rpc_node_addr()))
Ok(Self::new(config))
}

/// Builds a SuiClient based on the provided configuration.
///
/// This asynchronous method creates a new SuiClient instance using the settings
/// specified in the SuiEventSubscriberConfig. It sets up the client with the
/// configured request timeout and HTTP RPC node address.
///
/// # Arguments
///
/// * `config` - A reference to a SuiEventSubscriberConfig containing the necessary
/// configuration parameters.
///
/// # Returns
///
/// * `Result<SuiClient>` - A Result containing the newly created SuiClient if successful,
/// or a SuiEventSubscriberError if the client creation fails.
///
/// # Errors
///
/// This function will return an error if:
/// * The SuiClient cannot be built with the provided configuration.
/// * There's a network issue when connecting to the specified RPC node.
#[instrument(skip_all, fields(
http_rpc_node_addr = %config.http_rpc_node_addr()
))]
pub async fn build_client(config: &SuiEventSubscriberConfig) -> Result<SuiClient> {
let client = SuiClientBuilder::default()
.request_timeout(config.request_timeout())
.build(config.http_rpc_node_addr())
.await?;
info!("Client built successfully");
Ok(client)
}

pub fn run(&self) -> Result<(), Error> {
let events = self.client.read_events(
self.http_rpc_node_addr,
self.filter,
self.last_event_id,
)?;
#[instrument(skip_all)]
pub async fn run(mut self) -> Result<()> {
let package_id = self.config.package_id();
let limit = self.config.limit();

let client = Self::build_client(&self.config).await?;

info!("Starting to run events subscriber, for package: {package_id}");

loop {
let event_filter = self.filter.clone();
let EventPage {
data,
next_cursor,
has_next_page,
} = match client
.event_api()
.query_events(event_filter, self.cursor, limit, false)
.await
{
Ok(page) => page,
Err(e) => {
error!("Failed to read events, with error: {e}");
continue;
}
};
self.cursor = next_cursor;
for sui_event in data.iter() {
trace!("Received new event: {sui_event:#?}");
let atoma_event = AtomaEvent::from_str(&sui_event.type_.name.as_str())?;
handle_atoma_event(atoma_event)?;
}

if !has_next_page {
// No new events to read, so let's wait for a while
info!("No new events to read, so let's wait for {DURATION_TO_WAIT_FOR_NEW_EVENTS_IN_MILLIS} millis");
tokio::time::sleep(Duration::from_millis(
DURATION_TO_WAIT_FOR_NEW_EVENTS_IN_MILLIS,
))
.await;
continue;
}
}
}
}

#[derive(Debug, Error)]
pub enum SuiEventSubscriberError {
#[error("Failed to read events: {0}")]
ReadEventsError(String),
ReadEventsError(#[from] sui_sdk::error::Error),
#[error("Failed to parse event: {0}")]
SuiEventParseError(#[from] SuiEventParseError),
}

0 comments on commit 225b4bd

Please sign in to comment.