diff --git a/crates/freeze/src/datasets/blocks.rs b/crates/freeze/src/datasets/blocks.rs index 13c09436..22853482 100644 --- a/crates/freeze/src/datasets/blocks.rs +++ b/crates/freeze/src/datasets/blocks.rs @@ -9,11 +9,13 @@ use crate::{ types::{ conversions::{ToVecHex, ToVecU8}, BlockChunk, Blocks, CollectError, ColumnType, Dataset, Datatype, RowFilter, Source, Table, - TransactionChunk, U256Type, + TransactionChunk, }, - with_series, with_series_binary, with_series_u256, + with_series, with_series_binary, }; +use super::transactions::TransactionColumns; + pub(crate) type BlockTxGasTuple = Result<(Block, Option>), CollectError>; #[async_trait::async_trait] @@ -182,7 +184,7 @@ impl ProcessTransactions for TxHash { impl ProcessTransactions for Transaction { fn process(&self, schema: &Table, columns: &mut TransactionColumns, gas_used: Option) { - process_transaction(self, schema, columns, gas_used) + columns.process_transaction(self, schema, gas_used) } } @@ -193,35 +195,25 @@ pub(crate) async fn blocks_to_dfs( chain_id: u64, ) -> Result<(Option, Option), CollectError> { // initialize - let mut block_columns = - if blocks_schema.is_none() { BlockColumns::new(0) } else { BlockColumns::new(100) }; - let mut transaction_columns = if transactions_schema.is_none() { - TransactionColumns::new(0) - } else { - TransactionColumns::new(100) - }; + let mut block_columns = BlockColumns::default(); + let mut transaction_columns = TransactionColumns::default(); // parse stream of blocks - let mut n_blocks = 0; - let mut n_txs = 0; while let Some(message) = blocks.recv().await { match message { Ok((block, gas_used)) => { - n_blocks += 1; if let Some(schema) = blocks_schema { - process_block(&block, schema, &mut block_columns) + block_columns.process_block(&block, schema) } if let Some(schema) = transactions_schema { match gas_used { Some(gas_used) => { for (tx, gas_used) in block.transactions.iter().zip(gas_used) { - n_txs += 1; tx.process(schema, &mut transaction_columns, Some(gas_used)) } } None => { for tx in block.transactions.iter() { - n_txs += 1; tx.process(schema, &mut transaction_columns, None) } } @@ -237,17 +229,19 @@ pub(crate) async fn blocks_to_dfs( // convert to dataframes let blocks_df = match blocks_schema { - Some(schema) => Some(block_columns.create_df(schema, chain_id, n_blocks)?), + Some(schema) => Some(block_columns.create_df(schema, chain_id)?), None => None, }; let transactions_df = match transactions_schema { - Some(schema) => Some(transaction_columns.create_df(schema, chain_id, n_txs)?), + Some(schema) => Some(transaction_columns.create_df(schema, chain_id)?), None => None, }; Ok((blocks_df, transactions_df)) } +#[derive(Default)] struct BlockColumns { + n_rows: usize, hash: Vec>, parent_hash: Vec>, author: Vec>, @@ -265,32 +259,63 @@ struct BlockColumns { } impl BlockColumns { - fn new(n: usize) -> Self { - Self { - hash: Vec::with_capacity(n), - parent_hash: Vec::with_capacity(n), - author: Vec::with_capacity(n), - state_root: Vec::with_capacity(n), - transactions_root: Vec::with_capacity(n), - receipts_root: Vec::with_capacity(n), - number: Vec::with_capacity(n), - gas_used: Vec::with_capacity(n), - extra_data: Vec::with_capacity(n), - logs_bloom: Vec::with_capacity(n), - timestamp: Vec::with_capacity(n), - total_difficulty: Vec::with_capacity(n), - size: Vec::with_capacity(n), - base_fee_per_gas: Vec::with_capacity(n), + fn process_block(&mut self, block: &Block, schema: &Table) { + self.n_rows += 1; + if schema.has_column("hash") { + match block.hash { + Some(h) => self.hash.push(h.as_bytes().to_vec()), + _ => panic!("invalid block"), + } + } + if schema.has_column("parent_hash") { + self.parent_hash.push(block.parent_hash.as_bytes().to_vec()); + } + if schema.has_column("author") { + match block.author { + Some(a) => self.author.push(a.as_bytes().to_vec()), + _ => panic!("invalid block"), + } + } + if schema.has_column("state_root") { + self.state_root.push(block.state_root.as_bytes().to_vec()); + } + if schema.has_column("transactions_root") { + self.transactions_root.push(block.transactions_root.as_bytes().to_vec()); + } + if schema.has_column("receipts_root") { + self.receipts_root.push(block.receipts_root.as_bytes().to_vec()); + } + if schema.has_column("number") { + match block.number { + Some(n) => self.number.push(n.as_u32()), + _ => panic!("invalid block"), + } + } + if schema.has_column("gas_used") { + self.gas_used.push(block.gas_used.as_u32()); + } + if schema.has_column("extra_data") { + self.extra_data.push(block.extra_data.to_vec()); + } + if schema.has_column("logs_bloom") { + self.logs_bloom.push(block.logs_bloom.map(|x| x.0.to_vec())); + } + if schema.has_column("timestamp") { + self.timestamp.push(block.timestamp.as_u32()); + } + if schema.has_column("total_difficulty") { + self.total_difficulty.push(block.total_difficulty.map(|x| x.to_vec_u8())); + } + if schema.has_column("size") { + self.size.push(block.size.map(|x| x.as_u32())); + } + if schema.has_column("base_fee_per_gas") { + self.base_fee_per_gas.push(block.base_fee_per_gas.map(|value| value.as_u64())); } } - fn create_df( - self, - schema: &Table, - chain_id: u64, - n_rows: u64, - ) -> Result { - let mut cols = Vec::new(); + fn create_df(self, schema: &Table, chain_id: u64) -> Result { + let mut cols = Vec::with_capacity(schema.columns().len()); with_series_binary!(cols, "hash", self.hash, schema); with_series_binary!(cols, "parent_hash", self.parent_hash, schema); with_series_binary!(cols, "author", self.author, schema); @@ -305,195 +330,8 @@ impl BlockColumns { with_series_binary!(cols, "total_difficulty", self.total_difficulty, schema); with_series!(cols, "size", self.size, schema); with_series!(cols, "base_fee_per_gas", self.base_fee_per_gas, schema); - - if schema.has_column("chain_id") { - cols.push(Series::new("chain_id", vec![chain_id; n_rows as usize])); - } + with_series!(cols, "chain_id", vec![chain_id; self.n_rows], schema); DataFrame::new(cols).map_err(CollectError::PolarsError).sort_by_schema(schema) } } - -pub(crate) struct TransactionColumns { - block_number: Vec>, - transaction_index: Vec>, - transaction_hash: Vec>, - nonce: Vec, - from_address: Vec>, - to_address: Vec>>, - value: Vec, - input: Vec>, - gas_limit: Vec, - gas_used: Vec, - gas_price: Vec>, - transaction_type: Vec>, - max_priority_fee_per_gas: Vec>, - max_fee_per_gas: Vec>, -} - -impl TransactionColumns { - pub(crate) fn new(n: usize) -> Self { - Self { - block_number: Vec::with_capacity(n), - transaction_index: Vec::with_capacity(n), - transaction_hash: Vec::with_capacity(n), - nonce: Vec::with_capacity(n), - from_address: Vec::with_capacity(n), - to_address: Vec::with_capacity(n), - value: Vec::with_capacity(n), - input: Vec::with_capacity(n), - gas_limit: Vec::with_capacity(n), - gas_used: Vec::with_capacity(n), - gas_price: Vec::with_capacity(n), - transaction_type: Vec::with_capacity(n), - max_priority_fee_per_gas: Vec::with_capacity(n), - max_fee_per_gas: Vec::with_capacity(n), - } - } - - pub(crate) fn create_df( - self, - schema: &Table, - chain_id: u64, - n_rows: usize, - ) -> Result { - let mut cols = Vec::new(); - with_series!(cols, "block_number", self.block_number, schema); - with_series!(cols, "transaction_index", self.transaction_index, schema); - with_series_binary!(cols, "transaction_hash", self.transaction_hash, schema); - with_series!(cols, "nonce", self.nonce, schema); - with_series_binary!(cols, "from_address", self.from_address, schema); - with_series_binary!(cols, "to_address", self.to_address, schema); - with_series_u256!(cols, "value", self.value, schema); - with_series_binary!(cols, "input", self.input, schema); - with_series!(cols, "gas_limit", self.gas_limit, schema); - with_series!(cols, "gas_used", self.gas_used, schema); - with_series!(cols, "gas_price", self.gas_price, schema); - with_series!(cols, "transaction_type", self.transaction_type, schema); - with_series!(cols, "max_priority_fee_per_gas", self.max_priority_fee_per_gas, schema); - with_series!(cols, "max_fee_per_gas", self.max_fee_per_gas, schema); - - if schema.has_column("chain_id") { - cols.push(Series::new("chain_id", vec![chain_id; n_rows])); - } - - DataFrame::new(cols).map_err(CollectError::PolarsError).sort_by_schema(schema) - } -} - -fn process_block(block: &Block, schema: &Table, columns: &mut BlockColumns) { - if schema.has_column("hash") { - match block.hash { - Some(h) => columns.hash.push(h.as_bytes().to_vec()), - _ => panic!("invalid block"), - } - } - if schema.has_column("parent_hash") { - columns.parent_hash.push(block.parent_hash.as_bytes().to_vec()); - } - if schema.has_column("author") { - match block.author { - Some(a) => columns.author.push(a.as_bytes().to_vec()), - _ => panic!("invalid block"), - } - } - if schema.has_column("state_root") { - columns.state_root.push(block.state_root.as_bytes().to_vec()); - } - if schema.has_column("transactions_root") { - columns.transactions_root.push(block.transactions_root.as_bytes().to_vec()); - } - if schema.has_column("receipts_root") { - columns.receipts_root.push(block.receipts_root.as_bytes().to_vec()); - } - if schema.has_column("number") { - match block.number { - Some(n) => columns.number.push(n.as_u32()), - _ => panic!("invalid block"), - } - } - if schema.has_column("gas_used") { - columns.gas_used.push(block.gas_used.as_u32()); - } - if schema.has_column("extra_data") { - columns.extra_data.push(block.extra_data.to_vec()); - } - if schema.has_column("logs_bloom") { - columns.logs_bloom.push(block.logs_bloom.map(|x| x.0.to_vec())); - } - if schema.has_column("timestamp") { - columns.timestamp.push(block.timestamp.as_u32()); - } - if schema.has_column("total_difficulty") { - columns.total_difficulty.push(block.total_difficulty.map(|x| x.to_vec_u8())); - } - if schema.has_column("size") { - columns.size.push(block.size.map(|x| x.as_u32())); - } - if schema.has_column("base_fee_per_gas") { - columns.base_fee_per_gas.push(block.base_fee_per_gas.map(|value| value.as_u64())); - } -} - -fn process_transaction( - tx: &Transaction, - schema: &Table, - columns: &mut TransactionColumns, - gas_used: Option, -) { - if schema.has_column("block_number") { - match tx.block_number { - Some(block_number) => columns.block_number.push(Some(block_number.as_u64())), - None => columns.block_number.push(None), - } - } - if schema.has_column("transaction_index") { - match tx.transaction_index { - Some(transaction_index) => { - columns.transaction_index.push(Some(transaction_index.as_u64())) - } - None => columns.transaction_index.push(None), - } - } - if schema.has_column("transaction_hash") { - columns.transaction_hash.push(tx.hash.as_bytes().to_vec()); - } - if schema.has_column("from_address") { - columns.from_address.push(tx.from.as_bytes().to_vec()); - } - if schema.has_column("to_address") { - match tx.to { - Some(to_address) => columns.to_address.push(Some(to_address.as_bytes().to_vec())), - None => columns.to_address.push(None), - } - } - if schema.has_column("nonce") { - columns.nonce.push(tx.nonce.as_u64()); - } - if schema.has_column("value") { - columns.value.push(tx.value); - } - if schema.has_column("input") { - columns.input.push(tx.input.to_vec()); - } - if schema.has_column("gas_limit") { - columns.gas_limit.push(tx.gas.as_u32()); - } - if schema.has_column("gas_used") { - columns.gas_used.push(gas_used.unwrap()) - } - if schema.has_column("gas_price") { - columns.gas_price.push(tx.gas_price.map(|gas_price| gas_price.as_u64())); - } - if schema.has_column("transaction_type") { - columns.transaction_type.push(tx.transaction_type.map(|value| value.as_u32())); - } - if schema.has_column("max_priority_fee_per_gas") { - columns - .max_priority_fee_per_gas - .push(tx.max_priority_fee_per_gas.map(|value| value.as_u64())); - } - if schema.has_column("max_fee_per_gas") { - columns.max_fee_per_gas.push(tx.max_fee_per_gas.map(|value| value.as_u64())); - } -} diff --git a/crates/freeze/src/datasets/logs.rs b/crates/freeze/src/datasets/logs.rs index 7c135e28..06f884f3 100644 --- a/crates/freeze/src/datasets/logs.rs +++ b/crates/freeze/src/datasets/logs.rs @@ -190,99 +190,116 @@ async fn fetch_transaction_logs( } } -async fn logs_to_df( - mut logs: mpsc::Receiver, CollectError>>, - schema: &Table, - chain_id: u64, -) -> Result { - let mut block_number: Vec = Vec::new(); - let mut transaction_index: Vec = Vec::new(); - let mut log_index: Vec = Vec::new(); - let mut transaction_hash: Vec> = Vec::new(); - let mut address: Vec> = Vec::new(); - let mut topic0: Vec>> = Vec::new(); - let mut topic1: Vec>> = Vec::new(); - let mut topic2: Vec>> = Vec::new(); - let mut topic3: Vec>> = Vec::new(); - let mut data: Vec> = Vec::new(); +#[derive(Default)] +pub(crate) struct LogColumns { + n_rows: usize, + block_number: Vec, + transaction_index: Vec, + log_index: Vec, + transaction_hash: Vec>, + address: Vec>, + topic0: Vec>>, + topic1: Vec>>, + topic2: Vec>>, + topic3: Vec>>, + data: Vec>, +} - let mut n_rows = 0; - // while let Some(Ok(logs)) = logs.recv().await { - while let Some(message) = logs.recv().await { - match message { - Ok(logs) => { - for log in logs.iter() { - if let Some(true) = log.removed { - continue +impl LogColumns { + pub(crate) fn process_logs( + &mut self, + logs: Vec, + schema: &Table, + ) -> Result<(), CollectError> { + for log in logs { + if let Some(true) = log.removed { + continue + } + if let (Some(bn), Some(tx), Some(ti), Some(li)) = + (log.block_number, log.transaction_hash, log.transaction_index, log.log_index) + { + self.n_rows += 1; + self.address.push(log.address.as_bytes().to_vec()); + match log.topics.len() { + 0 => { + self.topic0.push(None); + self.topic1.push(None); + self.topic2.push(None); + self.topic3.push(None); } - if let (Some(bn), Some(tx), Some(ti), Some(li)) = ( - log.block_number, - log.transaction_hash, - log.transaction_index, - log.log_index, - ) { - n_rows += 1; - address.push(log.address.as_bytes().to_vec()); - match log.topics.len() { - 0 => { - topic0.push(None); - topic1.push(None); - topic2.push(None); - topic3.push(None); - } - 1 => { - topic0.push(Some(log.topics[0].as_bytes().to_vec())); - topic1.push(None); - topic2.push(None); - topic3.push(None); - } - 2 => { - topic0.push(Some(log.topics[0].as_bytes().to_vec())); - topic1.push(Some(log.topics[1].as_bytes().to_vec())); - topic2.push(None); - topic3.push(None); - } - 3 => { - topic0.push(Some(log.topics[0].as_bytes().to_vec())); - topic1.push(Some(log.topics[1].as_bytes().to_vec())); - topic2.push(Some(log.topics[2].as_bytes().to_vec())); - topic3.push(None); - } - 4 => { - topic0.push(Some(log.topics[0].as_bytes().to_vec())); - topic1.push(Some(log.topics[1].as_bytes().to_vec())); - topic2.push(Some(log.topics[2].as_bytes().to_vec())); - topic3.push(Some(log.topics[3].as_bytes().to_vec())); - } - _ => return Err(CollectError::InvalidNumberOfTopics), - } - data.push(log.data.clone().to_vec()); - block_number.push(bn.as_u32()); - transaction_hash.push(tx.as_bytes().to_vec()); - transaction_index.push(ti.as_u32()); - log_index.push(li.as_u32()); + 1 => { + self.topic0.push(Some(log.topics[0].as_bytes().to_vec())); + self.topic1.push(None); + self.topic2.push(None); + self.topic3.push(None); + } + 2 => { + self.topic0.push(Some(log.topics[0].as_bytes().to_vec())); + self.topic1.push(Some(log.topics[1].as_bytes().to_vec())); + self.topic2.push(None); + self.topic3.push(None); + } + 3 => { + self.topic0.push(Some(log.topics[0].as_bytes().to_vec())); + self.topic1.push(Some(log.topics[1].as_bytes().to_vec())); + self.topic2.push(Some(log.topics[2].as_bytes().to_vec())); + self.topic3.push(None); + } + 4 => { + self.topic0.push(Some(log.topics[0].as_bytes().to_vec())); + self.topic1.push(Some(log.topics[1].as_bytes().to_vec())); + self.topic2.push(Some(log.topics[2].as_bytes().to_vec())); + self.topic3.push(Some(log.topics[3].as_bytes().to_vec())); } + _ => return Err(CollectError::InvalidNumberOfTopics), } + if schema.has_column("data") { + self.data.push(log.data.to_vec()); + } + self.block_number.push(bn.as_u32()); + self.transaction_hash.push(tx.as_bytes().to_vec()); + self.transaction_index.push(ti.as_u32()); + self.log_index.push(li.as_u32()); } - _ => return Err(CollectError::TooManyRequestsError), } + Ok(()) } - let mut cols = Vec::new(); - with_series!(cols, "block_number", block_number, schema); - with_series!(cols, "transaction_index", transaction_index, schema); - with_series!(cols, "log_index", log_index, schema); - with_series_binary!(cols, "transaction_hash", transaction_hash, schema); - with_series_binary!(cols, "contract_address", address, schema); - with_series_binary!(cols, "topic0", topic0, schema); - with_series_binary!(cols, "topic1", topic1, schema); - with_series_binary!(cols, "topic2", topic2, schema); - with_series_binary!(cols, "topic3", topic3, schema); - with_series_binary!(cols, "data", data, schema); + pub(crate) fn create_df( + self, + schema: &Table, + chain_id: u64, + ) -> Result { + let mut cols = Vec::with_capacity(schema.columns().len()); + with_series!(cols, "block_number", self.block_number, schema); + with_series!(cols, "transaction_index", self.transaction_index, schema); + with_series!(cols, "log_index", self.log_index, schema); + with_series_binary!(cols, "transaction_hash", self.transaction_hash, schema); + with_series_binary!(cols, "contract_address", self.address, schema); + with_series_binary!(cols, "topic0", self.topic0, schema); + with_series_binary!(cols, "topic1", self.topic1, schema); + with_series_binary!(cols, "topic2", self.topic2, schema); + with_series_binary!(cols, "topic3", self.topic3, schema); + with_series_binary!(cols, "data", self.data, schema); + with_series!(cols, "chain_id", vec![chain_id; self.n_rows], schema); - if schema.has_column("chain_id") { - cols.push(Series::new("chain_id", vec![chain_id; n_rows])); + DataFrame::new(cols).map_err(CollectError::PolarsError).sort_by_schema(schema) } +} - DataFrame::new(cols).map_err(CollectError::PolarsError).sort_by_schema(schema) +async fn logs_to_df( + mut logs: mpsc::Receiver, CollectError>>, + schema: &Table, + chain_id: u64, +) -> Result { + let mut columns = LogColumns::default(); + + while let Some(message) = logs.recv().await { + if let Ok(logs) = message { + columns.process_logs(logs, schema)? + } else { + return Err(CollectError::TooManyRequestsError) + } + } + columns.create_df(schema, chain_id) } diff --git a/crates/freeze/src/datasets/transactions.rs b/crates/freeze/src/datasets/transactions.rs index 67a788f7..1dacf7f4 100644 --- a/crates/freeze/src/datasets/transactions.rs +++ b/crates/freeze/src/datasets/transactions.rs @@ -5,9 +5,14 @@ use polars::prelude::*; use tokio::{sync::mpsc, task}; use super::{blocks, blocks::ProcessTransactions, blocks_and_transactions}; -use crate::types::{ - BlockChunk, CollectError, ColumnType, Dataset, Datatype, RowFilter, Source, Table, - TransactionChunk, Transactions, +use crate::{ + dataframes::SortableDataFrame, + types::{ + conversions::{ToVecHex, ToVecU8}, + BlockChunk, CollectError, ColumnType, Dataset, Datatype, RowFilter, Source, Table, + TransactionChunk, Transactions, U256Type, + }, + with_series, with_series_binary, with_series_u256, }; #[async_trait::async_trait] @@ -167,21 +172,126 @@ async fn fetch_transactions( } } +#[derive(Default)] +pub(crate) struct TransactionColumns { + n_rows: usize, + block_number: Vec>, + transaction_index: Vec>, + transaction_hash: Vec>, + nonce: Vec, + from_address: Vec>, + to_address: Vec>>, + value: Vec, + input: Vec>, + gas_limit: Vec, + gas_used: Vec, + gas_price: Vec>, + transaction_type: Vec>, + max_priority_fee_per_gas: Vec>, + max_fee_per_gas: Vec>, +} + +impl TransactionColumns { + pub(crate) fn create_df( + self, + schema: &Table, + chain_id: u64, + ) -> Result { + let mut cols = Vec::with_capacity(schema.columns().len()); + with_series!(cols, "block_number", self.block_number, schema); + with_series!(cols, "transaction_index", self.transaction_index, schema); + with_series_binary!(cols, "transaction_hash", self.transaction_hash, schema); + with_series!(cols, "nonce", self.nonce, schema); + with_series_binary!(cols, "from_address", self.from_address, schema); + with_series_binary!(cols, "to_address", self.to_address, schema); + with_series_u256!(cols, "value", self.value, schema); + with_series_binary!(cols, "input", self.input, schema); + with_series!(cols, "gas_limit", self.gas_limit, schema); + with_series!(cols, "gas_used", self.gas_used, schema); + with_series!(cols, "gas_price", self.gas_price, schema); + with_series!(cols, "transaction_type", self.transaction_type, schema); + with_series!(cols, "max_priority_fee_per_gas", self.max_priority_fee_per_gas, schema); + with_series!(cols, "max_fee_per_gas", self.max_fee_per_gas, schema); + with_series!(cols, "chain_id", vec![chain_id; self.n_rows], schema); + + DataFrame::new(cols).map_err(CollectError::PolarsError).sort_by_schema(schema) + } + + pub(crate) fn process_transaction( + &mut self, + tx: &Transaction, + schema: &Table, + gas_used: Option, + ) { + self.n_rows += 1; + if schema.has_column("block_number") { + match tx.block_number { + Some(block_number) => self.block_number.push(Some(block_number.as_u64())), + None => self.block_number.push(None), + } + } + if schema.has_column("transaction_index") { + match tx.transaction_index { + Some(transaction_index) => { + self.transaction_index.push(Some(transaction_index.as_u64())) + } + None => self.transaction_index.push(None), + } + } + if schema.has_column("transaction_hash") { + self.transaction_hash.push(tx.hash.as_bytes().to_vec()); + } + if schema.has_column("from_address") { + self.from_address.push(tx.from.as_bytes().to_vec()); + } + if schema.has_column("to_address") { + match tx.to { + Some(to_address) => self.to_address.push(Some(to_address.as_bytes().to_vec())), + None => self.to_address.push(None), + } + } + if schema.has_column("nonce") { + self.nonce.push(tx.nonce.as_u64()); + } + if schema.has_column("value") { + self.value.push(tx.value); + } + if schema.has_column("input") { + self.input.push(tx.input.to_vec()); + } + if schema.has_column("gas_limit") { + self.gas_limit.push(tx.gas.as_u32()); + } + if schema.has_column("gas_used") { + self.gas_used.push(gas_used.unwrap()) + } + if schema.has_column("gas_price") { + self.gas_price.push(tx.gas_price.map(|gas_price| gas_price.as_u64())); + } + if schema.has_column("transaction_type") { + self.transaction_type.push(tx.transaction_type.map(|value| value.as_u32())); + } + if schema.has_column("max_priority_fee_per_gas") { + self.max_priority_fee_per_gas + .push(tx.max_priority_fee_per_gas.map(|value| value.as_u64())); + } + if schema.has_column("max_fee_per_gas") { + self.max_fee_per_gas.push(tx.max_fee_per_gas.map(|value| value.as_u64())); + } + } +} + async fn transactions_to_df( mut transactions: mpsc::Receiver), CollectError>>, schema: &Table, chain_id: u64, ) -> Result { - let mut columns = blocks::TransactionColumns::new(100); - let mut n_txs = 0; + let mut columns = TransactionColumns::default(); while let Some(message) = transactions.recv().await { match message { - Ok((transaction, gas_used)) => { - n_txs += 1; - transaction.process(schema, &mut columns, gas_used) - } + Ok((transaction, gas_used)) => transaction.process(schema, &mut columns, gas_used), Err(e) => return Err(e), } } - columns.create_df(schema, chain_id, n_txs) + columns.create_df(schema, chain_id) }