Skip to content

Commit

Permalink
Move counting n_rows to column objects.
Browse files Browse the repository at this point in the history
  • Loading branch information
kskalski committed Aug 31, 2023
1 parent 709c279 commit 9d15e4f
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 14 deletions.
14 changes: 5 additions & 9 deletions crates/freeze/src/datasets/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,26 +199,21 @@ pub(crate) async fn blocks_to_dfs<TX: ProcessTransactions>(
let mut transaction_columns = TransactionColumns::default();

// parse stream of blocks
let mut n_blocks = 0;
let mut n_txs = 0;
while let Some(message) = blocks.recv().await {
match message {
Ok((block, gas_used)) => {
n_blocks += 1;
if let Some(schema) = blocks_schema {
block_columns.process_block(&block, schema)
}
if let Some(schema) = transactions_schema {
match gas_used {
Some(gas_used) => {
for (tx, gas_used) in block.transactions.iter().zip(gas_used) {
n_txs += 1;
tx.process(schema, &mut transaction_columns, Some(gas_used))
}
}
None => {
for tx in block.transactions.iter() {
n_txs += 1;
tx.process(schema, &mut transaction_columns, None)
}
}
Expand All @@ -234,18 +229,19 @@ pub(crate) async fn blocks_to_dfs<TX: ProcessTransactions>(

// convert to dataframes
let blocks_df = match blocks_schema {
Some(schema) => Some(block_columns.create_df(schema, chain_id, n_blocks)?),
Some(schema) => Some(block_columns.create_df(schema, chain_id)?),
None => None,
};
let transactions_df = match transactions_schema {
Some(schema) => Some(transaction_columns.create_df(schema, chain_id, n_txs)?),
Some(schema) => Some(transaction_columns.create_df(schema, chain_id)?),
None => None,
};
Ok((blocks_df, transactions_df))
}

#[derive(Default)]
struct BlockColumns {
n_rows: usize,
hash: Vec<Vec<u8>>,
parent_hash: Vec<Vec<u8>>,
author: Vec<Vec<u8>>,
Expand All @@ -264,6 +260,7 @@ struct BlockColumns {

impl BlockColumns {
fn process_block<TX>(&mut self, block: &Block<TX>, schema: &Table) {
self.n_rows += 1;
if schema.has_column("hash") {
match block.hash {
Some(h) => self.hash.push(h.as_bytes().to_vec()),
Expand Down Expand Up @@ -321,7 +318,6 @@ impl BlockColumns {
self,
schema: &Table,
chain_id: u64,
n_rows: u64,
) -> Result<DataFrame, CollectError> {
let mut cols = Vec::with_capacity(schema.columns().len());
with_series_binary!(cols, "hash", self.hash, schema);
Expand All @@ -338,7 +334,7 @@ impl BlockColumns {
with_series_binary!(cols, "total_difficulty", self.total_difficulty, schema);
with_series!(cols, "size", self.size, schema);
with_series!(cols, "base_fee_per_gas", self.base_fee_per_gas, schema);
with_series!(cols, "chain_id", vec![chain_id; n_rows as usize], schema);
with_series!(cols, "chain_id", vec![chain_id; self.n_rows], schema);

DataFrame::new(cols).map_err(CollectError::PolarsError).sort_by_schema(schema)
}
Expand Down
9 changes: 4 additions & 5 deletions crates/freeze/src/datasets/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ async fn fetch_transactions(

#[derive(Default)]
pub(crate) struct TransactionColumns {
n_rows: usize,
block_number: Vec<Option<u64>>,
transaction_index: Vec<Option<u64>>,
transaction_hash: Vec<Vec<u8>>,
Expand All @@ -195,7 +196,6 @@ impl TransactionColumns {
self,
schema: &Table,
chain_id: u64,
n_rows: usize,
) -> Result<DataFrame, CollectError> {
let mut cols = Vec::with_capacity(schema.columns().len());
with_series!(cols, "block_number", self.block_number, schema);
Expand All @@ -212,7 +212,7 @@ impl TransactionColumns {
with_series!(cols, "transaction_type", self.transaction_type, schema);
with_series!(cols, "max_priority_fee_per_gas", self.max_priority_fee_per_gas, schema);
with_series!(cols, "max_fee_per_gas", self.max_fee_per_gas, schema);
with_series!(cols, "chain_id", vec![chain_id; n_rows], schema);
with_series!(cols, "chain_id", vec![chain_id; self.n_rows], schema);

DataFrame::new(cols).map_err(CollectError::PolarsError).sort_by_schema(schema)
}
Expand All @@ -223,6 +223,7 @@ impl TransactionColumns {
schema: &Table,
gas_used: Option<u32>,
) {
self.n_rows += 1;
if schema.has_column("block_number") {
match tx.block_number {
Some(block_number) => self.block_number.push(Some(block_number.as_u64())),
Expand Down Expand Up @@ -286,15 +287,13 @@ async fn transactions_to_df(
chain_id: u64,
) -> Result<DataFrame, CollectError> {
let mut columns = TransactionColumns::default();
let mut n_txs = 0;
while let Some(message) = transactions.recv().await {
match message {
Ok((transaction, gas_used)) => {
n_txs += 1;
transaction.process(schema, &mut columns, gas_used)
}
Err(e) => return Err(e),
}
}
columns.create_df(schema, chain_id, n_txs)
columns.create_df(schema, chain_id)
}

0 comments on commit 9d15e4f

Please sign in to comment.