diff --git a/crates/freeze/src/datasets/traces.rs b/crates/freeze/src/datasets/traces.rs index 322939de..5e803708 100644 --- a/crates/freeze/src/datasets/traces.rs +++ b/crates/freeze/src/datasets/traces.rs @@ -8,7 +8,7 @@ use crate::{ dataframes::SortableDataFrame, types::{ conversions::ToVecHex, BlockChunk, CollectError, ColumnType, Dataset, Datatype, RowFilter, - Source, Table, Traces, + Source, Table, Traces, TransactionChunk, }, with_series, with_series_binary, }; @@ -85,12 +85,23 @@ impl Dataset for Traces { schema: &Table, _filter: Option<&RowFilter>, ) -> Result { - let rx = fetch_traces(chunk, source).await; + let rx = fetch_block_traces(chunk, source).await; + traces_to_df(rx, schema, source.chain_id).await + } + + async fn collect_transaction_chunk( + &self, + chunk: &TransactionChunk, + source: &Source, + schema: &Table, + _filter: Option<&RowFilter>, + ) -> Result { + let rx = fetch_transaction_traces(chunk, source).await; traces_to_df(rx, schema, source.chain_id).await } } -async fn fetch_traces( +async fn fetch_block_traces( block_chunk: &BlockChunk, source: &Source, ) -> mpsc::Receiver, CollectError>> { @@ -125,6 +136,59 @@ async fn fetch_traces( rx } +async fn fetch_transaction_traces( + transaction_chunk: &TransactionChunk, + source: &Source, +) -> mpsc::Receiver, CollectError>> { + match transaction_chunk { + TransactionChunk::Values(tx_hashes) => { + let (tx, rx) = mpsc::channel(tx_hashes.len()); + for tx_hash in tx_hashes.iter() { + let tx_hash = tx_hash.clone(); + let tx = tx.clone(); + let provider = source.provider.clone(); + let semaphore = source.semaphore.clone(); + let rate_limiter = source.rate_limiter.as_ref().map(Arc::clone); + task::spawn(async move { + let _permit = match semaphore { + Some(semaphore) => Some(Arc::clone(&semaphore).acquire_owned().await), + _ => None, + }; + if let Some(limiter) = rate_limiter { + Arc::clone(&limiter).until_ready().await; + } + let result = provider + .trace_transaction(H256::from_slice(&tx_hash)) + .await + .map_err(CollectError::ProviderError); + match tx.send(result).await { + Ok(_) => {} + Err(tokio::sync::mpsc::error::SendError(_e)) => { + eprintln!("send error, try using a rate limit with --requests-per-second or limiting max concurrency with --max-concurrent-requests"); + std::process::exit(1) + } + } + }); + } + rx + } + _ => { + let (tx, rx) = mpsc::channel(1); + let result = Err(CollectError::CollectError( + "transaction value ranges not supported".to_string(), + )); + match tx.send(result).await { + Ok(_) => {} + Err(tokio::sync::mpsc::error::SendError(_e)) => { + eprintln!("send error, try using a rate limit with --requests-per-second or limiting max concurrency with --max-concurrent-requests"); + std::process::exit(1) + } + } + rx + } + } +} + fn reward_type_to_string(reward_type: &RewardType) -> String { match reward_type { RewardType::Block => "reward".to_string(),