From 2e798265f73e338e29057ee3ba761150cd9f796f Mon Sep 17 00:00:00 2001 From: sslivkoff Date: Wed, 26 Jul 2023 19:06:09 +0000 Subject: [PATCH] apply foundry's formatting config --- crates/cli/src/args.rs | 17 +-- crates/cli/src/parse/args.rs | 9 +- crates/cli/src/parse/blocks.rs | 44 ++----- crates/cli/src/parse/file_output.rs | 52 +++----- crates/cli/src/parse/query.rs | 40 ++---- crates/cli/src/parse/source.rs | 6 +- crates/cli/src/run.rs | 9 +- crates/cli/src/summaries.rs | 66 +++------- crates/freeze/src/collect.rs | 13 +- crates/freeze/src/datasets/balance_diffs.rs | 12 +- crates/freeze/src/datasets/blocks.rs | 119 +++++------------- .../src/datasets/blocks_and_transactions.rs | 28 ++--- crates/freeze/src/datasets/code_diffs.rs | 12 +- crates/freeze/src/datasets/logs.rs | 40 +++--- crates/freeze/src/datasets/nonce_diffs.rs | 12 +- crates/freeze/src/datasets/state_diffs.rs | 36 ++---- crates/freeze/src/datasets/storage_diffs.rs | 12 +- crates/freeze/src/datasets/traces.rs | 39 +++--- crates/freeze/src/datasets/transactions.rs | 15 +-- crates/freeze/src/datasets/vm_traces.rs | 50 ++------ crates/freeze/src/freeze.rs | 62 +++------ crates/freeze/src/lib.rs | 3 +- crates/freeze/src/types/chunks/chunk.rs | 7 +- crates/freeze/src/types/chunks/chunk_ops.rs | 12 +- crates/freeze/src/types/chunks/mod.rs | 5 +- crates/freeze/src/types/chunks/subchunks.rs | 4 +- crates/freeze/src/types/conversions.rs | 4 +- crates/freeze/src/types/dataframes/export.rs | 11 +- crates/freeze/src/types/dataframes/sort.rs | 9 +- crates/freeze/src/types/datatypes/multi.rs | 48 ++----- crates/freeze/src/types/datatypes/scalar.rs | 34 ++--- crates/freeze/src/types/mod.rs | 33 ++--- crates/freeze/src/types/queries.rs | 18 +-- crates/freeze/src/types/schemas.rs | 41 ++---- crates/freeze/src/types/sources.rs | 28 ++--- crates/freeze/src/types/summaries.rs | 30 +---- crates/python/src/collect_adapter.rs | 6 +- crates/python/src/freeze_adapter.rs | 12 +- 38 files changed, 275 insertions(+), 723 deletions(-) diff --git a/crates/cli/src/args.rs b/crates/cli/src/args.rs index d9a3f09c..225b289b 100644 --- a/crates/cli/src/args.rs +++ b/crates/cli/src/args.rs @@ -40,8 +40,8 @@ pub struct Args { // long, // allow_hyphen_values(true), // help_heading = "Content Options", - // help = "Select by data transaction instead of by block,\ncan be a list or a file, see syntax below", - // )] + // help = "Select by data transaction instead of by block,\ncan be a list or a file, see + // syntax below", )] // pub txs: Vec, /// Columns to include alongside the default output, /// use `all` to include all available columns @@ -74,12 +74,7 @@ pub struct Args { pub network_name: Option, /// Ratelimit on requests per second - #[arg( - short('l'), - long, - value_name = "limit", - help_heading = "Acquisition Options" - )] + #[arg(short('l'), long, value_name = "limit", help_heading = "Acquisition Options")] pub requests_per_second: Option, /// Global number of concurrent requests @@ -146,11 +141,7 @@ pub struct Args { pub contract: Option, /// [logs] filter logs by topic0 - #[arg( - long, - visible_alias = "event", - help_heading = "Dataset-specific Options" - )] + #[arg(long, visible_alias = "event", help_heading = "Dataset-specific Options")] pub topic0: Option, /// [logs] filter logs by topic1 diff --git a/crates/cli/src/parse/args.rs b/crates/cli/src/parse/args.rs index fc4f7b2c..49425e67 100644 --- a/crates/cli/src/parse/args.rs +++ b/crates/cli/src/parse/args.rs @@ -1,15 +1,10 @@ use std::sync::Arc; -use cryo_freeze::FileOutput; -use cryo_freeze::MultiQuery; -use cryo_freeze::ParseError; -use cryo_freeze::Source; +use cryo_freeze::{FileOutput, MultiQuery, ParseError, Source}; use crate::args::Args; -use super::file_output; -use super::query; -use super::source; +use super::{file_output, query, source}; /// parse options for running freeze pub async fn parse_opts(args: &Args) -> Result<(MultiQuery, Source, FileOutput), ParseError> { diff --git a/crates/cli/src/parse/blocks.rs b/crates/cli/src/parse/blocks.rs index 433babfe..872cd65a 100644 --- a/crates/cli/src/parse/blocks.rs +++ b/crates/cli/src/parse/blocks.rs @@ -2,11 +2,7 @@ use std::sync::Arc; use ethers::prelude::*; -use cryo_freeze::BlockChunk; -use cryo_freeze::Chunk; -use cryo_freeze::ChunkData; -use cryo_freeze::ParseError; -use cryo_freeze::Subchunk; +use cryo_freeze::{BlockChunk, Chunk, ChunkData, ParseError, Subchunk}; use crate::args::Args; @@ -16,10 +12,7 @@ pub(crate) async fn parse_blocks( ) -> Result, ParseError> { let block_chunks = parse_block_inputs(&args.blocks, &provider).await?; let block_chunks = if args.align { - block_chunks - .into_iter() - .filter_map(|x| x.align(args.chunk_size)) - .collect() + block_chunks.into_iter().filter_map(|x| x.align(args.chunk_size)).collect() } else { block_chunks }; @@ -28,10 +21,7 @@ pub(crate) async fn parse_blocks( None => block_chunks.subchunk_by_size(&args.chunk_size), }; let block_chunks = apply_reorg_buffer(block_chunks, args.reorg_buffer, &provider).await?; - let chunks: Vec = block_chunks - .iter() - .map(|x| Chunk::Block(x.clone())) - .collect(); + let chunks: Vec = block_chunks.iter().map(|x| Chunk::Block(x.clone())).collect(); Ok(chunks) } @@ -45,9 +35,7 @@ async fn parse_block_inputs( let first_input = inputs.get(0).ok_or_else(|| { ParseError::ParseError("Failed to get the first input".to_string()) })?; - parse_block_token(first_input, true, provider) - .await - .map(|x| vec![x]) + parse_block_token(first_input, true, provider).await.map(|x| vec![x]) } _ => { let mut chunks = Vec::new(); @@ -132,19 +120,15 @@ async fn parse_block_number( provider: &Provider, ) -> Result { match (block_ref, range_position) { - ("latest", _) => provider - .get_block_number() - .await - .map(|n| n.as_u64()) - .map_err(|_e| { - ParseError::ParseError("Error retrieving latest block number".to_string()) - }), + ("latest", _) => provider.get_block_number().await.map(|n| n.as_u64()).map_err(|_e| { + ParseError::ParseError("Error retrieving latest block number".to_string()) + }), ("", RangePosition::First) => Ok(0), - ("", RangePosition::Last) => provider - .get_block_number() - .await - .map(|n| n.as_u64()) - .map_err(|_e| ParseError::ParseError("Error retrieving last block number".to_string())), + ("", RangePosition::Last) => { + provider.get_block_number().await.map(|n| n.as_u64()).map_err(|_e| { + ParseError::ParseError("Error retrieving last block number".to_string()) + }) + } ("", RangePosition::None) => Err(ParseError::ParseError("invalid input".to_string())), _ if block_ref.ends_with('B') | block_ref.ends_with('b') => { let s = &block_ref[..block_ref.len() - 1]; @@ -182,9 +166,7 @@ async fn apply_reorg_buffer( let latest_block = match provider.get_block_number().await { Ok(result) => result.as_u64(), Err(_e) => { - return Err(ParseError::ParseError( - "reorg buffer parse error".to_string(), - )) + return Err(ParseError::ParseError("reorg buffer parse error".to_string())) } }; let max_allowed = latest_block - reorg_filter; diff --git a/crates/cli/src/parse/file_output.rs b/crates/cli/src/parse/file_output.rs index ae1695b8..cb6010ed 100644 --- a/crates/cli/src/parse/file_output.rs +++ b/crates/cli/src/parse/file_output.rs @@ -2,10 +2,7 @@ use std::fs; use polars::prelude::*; -use cryo_freeze::FileFormat; -use cryo_freeze::FileOutput; -use cryo_freeze::ParseError; -use cryo_freeze::Source; +use cryo_freeze::{FileFormat, FileOutput, ParseError, Source}; use crate::args::Args; @@ -19,12 +16,7 @@ pub(crate) fn parse_file_output(args: &Args, source: &Source) -> Result {} - Err(e) => { - return Err(ParseError::ParseError(format!( - "Error creating directory: {}", - e - ))) - } + Err(e) => return Err(ParseError::ParseError(format!("Error creating directory: {}", e))), }; let file_suffix = &args.file_suffix; @@ -72,9 +64,9 @@ pub(crate) fn parse_network_name(args: &Args, chain_id: u64) -> String { pub(crate) fn parse_output_format(args: &Args) -> Result { match (args.csv, args.json) { - (true, true) => Err(ParseError::ParseError( - "choose one of parquet, csv, or json".to_string(), - )), + (true, true) => { + Err(ParseError::ParseError("choose one of parquet, csv, or json".to_string())) + } (true, _) => Ok(FileFormat::Csv), (_, true) => Ok(FileFormat::Json), (false, false) => Ok(FileFormat::Parquet), @@ -90,44 +82,30 @@ fn parse_compression(input: &Vec) -> Result match level_str.parse::() { Ok(level) => match GzipLevel::try_new(level) { Ok(gzip_level) => Ok(ParquetCompression::Gzip(Some(gzip_level))), - Err(_) => Err(ParseError::ParseError( - "Invalid compression level".to_string(), - )), + Err(_) => Err(ParseError::ParseError("Invalid compression level".to_string())), }, - Err(_) => Err(ParseError::ParseError( - "Invalid compression level".to_string(), - )), + Err(_) => Err(ParseError::ParseError("Invalid compression level".to_string())), }, [algorithm, level_str] if algorithm.as_str() == "brotli" => { match level_str.parse::() { Ok(level) => match BrotliLevel::try_new(level) { Ok(brotli_level) => Ok(ParquetCompression::Brotli(Some(brotli_level))), - Err(_) => Err(ParseError::ParseError( - "Invalid compression level".to_string(), - )), + Err(_) => Err(ParseError::ParseError("Invalid compression level".to_string())), }, - Err(_) => Err(ParseError::ParseError( - "Invalid compression level".to_string(), - )), + Err(_) => Err(ParseError::ParseError("Invalid compression level".to_string())), } } [algorithm, level_str] if algorithm.as_str() == "zstd" => match level_str.parse::() { Ok(level) => match ZstdLevel::try_new(level) { Ok(zstd_level) => Ok(ParquetCompression::Zstd(Some(zstd_level))), - Err(_) => Err(ParseError::ParseError( - "Invalid compression level".to_string(), - )), + Err(_) => Err(ParseError::ParseError("Invalid compression level".to_string())), }, - Err(_) => Err(ParseError::ParseError( - "Invalid compression level".to_string(), - )), + Err(_) => Err(ParseError::ParseError("Invalid compression level".to_string())), }, - [algorithm] if ["gzip", "brotli", "zstd"].contains(&algorithm.as_str()) => Err( - ParseError::ParseError("Missing compression level".to_string()), - ), - _ => Err(ParseError::ParseError( - "Invalid compression algorithm".to_string(), - )), + [algorithm] if ["gzip", "brotli", "zstd"].contains(&algorithm.as_str()) => { + Err(ParseError::ParseError("Missing compression level".to_string())) + } + _ => Err(ParseError::ParseError("Invalid compression algorithm".to_string())), } } diff --git a/crates/cli/src/parse/query.rs b/crates/cli/src/parse/query.rs index e9bbdbed..47d671ef 100644 --- a/crates/cli/src/parse/query.rs +++ b/crates/cli/src/parse/query.rs @@ -1,19 +1,11 @@ -use std::collections::HashMap; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use ethers::prelude::*; use hex::FromHex; -use cryo_freeze::ColumnEncoding; -use cryo_freeze::Datatype; -use cryo_freeze::FileFormat; -use cryo_freeze::MultiQuery; -use cryo_freeze::ParseError; -use cryo_freeze::RowFilter; -use cryo_freeze::Table; +use cryo_freeze::{ColumnEncoding, Datatype, FileFormat, MultiQuery, ParseError, RowFilter, Table}; -use super::blocks; -use super::file_output; +use super::{blocks, file_output}; use crate::args::Args; pub(crate) async fn parse_query( @@ -33,18 +25,11 @@ pub(crate) async fn parse_query( parse_topic(&args.topic2), parse_topic(&args.topic3), ]; - let row_filter = RowFilter { - address: contract, - topics, - }; + let row_filter = RowFilter { address: contract, topics }; let mut row_filters: HashMap = HashMap::new(); row_filters.insert(Datatype::Logs, row_filter); - let query = MultiQuery { - schemas, - chunks, - row_filters, - }; + let query = MultiQuery { schemas, chunks, row_filters }; Ok(query) } @@ -74,10 +59,7 @@ fn parse_datatypes(raw_inputs: &Vec) -> Result, ParseError "vm_traces" => Datatype::VmTraces, "opcode_traces" => Datatype::VmTraces, _ => { - return Err(ParseError::ParseError(format!( - "invalid datatype {}", - datatype - ))) + return Err(ParseError::ParseError(format!("invalid datatype {}", datatype))) } }; datatypes.push(datatype) @@ -124,14 +106,12 @@ fn parse_sort( datatypes: &Vec, ) -> Result>>, ParseError> { match raw_sort { - None => Ok(HashMap::from_iter(datatypes.iter().map(|datatype| { - (*datatype, Some(datatype.dataset().default_sort())) - }))), + None => Ok(HashMap::from_iter( + datatypes.iter().map(|datatype| (*datatype, Some(datatype.dataset().default_sort()))), + )), Some(raw_sort) => { if (raw_sort.len() == 1) && (raw_sort[0] == "none") { - Ok(HashMap::from_iter( - datatypes.iter().map(|datatype| (*datatype, None)), - )) + Ok(HashMap::from_iter(datatypes.iter().map(|datatype| (*datatype, None)))) } else if raw_sort.is_empty() { Err(ParseError::ParseError( "must specify columns to sort by, use `none` to disable sorting".to_string(), diff --git a/crates/cli/src/parse/source.rs b/crates/cli/src/parse/source.rs index 5259151a..c97aae75 100644 --- a/crates/cli/src/parse/source.rs +++ b/crates/cli/src/parse/source.rs @@ -1,13 +1,11 @@ use std::env; use ethers::prelude::*; -use governor::Quota; -use governor::RateLimiter; +use governor::{Quota, RateLimiter}; use polars::prelude::*; use std::num::NonZeroU32; -use cryo_freeze::ParseError; -use cryo_freeze::Source; +use cryo_freeze::{ParseError, Source}; use crate::args::Args; diff --git a/crates/cli/src/run.rs b/crates/cli/src/run.rs index 5bf27a1e..cafaed4b 100644 --- a/crates/cli/src/run.rs +++ b/crates/cli/src/run.rs @@ -1,10 +1,7 @@ use std::time::SystemTime; -use crate::args; -use crate::parse; -use crate::summaries; -use cryo_freeze::FreezeError; -use cryo_freeze::FreezeSummary; +use crate::{args, parse, summaries}; +use cryo_freeze::{FreezeError, FreezeSummary}; /// run freeze for given Args pub async fn run(args: args::Args) -> Result, FreezeError> { @@ -35,7 +32,7 @@ pub async fn run(args: args::Args) -> Result, FreezeError> } Err(e) => { println!("{}", e); - return Err(e); + return Err(e) } } }; diff --git a/crates/cli/src/summaries.rs b/crates/cli/src/summaries.rs index f7a5685a..902f0392 100644 --- a/crates/cli/src/summaries.rs +++ b/crates/cli/src/summaries.rs @@ -5,15 +5,9 @@ use colored::Colorize; use std::time::SystemTime; use thousands::Separable; -use cryo_freeze::BlockChunk; -use cryo_freeze::Chunk; -use cryo_freeze::ChunkData; -use cryo_freeze::Datatype; -use cryo_freeze::FileOutput; -use cryo_freeze::FreezeSummary; -use cryo_freeze::MultiQuery; -use cryo_freeze::Source; -use cryo_freeze::Table; +use cryo_freeze::{ + BlockChunk, Chunk, ChunkData, Datatype, FileOutput, FreezeSummary, MultiQuery, Source, Table, +}; const TITLE_R: u8 = 0; const TITLE_G: u8 = 225; @@ -21,9 +15,7 @@ const TITLE_B: u8 = 0; pub(crate) fn print_header>(header: A) { let header_str = header.as_ref().white().bold(); - let underline = "─" - .repeat(header_str.len()) - .truecolor(TITLE_R, TITLE_G, TITLE_B); + let underline = "─".repeat(header_str.len()).truecolor(TITLE_R, TITLE_G, TITLE_B); println!("{}", header_str); println!("{}", underline); } @@ -52,10 +44,7 @@ pub(crate) fn print_cryo_summary(query: &MultiQuery, source: &Source, sink: &Fil }) .collect(); print_block_chunks(block_chunks); - print_bullet( - "max concurrent chunks", - source.max_concurrent_chunks.separate_with_commas(), - ); + print_bullet("max concurrent chunks", source.max_concurrent_chunks.separate_with_commas()); if query.schemas.contains_key(&Datatype::Logs) { print_bullet("inner request size", source.inner_request_size.to_string()); }; @@ -97,11 +86,7 @@ fn print_schema(name: &Datatype, schema: &Table) { } println!(); if let Some(sort_cols) = schema.sort_columns.clone() { - println!( - "sorting {} by: {}", - name.dataset().name(), - sort_cols.join(", ") - ); + println!("sorting {} by: {}", name.dataset().name(), sort_cols.join(", ")); } else { println!("sorting disabled for {}", name.dataset().name()); } @@ -130,7 +115,7 @@ pub(crate) fn print_cryo_conclusion( Ok(duration) => duration, Err(_e) => { println!("error computing system time, aborting"); - return; + return } }; let seconds = duration.as_secs(); @@ -139,17 +124,10 @@ pub(crate) fn print_cryo_conclusion( print_header("collection summary"); print_bullet("total duration", duration_string); - print_bullet( - "t_start", - dt_start.format("%Y-%m-%d %H:%M:%S%.3f").to_string(), - ); + print_bullet("t_start", dt_start.format("%Y-%m-%d %H:%M:%S%.3f").to_string()); print_bullet( "t_end", - " ".to_string() - + dt_data_done - .format("%Y-%m-%d %H:%M:%S%.3f") - .to_string() - .as_str(), + " ".to_string() + dt_data_done.format("%Y-%m-%d %H:%M:%S%.3f").to_string().as_str(), ); let block_chunks: Vec = query .chunks @@ -160,21 +138,11 @@ pub(crate) fn print_cryo_conclusion( }) .collect(); let n_chunks = block_chunks.len(); - print_bullet( - "chunks errored", - freeze_summary.n_errored.separate_with_commas(), - ); - print_bullet( - "chunks skipped", - freeze_summary.n_skipped.separate_with_commas(), - ); + print_bullet("chunks errored", freeze_summary.n_errored.separate_with_commas()); + print_bullet("chunks skipped", freeze_summary.n_skipped.separate_with_commas()); print_bullet( "chunks collected", - format!( - "{} / {}", - freeze_summary.n_completed.separate_with_commas(), - n_chunks - ), + format!("{} / {}", freeze_summary.n_completed.separate_with_commas(), n_chunks), ); let total_blocks = block_chunks.size() as f64; let blocks_completed = @@ -187,14 +155,8 @@ pub(crate) fn print_cryo_conclusion( let blocks_per_day = blocks_per_hour * 24.0; print_bullet("blocks per second", format_float(blocks_per_second)); print_bullet("blocks per minute", format_float(blocks_per_minute)); - print_bullet( - "blocks per hour", - " ".to_string() + format_float(blocks_per_hour).as_str(), - ); - print_bullet( - "blocks per day", - " ".to_string() + format_float(blocks_per_day).as_str(), - ); + print_bullet("blocks per hour", " ".to_string() + format_float(blocks_per_hour).as_str()); + print_bullet("blocks per day", " ".to_string() + format_float(blocks_per_day).as_str()); } fn format_float(number: f64) -> String { diff --git a/crates/freeze/src/collect.rs b/crates/freeze/src/collect.rs index 6b2627a1..4dc68524 100644 --- a/crates/freeze/src/collect.rs +++ b/crates/freeze/src/collect.rs @@ -2,22 +2,13 @@ use std::collections::HashMap; use polars::prelude::*; -use crate::types::Chunk; -use crate::types::CollectError; -use crate::types::Datatype; -use crate::types::MultiQuery; -use crate::types::SingleQuery; -use crate::types::Source; +use crate::types::{Chunk, CollectError, Datatype, MultiQuery, SingleQuery, Source}; /// collect data and return as dataframe pub async fn collect(query: SingleQuery, source: Source) -> Result { let chunk: Chunk = query.chunks.into(); let filter = query.row_filter.as_ref(); - query - .datatype - .dataset() - .collect_chunk(&chunk, &source, &query.schema, filter) - .await + query.datatype.dataset().collect_chunk(&chunk, &source, &query.schema, filter).await } /// collect data and return as dataframe diff --git a/crates/freeze/src/datasets/balance_diffs.rs b/crates/freeze/src/datasets/balance_diffs.rs index d97b2265..f9eb2696 100644 --- a/crates/freeze/src/datasets/balance_diffs.rs +++ b/crates/freeze/src/datasets/balance_diffs.rs @@ -3,15 +3,9 @@ use std::collections::HashMap; use polars::prelude::*; use super::state_diffs; -use crate::types::BalanceDiffs; -use crate::types::BlockChunk; -use crate::types::CollectError; -use crate::types::ColumnType; -use crate::types::Dataset; -use crate::types::Datatype; -use crate::types::RowFilter; -use crate::types::Source; -use crate::types::Table; +use crate::types::{ + BalanceDiffs, BlockChunk, CollectError, ColumnType, Dataset, Datatype, RowFilter, Source, Table, +}; #[async_trait::async_trait] impl Dataset for BalanceDiffs { diff --git a/crates/freeze/src/datasets/blocks.rs b/crates/freeze/src/datasets/blocks.rs index 5eb69122..6b12b152 100644 --- a/crates/freeze/src/datasets/blocks.rs +++ b/crates/freeze/src/datasets/blocks.rs @@ -1,25 +1,17 @@ -use std::collections::HashMap; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use ethers::prelude::*; use polars::prelude::*; -use tokio::sync::mpsc; -use tokio::task; +use tokio::{sync::mpsc, task}; -use crate::dataframes::SortableDataFrame; -use crate::types::conversions::ToVecHex; -use crate::types::conversions::ToVecU8; -use crate::types::BlockChunk; -use crate::types::Blocks; -use crate::types::CollectError; -use crate::types::ColumnType; -use crate::types::Dataset; -use crate::types::Datatype; -use crate::types::RowFilter; -use crate::types::Source; -use crate::types::Table; -use crate::with_series; -use crate::with_series_binary; +use crate::{ + dataframes::SortableDataFrame, + types::{ + conversions::{ToVecHex, ToVecU8}, + BlockChunk, Blocks, CollectError, ColumnType, Dataset, Datatype, RowFilter, Source, Table, + }, + with_series, with_series_binary, +}; #[async_trait::async_trait] impl Dataset for Blocks { @@ -53,15 +45,7 @@ impl Dataset for Blocks { } fn default_columns(&self) -> Vec<&'static str> { - vec![ - "number", - "hash", - "timestamp", - "author", - "gas_used", - "extra_data", - "base_fee_per_gas", - ] + vec!["number", "hash", "timestamp", "author", "gas_used", "extra_data", "base_fee_per_gas"] } fn default_sort(&self) -> Vec { @@ -104,10 +88,7 @@ async fn fetch_blocks( if let Some(limiter) = rate_limiter { Arc::clone(&limiter).until_ready().await; } - let block = provider - .get_block(number) - .await - .map_err(CollectError::ProviderError); + let block = provider.get_block(number).await.map_err(CollectError::ProviderError); match tx.send(block).await { Ok(_) => {} Err(tokio::sync::mpsc::error::SendError(_e)) => { @@ -143,11 +124,8 @@ pub(crate) async fn blocks_to_dfs( chain_id: u64, ) -> Result<(Option, Option), CollectError> { // initialize - let mut block_columns = if blocks_schema.is_none() { - BlockColumns::new(0) - } else { - BlockColumns::new(100) - }; + let mut block_columns = + if blocks_schema.is_none() { BlockColumns::new(0) } else { BlockColumns::new(100) }; let mut transaction_columns = if transactions_schema.is_none() { TransactionColumns::new(0) } else { @@ -174,11 +152,11 @@ pub(crate) async fn blocks_to_dfs( // _ => return Err(CollectError::TooManyRequestsError), Err(e) => { println!("{:?}", e); - return Err(CollectError::TooManyRequestsError); + return Err(CollectError::TooManyRequestsError) } Ok(None) => { println!("NONE"); - return Err(CollectError::TooManyRequestsError); + return Err(CollectError::TooManyRequestsError) } } } @@ -258,9 +236,7 @@ impl BlockColumns { cols.push(Series::new("chain_id", vec![chain_id; n_rows as usize])); } - DataFrame::new(cols) - .map_err(CollectError::PolarsError) - .sort_by_schema(schema) + DataFrame::new(cols).map_err(CollectError::PolarsError).sort_by_schema(schema) } } @@ -317,21 +293,14 @@ impl TransactionColumns { with_series!(cols, "gas_limit", self.gas_limit, schema); with_series!(cols, "gas_price", self.gas_price, schema); with_series!(cols, "transaction_type", self.transaction_type, schema); - with_series!( - cols, - "max_priority_fee_per_gas", - self.max_priority_fee_per_gas, - schema - ); + with_series!(cols, "max_priority_fee_per_gas", self.max_priority_fee_per_gas, schema); with_series!(cols, "max_fee_per_gas", self.max_fee_per_gas, schema); if schema.has_column("chain_id") { cols.push(Series::new("chain_id", vec![chain_id; n_rows])); } - DataFrame::new(cols) - .map_err(CollectError::PolarsError) - .sort_by_schema(schema) + DataFrame::new(cols).map_err(CollectError::PolarsError).sort_by_schema(schema) } } @@ -343,9 +312,7 @@ fn process_block(block: &Block, schema: &Table, columns: &mut BlockColum } } if schema.has_column("parent_hash") { - columns - .parent_hash - .push(block.parent_hash.as_bytes().to_vec()); + columns.parent_hash.push(block.parent_hash.as_bytes().to_vec()); } if schema.has_column("author") { match block.author { @@ -354,19 +321,13 @@ fn process_block(block: &Block, schema: &Table, columns: &mut BlockColum } } if schema.has_column("state_root") { - columns - .state_root - .push(block.state_root.as_bytes().to_vec()); + columns.state_root.push(block.state_root.as_bytes().to_vec()); } if schema.has_column("transactions_root") { - columns - .transactions_root - .push(block.transactions_root.as_bytes().to_vec()); + columns.transactions_root.push(block.transactions_root.as_bytes().to_vec()); } if schema.has_column("receipts_root") { - columns - .receipts_root - .push(block.receipts_root.as_bytes().to_vec()); + columns.receipts_root.push(block.receipts_root.as_bytes().to_vec()); } if schema.has_column("number") { match block.number { @@ -381,25 +342,19 @@ fn process_block(block: &Block, schema: &Table, columns: &mut BlockColum columns.extra_data.push(block.extra_data.to_vec()); } if schema.has_column("logs_bloom") { - columns - .logs_bloom - .push(block.logs_bloom.map(|x| x.0.to_vec())); + columns.logs_bloom.push(block.logs_bloom.map(|x| x.0.to_vec())); } if schema.has_column("timestamp") { columns.timestamp.push(block.timestamp.as_u32()); } if schema.has_column("total_difficulty") { - columns - .total_difficulty - .push(block.total_difficulty.map(|x| x.to_vec_u8())); + columns.total_difficulty.push(block.total_difficulty.map(|x| x.to_vec_u8())); } if schema.has_column("size") { columns.size.push(block.size.map(|x| x.as_u32())); } if schema.has_column("base_fee_per_gas") { - columns - .base_fee_per_gas - .push(block.base_fee_per_gas.map(|value| value.as_u64())); + columns.base_fee_per_gas.push(block.base_fee_per_gas.map(|value| value.as_u64())); } } @@ -412,9 +367,9 @@ fn process_transaction(tx: &Transaction, schema: &Table, columns: &mut Transacti } if schema.has_column("transaction_index") { match tx.transaction_index { - Some(transaction_index) => columns - .transaction_index - .push(Some(transaction_index.as_u64())), + Some(transaction_index) => { + columns.transaction_index.push(Some(transaction_index.as_u64())) + } None => columns.transaction_index.push(None), } } @@ -426,9 +381,7 @@ fn process_transaction(tx: &Transaction, schema: &Table, columns: &mut Transacti } if schema.has_column("to_address") { match tx.to { - Some(to_address) => columns - .to_address - .push(Some(to_address.as_bytes().to_vec())), + Some(to_address) => columns.to_address.push(Some(to_address.as_bytes().to_vec())), None => columns.to_address.push(None), } } @@ -445,14 +398,10 @@ fn process_transaction(tx: &Transaction, schema: &Table, columns: &mut Transacti columns.gas_limit.push(tx.gas.as_u32()); } if schema.has_column("gas_price") { - columns - .gas_price - .push(tx.gas_price.map(|gas_price| gas_price.as_u64())); + columns.gas_price.push(tx.gas_price.map(|gas_price| gas_price.as_u64())); } if schema.has_column("transaction_type") { - columns - .transaction_type - .push(tx.transaction_type.map(|value| value.as_u32())); + columns.transaction_type.push(tx.transaction_type.map(|value| value.as_u32())); } if schema.has_column("max_priority_fee_per_gas") { columns @@ -460,8 +409,6 @@ fn process_transaction(tx: &Transaction, schema: &Table, columns: &mut Transacti .push(tx.max_priority_fee_per_gas.map(|value| value.as_u64())); } if schema.has_column("max_fee_per_gas") { - columns - .max_fee_per_gas - .push(tx.max_fee_per_gas.map(|value| value.as_u64())); + columns.max_fee_per_gas.push(tx.max_fee_per_gas.map(|value| value.as_u64())); } } diff --git a/crates/freeze/src/datasets/blocks_and_transactions.rs b/crates/freeze/src/datasets/blocks_and_transactions.rs index 8436969b..c884c675 100644 --- a/crates/freeze/src/datasets/blocks_and_transactions.rs +++ b/crates/freeze/src/datasets/blocks_and_transactions.rs @@ -1,20 +1,14 @@ -use std::collections::HashMap; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use ethers::prelude::*; use polars::prelude::*; -use tokio::sync::mpsc; -use tokio::task; +use tokio::{sync::mpsc, task}; use super::blocks; -use crate::types::BlockChunk; -use crate::types::BlocksAndTransactions; -use crate::types::CollectError; -use crate::types::Datatype; -use crate::types::MultiDataset; -use crate::types::RowFilter; -use crate::types::Source; -use crate::types::Table; +use crate::types::{ + BlockChunk, BlocksAndTransactions, CollectError, Datatype, MultiDataset, RowFilter, Source, + Table, +}; #[async_trait::async_trait] impl MultiDataset for BlocksAndTransactions { @@ -23,9 +17,7 @@ impl MultiDataset for BlocksAndTransactions { } fn datatypes(&self) -> HashSet { - [Datatype::Blocks, Datatype::Transactions] - .into_iter() - .collect() + [Datatype::Blocks, Datatype::Transactions].into_iter().collect() } async fn collect_block_chunk( @@ -75,10 +67,8 @@ pub(crate) async fn fetch_blocks_and_transactions( if let Some(limiter) = rate_limiter { Arc::clone(&limiter).until_ready().await; } - let block = provider - .get_block_with_txs(number) - .await - .map_err(CollectError::ProviderError); + let block = + provider.get_block_with_txs(number).await.map_err(CollectError::ProviderError); match tx.send(block).await { Ok(_) => {} Err(tokio::sync::mpsc::error::SendError(_e)) => { diff --git a/crates/freeze/src/datasets/code_diffs.rs b/crates/freeze/src/datasets/code_diffs.rs index a38d792a..d055a8df 100644 --- a/crates/freeze/src/datasets/code_diffs.rs +++ b/crates/freeze/src/datasets/code_diffs.rs @@ -3,15 +3,9 @@ use std::collections::HashMap; use polars::prelude::*; use super::state_diffs; -use crate::types::BlockChunk; -use crate::types::CodeDiffs; -use crate::types::CollectError; -use crate::types::ColumnType; -use crate::types::Dataset; -use crate::types::Datatype; -use crate::types::RowFilter; -use crate::types::Source; -use crate::types::Table; +use crate::types::{ + BlockChunk, CodeDiffs, CollectError, ColumnType, Dataset, Datatype, RowFilter, Source, Table, +}; #[async_trait::async_trait] impl Dataset for CodeDiffs { diff --git a/crates/freeze/src/datasets/logs.rs b/crates/freeze/src/datasets/logs.rs index b0b5b95a..08188516 100644 --- a/crates/freeze/src/datasets/logs.rs +++ b/crates/freeze/src/datasets/logs.rs @@ -1,24 +1,17 @@ -use std::collections::HashMap; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use ethers::prelude::*; use polars::prelude::*; -use tokio::sync::mpsc; -use tokio::task; +use tokio::{sync::mpsc, task}; -use crate::dataframes::SortableDataFrame; -use crate::types::conversions::ToVecHex; -use crate::types::BlockChunk; -use crate::types::CollectError; -use crate::types::ColumnType; -use crate::types::Dataset; -use crate::types::Datatype; -use crate::types::Logs; -use crate::types::RowFilter; -use crate::types::Source; -use crate::types::Table; -use crate::with_series; -use crate::with_series_binary; +use crate::{ + dataframes::SortableDataFrame, + types::{ + conversions::ToVecHex, BlockChunk, CollectError, ColumnType, Dataset, Datatype, Logs, + RowFilter, Source, Table, + }, + with_series, with_series_binary, +}; #[async_trait::async_trait] impl Dataset for Logs { @@ -109,10 +102,7 @@ async fn fetch_logs( if let Some(limiter) = rate_limiter { Arc::clone(&limiter).until_ready().await; } - let result = provider - .get_logs(&log_filter) - .await - .map_err(CollectError::ProviderError); + let result = provider.get_logs(&log_filter).await.map_err(CollectError::ProviderError); match tx.send(result).await { Ok(_) => {} Err(tokio::sync::mpsc::error::SendError(_e)) => { @@ -148,7 +138,7 @@ async fn logs_to_df( Ok(logs) => { for log in logs.iter() { if let Some(true) = log.removed { - continue; + continue } if let (Some(bn), Some(tx), Some(ti), Some(li)) = ( log.block_number, @@ -190,7 +180,7 @@ async fn logs_to_df( topic3.push(Some(log.topics[3].as_bytes().to_vec())); } _ => { - return Err(CollectError::InvalidNumberOfTopics); + return Err(CollectError::InvalidNumberOfTopics) } } data.push(log.data.clone().to_vec()); @@ -221,7 +211,5 @@ async fn logs_to_df( cols.push(Series::new("chain_id", vec![chain_id; n_rows])); } - DataFrame::new(cols) - .map_err(CollectError::PolarsError) - .sort_by_schema(schema) + DataFrame::new(cols).map_err(CollectError::PolarsError).sort_by_schema(schema) } diff --git a/crates/freeze/src/datasets/nonce_diffs.rs b/crates/freeze/src/datasets/nonce_diffs.rs index 26d8f4ac..c0d6e9dd 100644 --- a/crates/freeze/src/datasets/nonce_diffs.rs +++ b/crates/freeze/src/datasets/nonce_diffs.rs @@ -3,15 +3,9 @@ use std::collections::HashMap; use polars::prelude::*; use super::state_diffs; -use crate::types::BlockChunk; -use crate::types::CollectError; -use crate::types::ColumnType; -use crate::types::Dataset; -use crate::types::Datatype; -use crate::types::NonceDiffs; -use crate::types::RowFilter; -use crate::types::Source; -use crate::types::Table; +use crate::types::{ + BlockChunk, CollectError, ColumnType, Dataset, Datatype, NonceDiffs, RowFilter, Source, Table, +}; #[async_trait::async_trait] impl Dataset for NonceDiffs { diff --git a/crates/freeze/src/datasets/state_diffs.rs b/crates/freeze/src/datasets/state_diffs.rs index 338e3daa..84e1ce34 100644 --- a/crates/freeze/src/datasets/state_diffs.rs +++ b/crates/freeze/src/datasets/state_diffs.rs @@ -1,24 +1,17 @@ -use std::collections::HashMap; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use ethers::prelude::*; use polars::prelude::*; use tokio::sync::mpsc; -use crate::dataframes::SortableDataFrame; -use crate::types::conversions::ToVecHex; -use crate::types::BlockChunk; -use crate::types::ChunkData; -use crate::types::CollectError; -use crate::types::ColumnType; -use crate::types::Datatype; -use crate::types::MultiDataset; -use crate::types::RowFilter; -use crate::types::Source; -use crate::types::StateDiffs; -use crate::types::Table; -use crate::with_series; -use crate::with_series_binary; +use crate::{ + dataframes::SortableDataFrame, + types::{ + conversions::ToVecHex, BlockChunk, ChunkData, CollectError, ColumnType, Datatype, + MultiDataset, RowFilter, Source, StateDiffs, Table, + }, + with_series, with_series_binary, +}; #[async_trait::async_trait] impl MultiDataset for StateDiffs { @@ -27,14 +20,9 @@ impl MultiDataset for StateDiffs { } fn datatypes(&self) -> HashSet { - [ - Datatype::BalanceDiffs, - Datatype::CodeDiffs, - Datatype::NonceDiffs, - Datatype::StorageDiffs, - ] - .into_iter() - .collect() + [Datatype::BalanceDiffs, Datatype::CodeDiffs, Datatype::NonceDiffs, Datatype::StorageDiffs] + .into_iter() + .collect() } async fn collect_block_chunk( diff --git a/crates/freeze/src/datasets/storage_diffs.rs b/crates/freeze/src/datasets/storage_diffs.rs index 49436aaf..2c04ac9f 100644 --- a/crates/freeze/src/datasets/storage_diffs.rs +++ b/crates/freeze/src/datasets/storage_diffs.rs @@ -3,15 +3,9 @@ use std::collections::HashMap; use polars::prelude::*; use super::state_diffs; -use crate::types::BlockChunk; -use crate::types::CollectError; -use crate::types::ColumnType; -use crate::types::Dataset; -use crate::types::Datatype; -use crate::types::RowFilter; -use crate::types::Source; -use crate::types::StorageDiffs; -use crate::types::Table; +use crate::types::{ + BlockChunk, CollectError, ColumnType, Dataset, Datatype, RowFilter, Source, StorageDiffs, Table, +}; #[async_trait::async_trait] impl Dataset for StorageDiffs { diff --git a/crates/freeze/src/datasets/traces.rs b/crates/freeze/src/datasets/traces.rs index 1f60f452..aa9aa207 100644 --- a/crates/freeze/src/datasets/traces.rs +++ b/crates/freeze/src/datasets/traces.rs @@ -1,24 +1,17 @@ -use std::collections::HashMap; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use ethers::prelude::*; use polars::prelude::*; -use tokio::sync::mpsc; -use tokio::task; +use tokio::{sync::mpsc, task}; -use crate::dataframes::SortableDataFrame; -use crate::types::conversions::ToVecHex; -use crate::types::BlockChunk; -use crate::types::CollectError; -use crate::types::ColumnType; -use crate::types::Dataset; -use crate::types::Datatype; -use crate::types::RowFilter; -use crate::types::Source; -use crate::types::Table; -use crate::types::Traces; -use crate::with_series; -use crate::with_series_binary; +use crate::{ + dataframes::SortableDataFrame, + types::{ + conversions::ToVecHex, BlockChunk, CollectError, ColumnType, Dataset, Datatype, RowFilter, + Source, Table, Traces, + }, + with_series, with_series_binary, +}; #[async_trait::async_trait] impl Dataset for Traces { @@ -82,10 +75,7 @@ impl Dataset for Traces { } fn default_sort(&self) -> Vec { - vec![ - "block_number".to_string(), - "transaction_position".to_string(), - ] + vec!["block_number".to_string(), "transaction_position".to_string()] } async fn collect_block_chunk( @@ -227,7 +217,8 @@ async fn traces_to_df( // value: value, // gas: gas, // input: input, - // call_type: action_call_type, [None, Call, CallCode, DelegateCall, StaticCall] + // call_type: action_call_type, [None, Call, CallCode, DelegateCall, + // StaticCall] // // Create // from: from, @@ -469,7 +460,5 @@ async fn traces_to_df( cols.push(Series::new("chain_id", vec![chain_id; n_rows])); } - DataFrame::new(cols) - .map_err(CollectError::PolarsError) - .sort_by_schema(schema) + DataFrame::new(cols).map_err(CollectError::PolarsError).sort_by_schema(schema) } diff --git a/crates/freeze/src/datasets/transactions.rs b/crates/freeze/src/datasets/transactions.rs index 5e45db7c..e20fad02 100644 --- a/crates/freeze/src/datasets/transactions.rs +++ b/crates/freeze/src/datasets/transactions.rs @@ -2,17 +2,10 @@ use std::collections::HashMap; use polars::prelude::*; -use super::blocks; -use super::blocks_and_transactions; -use crate::types::BlockChunk; -use crate::types::CollectError; -use crate::types::ColumnType; -use crate::types::Dataset; -use crate::types::Datatype; -use crate::types::RowFilter; -use crate::types::Source; -use crate::types::Table; -use crate::types::Transactions; +use super::{blocks, blocks_and_transactions}; +use crate::types::{ + BlockChunk, CollectError, ColumnType, Dataset, Datatype, RowFilter, Source, Table, Transactions, +}; #[async_trait::async_trait] impl Dataset for Transactions { diff --git a/crates/freeze/src/datasets/vm_traces.rs b/crates/freeze/src/datasets/vm_traces.rs index 905c53d7..26c29703 100644 --- a/crates/freeze/src/datasets/vm_traces.rs +++ b/crates/freeze/src/datasets/vm_traces.rs @@ -4,21 +4,15 @@ use ethers::prelude::*; use polars::prelude::*; use tokio::sync::mpsc; -use crate::dataframes::SortableDataFrame; -use crate::datasets::state_diffs; -use crate::types::conversions::ToVecHex; -use crate::types::BlockChunk; -use crate::types::CollectError; -use crate::types::ColumnType; -use crate::types::Dataset; -use crate::types::Datatype; -use crate::types::RowFilter; -use crate::types::Source; -use crate::types::Table; -use crate::types::ToVecU8; -use crate::types::VmTraces; -use crate::with_series; -use crate::with_series_binary; +use crate::{ + dataframes::SortableDataFrame, + datasets::state_diffs, + types::{ + conversions::ToVecHex, BlockChunk, CollectError, ColumnType, Dataset, Datatype, RowFilter, + Source, Table, ToVecU8, VmTraces, + }, + with_series, with_series_binary, +}; #[async_trait::async_trait] impl Dataset for VmTraces { @@ -48,22 +42,11 @@ impl Dataset for VmTraces { } fn default_columns(&self) -> Vec<&'static str> { - vec![ - "block_number", - "transaction_position", - "pc", - "cost", - "used", - "op", - ] + vec!["block_number", "transaction_position", "pc", "cost", "used", "op"] } fn default_sort(&self) -> Vec { - vec![ - "block_number".to_string(), - "transaction_position".to_string(), - "used".to_string(), - ] + vec!["block_number".to_string(), "transaction_position".to_string(), "used".to_string()] } async fn collect_block_chunk( @@ -137,12 +120,7 @@ async fn vm_traces_to_df( let mut cols = Vec::new(); with_series!(cols, "block_number", columns.block_number, schema); - with_series!( - cols, - "transaction_position", - columns.transaction_position, - schema - ); + with_series!(cols, "transaction_position", columns.transaction_position, schema); with_series!(cols, "pc", columns.pc, schema); with_series!(cols, "cost", columns.cost, schema); with_series!(cols, "used", columns.used, schema); @@ -157,9 +135,7 @@ async fn vm_traces_to_df( cols.push(Series::new("chain_id", vec![chain_id; columns.n_rows])); }; - DataFrame::new(cols) - .map_err(CollectError::PolarsError) - .sort_by_schema(schema) + DataFrame::new(cols).map_err(CollectError::PolarsError).sort_by_schema(schema) } fn add_ops( diff --git a/crates/freeze/src/freeze.rs b/crates/freeze/src/freeze.rs index c8319c41..1838a9e4 100644 --- a/crates/freeze/src/freeze.rs +++ b/crates/freeze/src/freeze.rs @@ -1,22 +1,13 @@ -use std::collections::HashMap; -use std::path::Path; -use std::sync::Arc; +use std::{collections::HashMap, path::Path, sync::Arc}; use futures::future::join_all; use indicatif::ProgressBar; use tokio::sync::Semaphore; -use crate::types::dataframes; -use crate::types::Chunk; -use crate::types::Datatype; -use crate::types::FileOutput; -use crate::types::FreezeChunkSummary; -use crate::types::FreezeError; -use crate::types::FreezeSummary; -use crate::types::FreezeSummaryAgg; -use crate::types::MultiDatatype; -use crate::types::MultiQuery; -use crate::types::Source; +use crate::types::{ + dataframes, Chunk, Datatype, FileOutput, FreezeChunkSummary, FreezeError, FreezeSummary, + FreezeSummaryAgg, MultiDatatype, MultiQuery, Source, +}; /// perform a bulk data extraction of multiple datatypes over multiple block chunks pub async fn freeze( @@ -69,34 +60,20 @@ pub async fn freeze( tasks.push(task) } } - let chunk_summaries: Vec = join_all(tasks) - .await - .into_iter() - .filter_map(Result::ok) - .collect(); + let chunk_summaries: Vec = + join_all(tasks).await.into_iter().filter_map(Result::ok).collect(); Ok(chunk_summaries.aggregate()) } fn cluster_datatypes(dts: Vec<&Datatype>) -> (Vec, Vec) { let mdts: Vec = MultiDatatype::variants() .iter() - .filter(|mdt| { - mdt.multi_dataset() - .datatypes() - .iter() - .all(|x| dts.contains(&x)) - }) + .filter(|mdt| mdt.multi_dataset().datatypes().iter().all(|x| dts.contains(&x))) .cloned() .collect(); - let mdt_dts: Vec = mdts - .iter() - .flat_map(|mdt| mdt.multi_dataset().datatypes()) - .collect(); - let other_dts = dts - .iter() - .filter(|dt| !mdt_dts.contains(dt)) - .map(|x| **x) - .collect(); + let mdt_dts: Vec = + mdts.iter().flat_map(|mdt| mdt.multi_dataset().datatypes()).collect(); + let other_dts = dts.iter().filter(|dt| !mdt_dts.contains(dt)).map(|x| **x).collect(); (other_dts, mdts) } @@ -122,7 +99,7 @@ async fn freeze_datatype_chunk( // skip path if file already exists if Path::new(&path).exists() && !sink.overwrite { - return FreezeChunkSummary::skip(paths); + return FreezeChunkSummary::skip(paths) } // collect data @@ -130,20 +107,19 @@ async fn freeze_datatype_chunk( Some(schema) => schema, _ => return FreezeChunkSummary::error(paths), }; - let collect_output = ds - .collect_chunk(&chunk, &source, schema, query.row_filters.get(&datatype)) - .await; + let collect_output = + ds.collect_chunk(&chunk, &source, schema, query.row_filters.get(&datatype)).await; let mut df = match collect_output { Err(_e) => { println!("chunk failed: {:?}", _e); - return FreezeChunkSummary::error(paths); + return FreezeChunkSummary::error(paths) } Ok(df) => df, }; // write data if let Err(_e) = dataframes::df_to_file(&mut df, &path, &sink) { - return FreezeChunkSummary::error(paths); + return FreezeChunkSummary::error(paths) } bar.inc(1); @@ -172,7 +148,7 @@ async fn freeze_multi_datatype_chunk( // skip path if file already exists if paths.values().all(|path| Path::new(&path).exists()) && !sink.overwrite { - return FreezeChunkSummary::skip(paths); + return FreezeChunkSummary::skip(paths) } // collect data @@ -183,14 +159,14 @@ async fn freeze_multi_datatype_chunk( let mut dfs = match collect_result { Err(_e) => { println!("chunk failed: {:?}", _e); - return FreezeChunkSummary::error(paths); + return FreezeChunkSummary::error(paths) } Ok(dfs) => dfs, }; // write data if let Err(_e) = dataframes::dfs_to_files(&mut dfs, &paths, &sink) { - return FreezeChunkSummary::error(paths); + return FreezeChunkSummary::error(paths) } bar.inc(1); diff --git a/crates/freeze/src/lib.rs b/crates/freeze/src/lib.rs index ef3e5a27..a9be83c9 100644 --- a/crates/freeze/src/lib.rs +++ b/crates/freeze/src/lib.rs @@ -12,7 +12,6 @@ mod datasets; mod freeze; mod types; -pub use collect::collect; -pub use collect::collect_multiple; +pub use collect::{collect, collect_multiple}; pub use freeze::freeze; pub use types::*; diff --git a/crates/freeze/src/types/chunks/chunk.rs b/crates/freeze/src/types/chunks/chunk.rs index de6b6eb8..f4b57301 100644 --- a/crates/freeze/src/types/chunks/chunk.rs +++ b/crates/freeze/src/types/chunks/chunk.rs @@ -1,9 +1,6 @@ -use crate::types::FileError; -use crate::types::FileOutput; +use crate::types::{FileError, FileOutput}; -use super::binary_chunk::BinaryChunk; -use super::chunk_ops::ChunkData; -use super::number_chunk::NumberChunk; +use super::{binary_chunk::BinaryChunk, chunk_ops::ChunkData, number_chunk::NumberChunk}; /// block chunk pub type BlockChunk = NumberChunk; diff --git a/crates/freeze/src/types/chunks/chunk_ops.rs b/crates/freeze/src/types/chunks/chunk_ops.rs index 773ffb64..1f094ee3 100644 --- a/crates/freeze/src/types/chunks/chunk_ops.rs +++ b/crates/freeze/src/types/chunks/chunk_ops.rs @@ -1,6 +1,4 @@ -use crate::ChunkError; -use crate::FileError; -use crate::FileOutput; +use crate::{ChunkError, FileError, FileOutput}; /// Trait for common chunk methods pub trait ChunkData: Sized { @@ -22,11 +20,9 @@ pub trait ChunkData: Sized { /// convert chunk to string representation fn stub(&self) -> Result { match (self.min_value(), self.max_value()) { - (Some(min), Some(max)) => Ok(format!( - "{}_to_{}", - Self::format_item(min), - Self::format_item(max), - )), + (Some(min), Some(max)) => { + Ok(format!("{}_to_{}", Self::format_item(min), Self::format_item(max),)) + } _ => Err(ChunkError::InvalidChunk), } } diff --git a/crates/freeze/src/types/chunks/mod.rs b/crates/freeze/src/types/chunks/mod.rs index 3ce425fa..dd8a3b47 100644 --- a/crates/freeze/src/types/chunks/mod.rs +++ b/crates/freeze/src/types/chunks/mod.rs @@ -4,9 +4,6 @@ pub(crate) mod chunk_ops; pub(crate) mod number_chunk; pub(crate) mod subchunks; -pub use chunk::AddressChunk; -pub use chunk::BlockChunk; -pub use chunk::Chunk; -pub use chunk::TransactionChunk; +pub use chunk::{AddressChunk, BlockChunk, Chunk, TransactionChunk}; pub use chunk_ops::ChunkData; pub use subchunks::Subchunk; diff --git a/crates/freeze/src/types/chunks/subchunks.rs b/crates/freeze/src/types/chunks/subchunks.rs index 94b7b600..d3355294 100644 --- a/crates/freeze/src/types/chunks/subchunks.rs +++ b/crates/freeze/src/types/chunks/subchunks.rs @@ -1,6 +1,4 @@ -use super::chunk::BlockChunk; -use super::chunk_ops::ChunkData; -use super::number_chunk::range_to_chunks; +use super::{chunk::BlockChunk, chunk_ops::ChunkData, number_chunk::range_to_chunks}; /// Aggregation operations related to chunks pub trait Subchunk { diff --git a/crates/freeze/src/types/conversions.rs b/crates/freeze/src/types/conversions.rs index 0eb4a7ec..9fbea194 100644 --- a/crates/freeze/src/types/conversions.rs +++ b/crates/freeze/src/types/conversions.rs @@ -61,8 +61,6 @@ impl ToVecHex for Vec>> { type Output = Vec>; fn to_vec_hex(&self) -> Self::Output { - self.iter() - .map(|opt| opt.as_ref().map(|v| prefix_hex::encode(v.clone()))) - .collect() + self.iter().map(|opt| opt.as_ref().map(|v| prefix_hex::encode(v.clone()))).collect() } } diff --git a/crates/freeze/src/types/dataframes/export.rs b/crates/freeze/src/types/dataframes/export.rs index a21ebc78..a2ba38dc 100644 --- a/crates/freeze/src/types/dataframes/export.rs +++ b/crates/freeze/src/types/dataframes/export.rs @@ -2,8 +2,7 @@ use std::collections::HashMap; use polars::prelude::*; -use crate::types::FileError; -use crate::types::FileOutput; +use crate::types::{FileError, FileOutput}; pub(crate) fn dfs_to_files( dfs: &mut HashMap, @@ -18,9 +17,7 @@ where let filename = match filenames.get(name) { Some(filename) => filename, None => { - return Err(FileError::NoFilePathError( - "no path given for dataframe".to_string(), - )) + return Err(FileError::NoFilePathError("no path given for dataframe".to_string())) } }; df_to_file(df, filename, file_output)? @@ -79,9 +76,7 @@ fn df_to_csv(df: &mut DataFrame, filename: &str) -> Result<(), FileError> { /// write polars dataframe to json file fn df_to_json(df: &mut DataFrame, filename: &str) -> Result<(), FileError> { let file = std::fs::File::create(filename).map_err(|_e| FileError::FileWriteError)?; - let result = JsonWriter::new(file) - .with_json_format(JsonFormat::Json) - .finish(df); + let result = JsonWriter::new(file).with_json_format(JsonFormat::Json).finish(df); match result { Err(_e) => Err(FileError::FileWriteError), _ => Ok(()), diff --git a/crates/freeze/src/types/dataframes/sort.rs b/crates/freeze/src/types/dataframes/sort.rs index ba125b5d..d68fa0e7 100644 --- a/crates/freeze/src/types/dataframes/sort.rs +++ b/crates/freeze/src/types/dataframes/sort.rs @@ -1,7 +1,6 @@ use polars::prelude::*; -use crate::types::CollectError; -use crate::types::Table; +use crate::types::{CollectError, Table}; pub(crate) trait SortableDataFrame { fn sort_by_schema(self, schema: &Table) -> Self; @@ -10,9 +9,9 @@ pub(crate) trait SortableDataFrame { impl SortableDataFrame for Result { fn sort_by_schema(self, schema: &Table) -> Self { match (self, &schema.sort_columns) { - (Ok(df), Some(sort_columns)) => df - .sort(sort_columns, false) - .map_err(CollectError::PolarsError), + (Ok(df), Some(sort_columns)) => { + df.sort(sort_columns, false).map_err(CollectError::PolarsError) + } (df, _) => df, } } diff --git a/crates/freeze/src/types/datatypes/multi.rs b/crates/freeze/src/types/datatypes/multi.rs index 9660d00e..018fdfe0 100644 --- a/crates/freeze/src/types/datatypes/multi.rs +++ b/crates/freeze/src/types/datatypes/multi.rs @@ -1,19 +1,12 @@ -use std::collections::HashMap; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use async_trait; use polars::prelude::*; -use crate::types::AddressChunk; -use crate::types::BlockChunk; -use crate::types::Chunk; -use crate::types::CollectError; -use crate::types::Dataset; -use crate::types::Datatype; -use crate::types::RowFilter; -use crate::types::Source; -use crate::types::Table; -use crate::types::TransactionChunk; +use crate::types::{ + AddressChunk, BlockChunk, Chunk, CollectError, Dataset, Datatype, RowFilter, Source, Table, + TransactionChunk, +}; /// Blocks and Transactions datasets pub struct BlocksAndTransactions; @@ -34,10 +27,7 @@ pub enum MultiDatatype { impl MultiDatatype { /// return all variants of multi datatype pub fn variants() -> Vec { - vec![ - MultiDatatype::BlocksAndTransactions, - MultiDatatype::StateDiffs, - ] + vec![MultiDatatype::BlocksAndTransactions, MultiDatatype::StateDiffs] } /// return MultiDataset corresponding to MultiDatatype @@ -60,10 +50,7 @@ pub trait MultiDataset: Sync + Send { /// return Datasets associated with MultiDataset fn datasets(&self) -> HashMap> { - self.datatypes() - .iter() - .map(|dt| (*dt, dt.dataset())) - .collect() + self.datatypes().iter().map(|dt| (*dt, dt.dataset())).collect() } /// collect dataset for a particular chunk @@ -75,17 +62,12 @@ pub trait MultiDataset: Sync + Send { filter: HashMap, ) -> Result, CollectError> { match chunk { - Chunk::Block(chunk) => { - self.collect_block_chunk(chunk, source, schemas, filter) - .await - } + Chunk::Block(chunk) => self.collect_block_chunk(chunk, source, schemas, filter).await, Chunk::Transaction(chunk) => { - self.collect_transaction_chunk(chunk, source, schemas, filter) - .await + self.collect_transaction_chunk(chunk, source, schemas, filter).await } Chunk::Address(chunk) => { - self.collect_address_chunk(chunk, source, schemas, filter) - .await + self.collect_address_chunk(chunk, source, schemas, filter).await } } } @@ -109,10 +91,7 @@ pub trait MultiDataset: Sync + Send { _schemas: HashMap, _filter: HashMap, ) -> Result, CollectError> { - panic!( - "transaction_chunk collection not implemented for {}", - self.name() - ) + panic!("transaction_chunk collection not implemented for {}", self.name()) } /// collect dataset for a particular transaction chunk @@ -123,9 +102,6 @@ pub trait MultiDataset: Sync + Send { _schemas: HashMap, _filter: HashMap, ) -> Result, CollectError> { - panic!( - "transaction_chunk collection not implemented for {}", - self.name() - ) + panic!("transaction_chunk collection not implemented for {}", self.name()) } } diff --git a/crates/freeze/src/types/datatypes/scalar.rs b/crates/freeze/src/types/datatypes/scalar.rs index 15a8c5c1..b8310369 100644 --- a/crates/freeze/src/types/datatypes/scalar.rs +++ b/crates/freeze/src/types/datatypes/scalar.rs @@ -3,15 +3,10 @@ use std::collections::HashMap; use async_trait; use polars::prelude::*; -use crate::types::AddressChunk; -use crate::types::BlockChunk; -use crate::types::Chunk; -use crate::types::CollectError; -use crate::types::ColumnType; -use crate::types::RowFilter; -use crate::types::Source; -use crate::types::Table; -use crate::types::TransactionChunk; +use crate::types::{ + AddressChunk, BlockChunk, Chunk, CollectError, ColumnType, RowFilter, Source, Table, + TransactionChunk, +}; /// Balance Diffs Dataset pub struct BalanceDiffs; @@ -101,17 +96,12 @@ pub trait Dataset: Sync + Send { filter: Option<&RowFilter>, ) -> Result { match chunk { - Chunk::Block(chunk) => { - self.collect_block_chunk(chunk, source, schema, filter) - .await - } + Chunk::Block(chunk) => self.collect_block_chunk(chunk, source, schema, filter).await, Chunk::Transaction(chunk) => { - self.collect_transaction_chunk(chunk, source, schema, filter) - .await + self.collect_transaction_chunk(chunk, source, schema, filter).await } Chunk::Address(chunk) => { - self.collect_address_chunk(chunk, source, schema, filter) - .await + self.collect_address_chunk(chunk, source, schema, filter).await } } } @@ -135,10 +125,7 @@ pub trait Dataset: Sync + Send { _schema: &Table, _filter: Option<&RowFilter>, ) -> Result { - panic!( - "transaction_chunk collection not implemented for {}", - self.name() - ) + panic!("transaction_chunk collection not implemented for {}", self.name()) } /// collect dataset for a particular transaction chunk @@ -149,9 +136,6 @@ pub trait Dataset: Sync + Send { _schema: &Table, _filter: Option<&RowFilter>, ) -> Result { - panic!( - "transaction_chunk collection not implemented for {}", - self.name() - ) + panic!("transaction_chunk collection not implemented for {}", self.name()) } } diff --git a/crates/freeze/src/types/mod.rs b/crates/freeze/src/types/mod.rs index 3fa832bc..0d369f8e 100644 --- a/crates/freeze/src/types/mod.rs +++ b/crates/freeze/src/types/mod.rs @@ -24,31 +24,14 @@ pub mod schemas; /// types related to summaries pub mod summaries; -pub use chunks::AddressChunk; -pub use chunks::BlockChunk; -pub use chunks::Chunk; -pub use chunks::ChunkData; -pub use chunks::Subchunk; -pub use chunks::TransactionChunk; -pub use conversions::ToVecHex; -pub use conversions::ToVecU8; +pub use chunks::{AddressChunk, BlockChunk, Chunk, ChunkData, Subchunk, TransactionChunk}; +pub use conversions::{ToVecHex, ToVecU8}; pub use datatypes::*; -pub use files::ColumnEncoding; -pub use files::FileFormat; -pub use files::FileOutput; -pub use queries::MultiQuery; -pub use queries::RowFilter; -pub use queries::SingleQuery; -pub use schemas::ColumnType; -pub use schemas::Table; -pub use sources::RateLimiter; -pub use sources::Source; -pub use summaries::FreezeChunkSummary; -pub use summaries::FreezeSummary; +pub use files::{ColumnEncoding, FileFormat, FileOutput}; +pub use queries::{MultiQuery, RowFilter, SingleQuery}; +pub use schemas::{ColumnType, Table}; +pub use sources::{RateLimiter, Source}; pub(crate) use summaries::FreezeSummaryAgg; +pub use summaries::{FreezeChunkSummary, FreezeSummary}; -pub use errors::ChunkError; -pub use errors::CollectError; -pub use errors::FileError; -pub use errors::FreezeError; -pub use errors::ParseError; +pub use errors::{ChunkError, CollectError, FileError, FreezeError, ParseError}; diff --git a/crates/freeze/src/types/queries.rs b/crates/freeze/src/types/queries.rs index 9a089fb2..ec545401 100644 --- a/crates/freeze/src/types/queries.rs +++ b/crates/freeze/src/types/queries.rs @@ -2,9 +2,7 @@ use std::collections::HashMap; use ethers::prelude::*; -use crate::types::Chunk; -use crate::types::Datatype; -use crate::types::Table; +use crate::types::{Chunk, Datatype, Table}; /// Query multiple data types #[derive(Clone)] @@ -44,21 +42,13 @@ impl From for SingleQuery { let (datatype, schema) = match query.schemas.len() { 0 => panic!("bad query, needs 1 datatype"), 1 => { - let datatype_schema = query - .schemas - .iter() - .next() - .expect("Expected at least one schema"); + let datatype_schema = + query.schemas.iter().next().expect("Expected at least one schema"); (*datatype_schema.0, datatype_schema.1.clone()) } _ => panic!("bad query, needs 1 datatype"), }; let row_filter = query.row_filters.get(&datatype).cloned(); - SingleQuery { - datatype, - schema, - chunks: query.chunks, - row_filter, - } + SingleQuery { datatype, schema, chunks: query.chunks, row_filter } } } diff --git a/crates/freeze/src/types/schemas.rs b/crates/freeze/src/types/schemas.rs index 42e0861c..9f7c0d98 100644 --- a/crates/freeze/src/types/schemas.rs +++ b/crates/freeze/src/types/schemas.rs @@ -3,8 +3,7 @@ use std::collections::HashSet; use indexmap::IndexMap; use thiserror::Error; -use crate::types::ColumnEncoding; -use crate::types::Datatype; +use crate::types::{ColumnEncoding, Datatype}; /// Schema for a particular table #[derive(Clone, Debug, Eq, PartialEq)] @@ -95,28 +94,17 @@ impl Datatype { ) -> Result { let column_types = self.dataset().column_types(); let default_columns = self.dataset().default_columns(); - let used_columns = compute_used_columns( - default_columns, - include_columns, - exclude_columns, - columns, - self, - ); + let used_columns = + compute_used_columns(default_columns, include_columns, exclude_columns, columns, self); let mut columns = IndexMap::new(); for column in used_columns { - let mut ctype = column_types - .get(column.as_str()) - .ok_or(SchemaError::InvalidColumn)?; + let mut ctype = column_types.get(column.as_str()).ok_or(SchemaError::InvalidColumn)?; if (*binary_column_format == ColumnEncoding::Hex) & (ctype == &ColumnType::Binary) { ctype = &ColumnType::Hex; } columns.insert((*column.clone()).to_string(), *ctype); } - let schema = Table { - datatype: *self, - sort_columns: sort, - columns, - }; + let schema = Table { datatype: *self, sort_columns: sort, columns }; Ok(schema) } } @@ -130,30 +118,17 @@ fn compute_used_columns( ) -> Vec { match (columns, include_columns, exclude_columns) { (Some(columns), _, _) if ((columns.len() == 1) & columns.contains(&"all".to_string())) => { - datatype - .dataset() - .column_types() - .keys() - .map(|k| k.to_string()) - .collect() + datatype.dataset().column_types().keys().map(|k| k.to_string()).collect() } (Some(columns), _, _) => columns.iter().map(|x| x.to_string()).collect(), (_, Some(include), _) if ((include.len() == 1) & include.contains(&"all".to_string())) => { - datatype - .dataset() - .column_types() - .keys() - .map(|k| k.to_string()) - .collect() + datatype.dataset().column_types().keys().map(|k| k.to_string()).collect() } (_, Some(include), Some(exclude)) => { let include_set: HashSet<_> = include.iter().collect(); let exclude_set: HashSet<_> = exclude.iter().collect(); let intersection: HashSet<_> = include_set.intersection(&exclude_set).collect(); - assert!( - intersection.is_empty(), - "include and exclude should be non-overlapping" - ); + assert!(intersection.is_empty(), "include and exclude should be non-overlapping"); include.to_vec() } (_, Some(include), None) => include.to_vec(), diff --git a/crates/freeze/src/types/sources.rs b/crates/freeze/src/types/sources.rs index 4071d1a5..9ac414b7 100644 --- a/crates/freeze/src/types/sources.rs +++ b/crates/freeze/src/types/sources.rs @@ -1,10 +1,11 @@ use std::sync::Arc; use ethers::prelude::*; -use governor::clock::DefaultClock; -use governor::middleware::NoOpMiddleware; -use governor::state::direct::NotKeyed; -use governor::state::InMemoryState; +use governor::{ + clock::DefaultClock, + middleware::NoOpMiddleware, + state::{direct::NotKeyed, InMemoryState}, +}; use tokio::sync::Semaphore; /// RateLimiter based on governor crate @@ -102,21 +103,10 @@ pub struct Source { // Some(chain_id), // Some(inner_request_size), // Some(max_concurrent_chunks), -// ) = ( -// self.provider, -// self.semaphore, -// self.chain_id, -// self.inner_request_size, -// self.max_concurrent_chunks, -// ) { -// Ok(Source { -// provider, -// semaphore, -// rate_limiter: self.rate_limiter, -// chain_id, -// inner_request_size, -// max_concurrent_chunks, -// }) +// ) = ( self.provider, self.semaphore, self.chain_id, self.inner_request_size, +// self.max_concurrent_chunks, +// ) { Ok(Source { provider, semaphore, rate_limiter: self.rate_limiter, chain_id, +// inner_request_size, max_concurrent_chunks, }) // } else { // Err("Cannot build Source. Missing fields.") // } diff --git a/crates/freeze/src/types/summaries.rs b/crates/freeze/src/types/summaries.rs index 78add650..6cfe0add 100644 --- a/crates/freeze/src/types/summaries.rs +++ b/crates/freeze/src/types/summaries.rs @@ -33,19 +33,11 @@ impl FreezeSummaryAgg for Vec { n_completed += 1; } for (datatype, path) in chunk_summary.paths { - paths_by_type - .entry(datatype) - .or_insert_with(Vec::new) - .push(path); + paths_by_type.entry(datatype).or_insert_with(Vec::new).push(path); } } - FreezeSummary { - n_completed, - n_skipped, - n_errored, - paths_by_type, - } + FreezeSummary { n_completed, n_skipped, n_errored, paths_by_type } } } @@ -61,26 +53,14 @@ pub struct FreezeChunkSummary { impl FreezeChunkSummary { pub(crate) fn success(paths: HashMap) -> FreezeChunkSummary { - FreezeChunkSummary { - skipped: false, - errored: false, - paths, - } + FreezeChunkSummary { skipped: false, errored: false, paths } } pub(crate) fn error(paths: HashMap) -> FreezeChunkSummary { - FreezeChunkSummary { - skipped: false, - errored: true, - paths, - } + FreezeChunkSummary { skipped: false, errored: true, paths } } pub(crate) fn skip(paths: HashMap) -> FreezeChunkSummary { - FreezeChunkSummary { - skipped: true, - errored: false, - paths, - } + FreezeChunkSummary { skipped: true, errored: false, paths } } } diff --git a/crates/python/src/collect_adapter.rs b/crates/python/src/collect_adapter.rs index 3fe8a789..89afcc81 100644 --- a/crates/python/src/collect_adapter.rs +++ b/crates/python/src/collect_adapter.rs @@ -1,10 +1,8 @@ use polars::prelude::*; -use pyo3::exceptions::PyTypeError; -use pyo3::prelude::*; +use pyo3::{exceptions::PyTypeError, prelude::*}; use pyo3_polars::PyDataFrame; -use cryo_cli::parse_opts; -use cryo_cli::Args; +use cryo_cli::{parse_opts, Args}; use cryo_freeze::collect; #[pyfunction( diff --git a/crates/python/src/freeze_adapter.rs b/crates/python/src/freeze_adapter.rs index b798c611..38ce2017 100644 --- a/crates/python/src/freeze_adapter.rs +++ b/crates/python/src/freeze_adapter.rs @@ -1,10 +1,10 @@ -use pyo3::exceptions::PyTypeError; -use pyo3::prelude::*; -use pyo3::types::IntoPyDict; -use pyo3::types::PyDict; +use pyo3::{ + exceptions::PyTypeError, + prelude::*, + types::{IntoPyDict, PyDict}, +}; -use cryo_cli::run; -use cryo_cli::Args; +use cryo_cli::{run, Args}; #[pyfunction( signature = (