diff --git a/crates/freeze/src/datasets/blocks.rs b/crates/freeze/src/datasets/blocks.rs index f1adcda2..26b69e79 100644 --- a/crates/freeze/src/datasets/blocks.rs +++ b/crates/freeze/src/datasets/blocks.rs @@ -199,12 +199,9 @@ pub(crate) async fn blocks_to_dfs( let mut transaction_columns = TransactionColumns::default(); // parse stream of blocks - let mut n_blocks = 0; - let mut n_txs = 0; while let Some(message) = blocks.recv().await { match message { Ok((block, gas_used)) => { - n_blocks += 1; if let Some(schema) = blocks_schema { block_columns.process_block(&block, schema) } @@ -212,13 +209,11 @@ pub(crate) async fn blocks_to_dfs( match gas_used { Some(gas_used) => { for (tx, gas_used) in block.transactions.iter().zip(gas_used) { - n_txs += 1; tx.process(schema, &mut transaction_columns, Some(gas_used)) } } None => { for tx in block.transactions.iter() { - n_txs += 1; tx.process(schema, &mut transaction_columns, None) } } @@ -234,11 +229,11 @@ pub(crate) async fn blocks_to_dfs( // convert to dataframes let blocks_df = match blocks_schema { - Some(schema) => Some(block_columns.create_df(schema, chain_id, n_blocks)?), + Some(schema) => Some(block_columns.create_df(schema, chain_id)?), None => None, }; let transactions_df = match transactions_schema { - Some(schema) => Some(transaction_columns.create_df(schema, chain_id, n_txs)?), + Some(schema) => Some(transaction_columns.create_df(schema, chain_id)?), None => None, }; Ok((blocks_df, transactions_df)) @@ -246,6 +241,7 @@ pub(crate) async fn blocks_to_dfs( #[derive(Default)] struct BlockColumns { + n_rows: usize, hash: Vec>, parent_hash: Vec>, author: Vec>, @@ -264,6 +260,7 @@ struct BlockColumns { impl BlockColumns { fn process_block(&mut self, block: &Block, schema: &Table) { + self.n_rows += 1; if schema.has_column("hash") { match block.hash { Some(h) => self.hash.push(h.as_bytes().to_vec()), @@ -321,7 +318,6 @@ impl BlockColumns { self, schema: &Table, chain_id: u64, - n_rows: u64, ) -> Result { let mut cols = Vec::with_capacity(schema.columns().len()); with_series_binary!(cols, "hash", self.hash, schema); @@ -338,7 +334,7 @@ impl BlockColumns { with_series_binary!(cols, "total_difficulty", self.total_difficulty, schema); with_series!(cols, "size", self.size, schema); with_series!(cols, "base_fee_per_gas", self.base_fee_per_gas, schema); - with_series!(cols, "chain_id", vec![chain_id; n_rows as usize], schema); + with_series!(cols, "chain_id", vec![chain_id; self.n_rows], schema); DataFrame::new(cols).map_err(CollectError::PolarsError).sort_by_schema(schema) } diff --git a/crates/freeze/src/datasets/transactions.rs b/crates/freeze/src/datasets/transactions.rs index 7e05f891..12f1b0a4 100644 --- a/crates/freeze/src/datasets/transactions.rs +++ b/crates/freeze/src/datasets/transactions.rs @@ -174,6 +174,7 @@ async fn fetch_transactions( #[derive(Default)] pub(crate) struct TransactionColumns { + n_rows: usize, block_number: Vec>, transaction_index: Vec>, transaction_hash: Vec>, @@ -195,7 +196,6 @@ impl TransactionColumns { self, schema: &Table, chain_id: u64, - n_rows: usize, ) -> Result { let mut cols = Vec::with_capacity(schema.columns().len()); with_series!(cols, "block_number", self.block_number, schema); @@ -212,7 +212,7 @@ impl TransactionColumns { with_series!(cols, "transaction_type", self.transaction_type, schema); with_series!(cols, "max_priority_fee_per_gas", self.max_priority_fee_per_gas, schema); with_series!(cols, "max_fee_per_gas", self.max_fee_per_gas, schema); - with_series!(cols, "chain_id", vec![chain_id; n_rows], schema); + with_series!(cols, "chain_id", vec![chain_id; self.n_rows], schema); DataFrame::new(cols).map_err(CollectError::PolarsError).sort_by_schema(schema) } @@ -223,6 +223,7 @@ impl TransactionColumns { schema: &Table, gas_used: Option, ) { + self.n_rows += 1; if schema.has_column("block_number") { match tx.block_number { Some(block_number) => self.block_number.push(Some(block_number.as_u64())), @@ -286,15 +287,13 @@ async fn transactions_to_df( chain_id: u64, ) -> Result { let mut columns = TransactionColumns::default(); - let mut n_txs = 0; while let Some(message) = transactions.recv().await { match message { Ok((transaction, gas_used)) => { - n_txs += 1; transaction.process(schema, &mut columns, gas_used) } Err(e) => return Err(e), } } - columns.create_df(schema, chain_id, n_txs) + columns.create_df(schema, chain_id) }