Skip to content

Commit

Permalink
add --txs support for traces
Browse files Browse the repository at this point in the history
  • Loading branch information
sslivkoff committed Aug 8, 2023
1 parent 36d742c commit d8352b5
Showing 1 changed file with 67 additions and 3 deletions.
70 changes: 67 additions & 3 deletions crates/freeze/src/datasets/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
dataframes::SortableDataFrame,
types::{
conversions::ToVecHex, BlockChunk, CollectError, ColumnType, Dataset, Datatype, RowFilter,
Source, Table, Traces,
Source, Table, Traces, TransactionChunk,
},
with_series, with_series_binary,
};
Expand Down Expand Up @@ -85,12 +85,23 @@ impl Dataset for Traces {
schema: &Table,
_filter: Option<&RowFilter>,
) -> Result<DataFrame, CollectError> {
let rx = fetch_traces(chunk, source).await;
let rx = fetch_block_traces(chunk, source).await;
traces_to_df(rx, schema, source.chain_id).await
}

async fn collect_transaction_chunk(
&self,
chunk: &TransactionChunk,
source: &Source,
schema: &Table,
_filter: Option<&RowFilter>,
) -> Result<DataFrame, CollectError> {
let rx = fetch_transaction_traces(chunk, source).await;
traces_to_df(rx, schema, source.chain_id).await
}
}

async fn fetch_traces(
async fn fetch_block_traces(
block_chunk: &BlockChunk,
source: &Source,
) -> mpsc::Receiver<Result<Vec<Trace>, CollectError>> {
Expand Down Expand Up @@ -125,6 +136,59 @@ async fn fetch_traces(
rx
}

async fn fetch_transaction_traces(
transaction_chunk: &TransactionChunk,
source: &Source,
) -> mpsc::Receiver<Result<Vec<Trace>, CollectError>> {
match transaction_chunk {
TransactionChunk::Values(tx_hashes) => {
let (tx, rx) = mpsc::channel(tx_hashes.len());
for tx_hash in tx_hashes.iter() {
let tx_hash = tx_hash.clone();
let tx = tx.clone();
let provider = source.provider.clone();
let semaphore = source.semaphore.clone();
let rate_limiter = source.rate_limiter.as_ref().map(Arc::clone);
task::spawn(async move {
let _permit = match semaphore {
Some(semaphore) => Some(Arc::clone(&semaphore).acquire_owned().await),
_ => None,
};
if let Some(limiter) = rate_limiter {
Arc::clone(&limiter).until_ready().await;
}
let result = provider
.trace_transaction(H256::from_slice(&tx_hash))
.await
.map_err(CollectError::ProviderError);
match tx.send(result).await {
Ok(_) => {}
Err(tokio::sync::mpsc::error::SendError(_e)) => {
eprintln!("send error, try using a rate limit with --requests-per-second or limiting max concurrency with --max-concurrent-requests");
std::process::exit(1)
}
}
});
}
rx
}
_ => {
let (tx, rx) = mpsc::channel(1);
let result = Err(CollectError::CollectError(
"transaction value ranges not supported".to_string(),
));
match tx.send(result).await {
Ok(_) => {}
Err(tokio::sync::mpsc::error::SendError(_e)) => {
eprintln!("send error, try using a rate limit with --requests-per-second or limiting max concurrency with --max-concurrent-requests");
std::process::exit(1)
}
}
rx
}
}
}

fn reward_type_to_string(reward_type: &RewardType) -> String {
match reward_type {
RewardType::Block => "reward".to_string(),
Expand Down

0 comments on commit d8352b5

Please sign in to comment.