diff --git a/crates/cli/src/args.rs b/crates/cli/src/args.rs index 400ca8a0..fb4464f4 100644 --- a/crates/cli/src/args.rs +++ b/crates/cli/src/args.rs @@ -78,6 +78,14 @@ pub struct Args { #[arg(short('l'), long, value_name = "limit", help_heading = "Acquisition Options")] pub requests_per_second: Option, + /// Specify max retries on provider errors + #[arg(long, default_value_t = 5, value_name = "R", help_heading = "Acquisition Options")] + pub max_retries: u32, + + /// Specify initial backoff for retry strategy (ms) + #[arg(long, default_value_t = 500, value_name = "B", help_heading = "Acquisition Options")] + pub initial_backoff: u64, + /// Global number of concurrent requests #[arg(long, value_name = "M", help_heading = "Acquisition Options")] pub max_concurrent_requests: Option, diff --git a/crates/cli/src/parse/source.rs b/crates/cli/src/parse/source.rs index 63116e7b..10cbd0a8 100644 --- a/crates/cli/src/parse/source.rs +++ b/crates/cli/src/parse/source.rs @@ -12,8 +12,9 @@ use crate::args::Args; pub(crate) async fn parse_source(args: &Args) -> Result { // parse network info let rpc_url = parse_rpc_url(args); - let provider = Provider::::try_from(rpc_url) - .map_err(|_e| ParseError::ParseError("could not connect to provider".to_string()))?; + let provider = + Provider::>::new_client(&rpc_url, args.max_retries, args.initial_backoff) + .map_err(|_e| ParseError::ParseError("could not connect to provider".to_string()))?; let chain_id = provider.get_chainid().await.map_err(ParseError::ProviderError)?.as_u64(); let rate_limiter = match args.requests_per_second { diff --git a/crates/freeze/src/datasets/traces.rs b/crates/freeze/src/datasets/traces.rs index de37930e..a85039d6 100644 --- a/crates/freeze/src/datasets/traces.rs +++ b/crates/freeze/src/datasets/traces.rs @@ -112,6 +112,7 @@ pub(crate) fn fetch_block_traces( let fetcher = source.fetcher.clone(); task::spawn(async move { let result = fetcher.trace_block(BlockNumber::Number(number.into())).await; + match tx.send(result).await { Ok(_) => {} Err(tokio::sync::mpsc::error::SendError(_e)) => { diff --git a/crates/freeze/src/types/sources.rs b/crates/freeze/src/types/sources.rs index 015e494d..39891c9a 100644 --- a/crates/freeze/src/types/sources.rs +++ b/crates/freeze/src/types/sources.rs @@ -17,7 +17,7 @@ pub type RateLimiter = governor::RateLimiter>, + pub fetcher: Arc>>, /// chain_id of network pub chain_id: u64, /// number of blocks per log request diff --git a/crates/python/src/collect_adapter.rs b/crates/python/src/collect_adapter.rs index 7451b20e..c4933825 100644 --- a/crates/python/src/collect_adapter.rs +++ b/crates/python/src/collect_adapter.rs @@ -24,6 +24,8 @@ use cryo_freeze::collect; requests_per_second = None, max_concurrent_requests = None, max_concurrent_chunks = None, + max_retries = 10, + initial_backoff = 500, dry = false, chunk_size = 1000, n_chunks = None, @@ -67,6 +69,8 @@ pub fn _collect( requests_per_second: Option, max_concurrent_requests: Option, max_concurrent_chunks: Option, + max_retries: u32, + initial_backoff: u64, dry: bool, chunk_size: u64, n_chunks: Option, @@ -107,6 +111,8 @@ pub fn _collect( requests_per_second, max_concurrent_requests, max_concurrent_chunks, + max_retries, + initial_backoff, dry, chunk_size, n_chunks, diff --git a/crates/python/src/freeze_adapter.rs b/crates/python/src/freeze_adapter.rs index bb0291cd..41e5815c 100644 --- a/crates/python/src/freeze_adapter.rs +++ b/crates/python/src/freeze_adapter.rs @@ -25,6 +25,8 @@ use cryo_cli::{run, Args}; requests_per_second = None, max_concurrent_requests = None, max_concurrent_chunks = None, + max_retries = 10, + initial_backoff = 500, dry = false, chunk_size = 1000, n_chunks = None, @@ -68,6 +70,8 @@ pub fn _freeze( requests_per_second: Option, max_concurrent_requests: Option, max_concurrent_chunks: Option, + max_retries: u32, + initial_backoff: u64, dry: bool, chunk_size: u64, n_chunks: Option, @@ -108,6 +112,8 @@ pub fn _freeze( requests_per_second, max_concurrent_requests, max_concurrent_chunks, + max_retries, + initial_backoff, dry, chunk_size, n_chunks,