From a247da868b9e60384cb32fb8d89e09d6a71c491e Mon Sep 17 00:00:00 2001 From: sslivkoff Date: Thu, 12 Oct 2023 21:05:15 -0700 Subject: [PATCH] more info + readability for outputs --- crates/cli/src/parse/source.rs | 3 + crates/cli/src/run.rs | 3 +- crates/freeze/src/collect.rs | 2 +- crates/freeze/src/datasets/balance_diffs.rs | 12 +- crates/freeze/src/datasets/balances.rs | 6 +- crates/freeze/src/datasets/blocks.rs | 12 +- crates/freeze/src/datasets/code_diffs.rs | 12 +- crates/freeze/src/datasets/codes.rs | 6 +- crates/freeze/src/datasets/contracts.rs | 12 +- crates/freeze/src/datasets/erc20_balances.rs | 6 +- crates/freeze/src/datasets/erc20_metadata.rs | 6 +- crates/freeze/src/datasets/erc20_supplies.rs | 6 +- crates/freeze/src/datasets/erc20_transfers.rs | 12 +- crates/freeze/src/datasets/erc721_metadata.rs | 6 +- .../freeze/src/datasets/erc721_transfers.rs | 12 +- crates/freeze/src/datasets/eth_calls.rs | 6 +- crates/freeze/src/datasets/logs.rs | 12 +- .../freeze/src/datasets/native_transfers.rs | 12 +- crates/freeze/src/datasets/nonce_diffs.rs | 12 +- crates/freeze/src/datasets/nonces.rs | 6 +- crates/freeze/src/datasets/storage_diffs.rs | 12 +- crates/freeze/src/datasets/storages.rs | 6 +- crates/freeze/src/datasets/trace_calls.rs | 6 +- crates/freeze/src/datasets/traces.rs | 12 +- .../src/datasets/transaction_addresses.rs | 12 +- crates/freeze/src/datasets/transactions.rs | 12 +- crates/freeze/src/datasets/vm_traces.rs | 12 +- crates/freeze/src/freeze.rs | 5 +- .../multi_datasets/blocks_and_transactions.rs | 12 +- .../multi_datasets/call_trace_derivatives.rs | 12 +- .../freeze/src/multi_datasets/state_diffs.rs | 12 +- crates/freeze/src/types/chunks/chunk_ops.rs | 3 +- .../src/types/collection/collect_by_block.rs | 4 +- .../collection/collect_by_transaction.rs | 4 +- .../src/types/collection/collect_generic.rs | 6 +- .../src/types/datatypes/datatype_macros.rs | 4 +- crates/freeze/src/types/sources.rs | 6 + crates/freeze/src/types/summaries.rs | 239 +++++++++++------- crates/python/src/collect_adapter.rs | 2 +- 39 files changed, 400 insertions(+), 145 deletions(-) diff --git a/crates/cli/src/parse/source.rs b/crates/cli/src/parse/source.rs index 390ff3fd..f9923b79 100644 --- a/crates/cli/src/parse/source.rs +++ b/crates/cli/src/parse/source.rs @@ -44,7 +44,10 @@ pub(crate) async fn parse_source(args: &Args) -> Result { fetcher: Arc::new(fetcher), chain_id, inner_request_size: args.inner_request_size, + max_concurrent_requests: args.requests_per_second.map(|x| x as u64), max_concurrent_chunks, + max_requests_per_second: args.requests_per_second.map(|x| x as u64), + rpc_url, }; Ok(output) diff --git a/crates/cli/src/run.rs b/crates/cli/src/run.rs index 5bfd7e33..cf24a1b8 100644 --- a/crates/cli/src/run.rs +++ b/crates/cli/src/run.rs @@ -1,6 +1,6 @@ use crate::{args, parse}; use cryo_freeze::{CollectError, ExecutionEnv, FreezeSummary}; -use std::time::SystemTime; +use std::{sync::Arc, time::SystemTime}; /// run cli pub async fn run(args: args::Args) -> Result, CollectError> { @@ -9,6 +9,7 @@ pub async fn run(args: args::Args) -> Result, CollectError Ok(opts) => opts, Err(e) => return Err(e.into()), }; + let source = Arc::new(source); let env = ExecutionEnv { t_start_parse, ..env }; let env = env.set_start_time(); cryo_freeze::freeze(&query, &source, &sink, &env).await diff --git a/crates/freeze/src/collect.rs b/crates/freeze/src/collect.rs index 8fb62c51..e2f684bd 100644 --- a/crates/freeze/src/collect.rs +++ b/crates/freeze/src/collect.rs @@ -2,7 +2,7 @@ use crate::{collect_partition, CollectError, Query, Source}; use polars::prelude::*; /// collect single dataframe -pub async fn collect(query: Query, source: Source) -> Result { +pub async fn collect(query: Query, source: Arc) -> Result { query.is_valid()?; let datatype = if query.datatypes.len() != 1 { return Err(CollectError::CollectError( diff --git a/crates/freeze/src/datasets/balance_diffs.rs b/crates/freeze/src/datasets/balance_diffs.rs index f3918110..9929d83b 100644 --- a/crates/freeze/src/datasets/balance_diffs.rs +++ b/crates/freeze/src/datasets/balance_diffs.rs @@ -35,7 +35,11 @@ impl Dataset for BalanceDiffs { impl CollectByBlock for BalanceDiffs { type Response = BlockTxsTraces; - async fn extract(request: Params, source: Source, schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + schemas: Schemas, + ) -> Result { let schema = schemas.get(&Datatype::BalanceDiffs).ok_or(err("schema not provided"))?; let include_txs = schema.has_column("transaction_hash"); source.fetcher.trace_block_state_diffs(request.block_number()? as u32, include_txs).await @@ -50,7 +54,11 @@ impl CollectByBlock for BalanceDiffs { impl CollectByTransaction for BalanceDiffs { type Response = (Option, Vec>>, Vec); - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { source.fetcher.trace_transaction_state_diffs(request.transaction_hash()?).await } diff --git a/crates/freeze/src/datasets/balances.rs b/crates/freeze/src/datasets/balances.rs index 6f74d7f1..9f435646 100644 --- a/crates/freeze/src/datasets/balances.rs +++ b/crates/freeze/src/datasets/balances.rs @@ -36,7 +36,11 @@ type BlockTxAddressOutput = (u32, Option>, Vec, U256); impl CollectByBlock for Balances { type Response = BlockTxAddressOutput; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { let address = request.address()?; let block_number = request.block_number()? as u32; let balance = diff --git a/crates/freeze/src/datasets/blocks.rs b/crates/freeze/src/datasets/blocks.rs index 72f08fbc..5f3bd795 100644 --- a/crates/freeze/src/datasets/blocks.rs +++ b/crates/freeze/src/datasets/blocks.rs @@ -55,7 +55,11 @@ type Result = ::core::result::Result; impl CollectByBlock for Blocks { type Response = Block; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { let block = source .fetcher .get_block(request.block_number()?) @@ -74,7 +78,11 @@ impl CollectByBlock for Blocks { impl CollectByTransaction for Blocks { type Response = Block; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { let transaction = source .fetcher .get_transaction(request.ethers_transaction_hash()?) diff --git a/crates/freeze/src/datasets/code_diffs.rs b/crates/freeze/src/datasets/code_diffs.rs index 17be10c2..bd23d5a5 100644 --- a/crates/freeze/src/datasets/code_diffs.rs +++ b/crates/freeze/src/datasets/code_diffs.rs @@ -35,7 +35,11 @@ type Result = ::core::result::Result; impl CollectByBlock for CodeDiffs { type Response = BlockTxsTraces; - async fn extract(request: Params, source: Source, schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + schemas: Schemas, + ) -> Result { let schema = schemas.get(&Datatype::CodeDiffs).ok_or(err("schema not provided"))?; let include_txs = schema.has_column("transaction_hash"); source.fetcher.trace_block_state_diffs(request.block_number()? as u32, include_txs).await @@ -50,7 +54,11 @@ impl CollectByBlock for CodeDiffs { impl CollectByTransaction for CodeDiffs { type Response = (Option, Vec>>, Vec); - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { source.fetcher.trace_transaction_state_diffs(request.transaction_hash()?).await } diff --git a/crates/freeze/src/datasets/codes.rs b/crates/freeze/src/datasets/codes.rs index 5e7380fb..b403e0f5 100644 --- a/crates/freeze/src/datasets/codes.rs +++ b/crates/freeze/src/datasets/codes.rs @@ -36,7 +36,11 @@ type BlockTxAddressOutput = (u32, Option>, Vec, Vec); impl CollectByBlock for Codes { type Response = BlockTxAddressOutput; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { let address = request.address()?; let block_number = request.block_number()? as u32; let output = diff --git a/crates/freeze/src/datasets/contracts.rs b/crates/freeze/src/datasets/contracts.rs index 0b993dba..57edb823 100644 --- a/crates/freeze/src/datasets/contracts.rs +++ b/crates/freeze/src/datasets/contracts.rs @@ -40,7 +40,11 @@ type Result = ::core::result::Result; impl CollectByBlock for Contracts { type Response = Vec; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { source.fetcher.trace_block(request.ethers_block_number()?).await } @@ -54,7 +58,11 @@ impl CollectByBlock for Contracts { impl CollectByTransaction for Contracts { type Response = Vec; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { source.fetcher.trace_transaction(request.ethers_transaction_hash()?).await } diff --git a/crates/freeze/src/datasets/erc20_balances.rs b/crates/freeze/src/datasets/erc20_balances.rs index 5a1da68d..617357d4 100644 --- a/crates/freeze/src/datasets/erc20_balances.rs +++ b/crates/freeze/src/datasets/erc20_balances.rs @@ -38,7 +38,11 @@ type BlockErc20AddressBalance = (u32, Vec, Vec, Option); impl CollectByBlock for Erc20Balances { type Response = BlockErc20AddressBalance; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { let signature = FUNCTION_ERC20_BALANCE_OF.clone(); let mut call_data = signature.clone(); call_data.extend(vec![0; 12]); diff --git a/crates/freeze/src/datasets/erc20_metadata.rs b/crates/freeze/src/datasets/erc20_metadata.rs index fd9b1a88..81017001 100644 --- a/crates/freeze/src/datasets/erc20_metadata.rs +++ b/crates/freeze/src/datasets/erc20_metadata.rs @@ -42,7 +42,11 @@ pub(crate) fn remove_control_characters(s: &str) -> String { impl CollectByBlock for Erc20Metadata { type Response = BlockAddressNameSymbolDecimals; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { let block_number = request.ethers_block_number()?; let address = request.ethers_address()?; diff --git a/crates/freeze/src/datasets/erc20_supplies.rs b/crates/freeze/src/datasets/erc20_supplies.rs index 1943b473..c3899e4d 100644 --- a/crates/freeze/src/datasets/erc20_supplies.rs +++ b/crates/freeze/src/datasets/erc20_supplies.rs @@ -37,7 +37,11 @@ type BlockErc20Supply = (u32, Vec, Option); impl CollectByBlock for Erc20Supplies { type Response = BlockErc20Supply; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { let signature: Vec = FUNCTION_ERC20_TOTAL_SUPPLY.clone(); let mut call_data = signature.clone(); call_data.extend(request.contract()?); diff --git a/crates/freeze/src/datasets/erc20_transfers.rs b/crates/freeze/src/datasets/erc20_transfers.rs index 6fcd3ed5..10a4e2bc 100644 --- a/crates/freeze/src/datasets/erc20_transfers.rs +++ b/crates/freeze/src/datasets/erc20_transfers.rs @@ -44,7 +44,11 @@ type Result = ::core::result::Result; impl CollectByBlock for Erc20Transfers { type Response = Vec; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { let topics = [Some(ValueOrArray::Value(Some(*EVENT_ERC20_TRANSFER))), None, None, None]; let filter = Filter { topics, ..request.ethers_log_filter()? }; let logs = source.fetcher.get_logs(&filter).await?; @@ -61,7 +65,11 @@ impl CollectByBlock for Erc20Transfers { impl CollectByTransaction for Erc20Transfers { type Response = Vec; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { let logs = source.fetcher.get_transaction_logs(request.transaction_hash()?).await?; Ok(logs.into_iter().filter(is_erc20_transfer).collect()) } diff --git a/crates/freeze/src/datasets/erc721_metadata.rs b/crates/freeze/src/datasets/erc721_metadata.rs index 1a348e31..32b70eb0 100644 --- a/crates/freeze/src/datasets/erc721_metadata.rs +++ b/crates/freeze/src/datasets/erc721_metadata.rs @@ -37,7 +37,11 @@ type BlockAddressNameSymbol = (u32, Vec, Option, Option); impl CollectByBlock for Erc721Metadata { type Response = BlockAddressNameSymbol; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { let block_number = request.ethers_block_number()?; let address = request.ethers_contract()?; diff --git a/crates/freeze/src/datasets/erc721_transfers.rs b/crates/freeze/src/datasets/erc721_transfers.rs index 7e188f27..718c7879 100644 --- a/crates/freeze/src/datasets/erc721_transfers.rs +++ b/crates/freeze/src/datasets/erc721_transfers.rs @@ -44,7 +44,11 @@ type Result = ::core::result::Result; impl CollectByBlock for Erc721Transfers { type Response = Vec; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { let topics = [Some(ValueOrArray::Value(Some(*EVENT_ERC721_TRANSFER))), None, None, None]; let filter = Filter { topics, ..request.ethers_log_filter()? }; let logs = source.fetcher.get_logs(&filter).await?; @@ -61,7 +65,11 @@ impl CollectByBlock for Erc721Transfers { impl CollectByTransaction for Erc721Transfers { type Response = Vec; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { let logs = source.fetcher.get_transaction_logs(request.transaction_hash()?).await?; Ok(logs.into_iter().filter(is_erc721_transfer).collect()) } diff --git a/crates/freeze/src/datasets/eth_calls.rs b/crates/freeze/src/datasets/eth_calls.rs index ceed7f26..6064649c 100644 --- a/crates/freeze/src/datasets/eth_calls.rs +++ b/crates/freeze/src/datasets/eth_calls.rs @@ -56,7 +56,11 @@ type EthCallsResponse = (u32, Vec, Vec, Vec); impl CollectByBlock for EthCalls { type Response = EthCallsResponse; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { let transaction = TransactionRequest { to: Some(request.ethers_contract()?.into()), data: Some(request.call_data()?.into()), diff --git a/crates/freeze/src/datasets/logs.rs b/crates/freeze/src/datasets/logs.rs index b23259ee..ec3e869f 100644 --- a/crates/freeze/src/datasets/logs.rs +++ b/crates/freeze/src/datasets/logs.rs @@ -52,7 +52,11 @@ type Result = ::core::result::Result; impl CollectByBlock for Logs { type Response = Vec; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { source.fetcher.get_logs(&request.ethers_log_filter()?).await } @@ -66,7 +70,11 @@ impl CollectByBlock for Logs { impl CollectByTransaction for Logs { type Response = Vec; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { source.fetcher.get_transaction_logs(request.transaction_hash()?).await } diff --git a/crates/freeze/src/datasets/native_transfers.rs b/crates/freeze/src/datasets/native_transfers.rs index 9278d94f..ff72b0ed 100644 --- a/crates/freeze/src/datasets/native_transfers.rs +++ b/crates/freeze/src/datasets/native_transfers.rs @@ -35,7 +35,11 @@ type Result = ::core::result::Result; impl CollectByBlock for NativeTransfers { type Response = Vec; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { source.fetcher.trace_block(request.block_number()?.into()).await } @@ -49,7 +53,11 @@ impl CollectByBlock for NativeTransfers { impl CollectByTransaction for NativeTransfers { type Response = Vec; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { source.fetcher.trace_transaction(request.ethers_transaction_hash()?).await } diff --git a/crates/freeze/src/datasets/nonce_diffs.rs b/crates/freeze/src/datasets/nonce_diffs.rs index 62e70e20..2c6b5874 100644 --- a/crates/freeze/src/datasets/nonce_diffs.rs +++ b/crates/freeze/src/datasets/nonce_diffs.rs @@ -35,7 +35,11 @@ type Result = ::core::result::Result; impl CollectByBlock for NonceDiffs { type Response = BlockTxsTraces; - async fn extract(request: Params, source: Source, schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + schemas: Schemas, + ) -> Result { let schema = schemas.get(&Datatype::NonceDiffs).ok_or(err("schema not provided"))?; let include_txs = schema.has_column("transaction_hash"); source.fetcher.trace_block_state_diffs(request.block_number()? as u32, include_txs).await @@ -50,7 +54,11 @@ impl CollectByBlock for NonceDiffs { impl CollectByTransaction for NonceDiffs { type Response = BlockTxsTraces; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { source.fetcher.trace_transaction_state_diffs(request.transaction_hash()?).await } diff --git a/crates/freeze/src/datasets/nonces.rs b/crates/freeze/src/datasets/nonces.rs index dfe30194..b96e3196 100644 --- a/crates/freeze/src/datasets/nonces.rs +++ b/crates/freeze/src/datasets/nonces.rs @@ -36,7 +36,11 @@ type BlockTxAddressOutput = (u32, Option>, Vec, u64); impl CollectByBlock for Nonces { type Response = BlockTxAddressOutput; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { let address = request.address()?; let block_number = request.block_number()? as u32; let output = source diff --git a/crates/freeze/src/datasets/storage_diffs.rs b/crates/freeze/src/datasets/storage_diffs.rs index 8230ea1f..7ad26f54 100644 --- a/crates/freeze/src/datasets/storage_diffs.rs +++ b/crates/freeze/src/datasets/storage_diffs.rs @@ -36,7 +36,11 @@ type Result = ::core::result::Result; impl CollectByBlock for StorageDiffs { type Response = BlockTxsTraces; - async fn extract(request: Params, source: Source, schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + schemas: Schemas, + ) -> Result { let schema = schemas.get(&Datatype::StorageDiffs).ok_or(err("schema not provided"))?; let include_txs = schema.has_column("transaction_hash"); source.fetcher.trace_block_state_diffs(request.block_number()? as u32, include_txs).await @@ -51,7 +55,11 @@ impl CollectByBlock for StorageDiffs { impl CollectByTransaction for StorageDiffs { type Response = BlockTxsTraces; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { source.fetcher.trace_transaction_state_diffs(request.transaction_hash()?).await } diff --git a/crates/freeze/src/datasets/storages.rs b/crates/freeze/src/datasets/storages.rs index f7399139..0f3aa07f 100644 --- a/crates/freeze/src/datasets/storages.rs +++ b/crates/freeze/src/datasets/storages.rs @@ -37,7 +37,11 @@ type BlockTxAddressOutput = (u32, Option>, Vec, Vec, Vec); impl CollectByBlock for Storages { type Response = BlockTxAddressOutput; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { let address = request.address()?; let block_number = request.block_number()? as u32; let slot = request.slot()?; diff --git a/crates/freeze/src/datasets/trace_calls.rs b/crates/freeze/src/datasets/trace_calls.rs index e67de8a0..91b996ae 100644 --- a/crates/freeze/src/datasets/trace_calls.rs +++ b/crates/freeze/src/datasets/trace_calls.rs @@ -53,7 +53,11 @@ type ContractCallDataTraces = (u32, Vec, Vec, Vec); impl CollectByBlock for TraceCalls { type Response = ContractCallDataTraces; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { let traces: Vec = source .fetcher .trace_call2( diff --git a/crates/freeze/src/datasets/traces.rs b/crates/freeze/src/datasets/traces.rs index b964f9a4..583f62a6 100644 --- a/crates/freeze/src/datasets/traces.rs +++ b/crates/freeze/src/datasets/traces.rs @@ -48,7 +48,11 @@ type Result = ::core::result::Result; impl CollectByBlock for Traces { type Response = Vec; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { source.fetcher.trace_block(request.block_number()?.into()).await } @@ -62,7 +66,11 @@ impl CollectByBlock for Traces { impl CollectByTransaction for Traces { type Response = Vec; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { source.fetcher.trace_transaction(request.ethers_transaction_hash()?).await } diff --git a/crates/freeze/src/datasets/transaction_addresses.rs b/crates/freeze/src/datasets/transaction_addresses.rs index 2e2d4f66..29a23c60 100644 --- a/crates/freeze/src/datasets/transaction_addresses.rs +++ b/crates/freeze/src/datasets/transaction_addresses.rs @@ -39,7 +39,11 @@ type BlockLogsTraces = (Block, Vec, Vec); impl CollectByBlock for TransactionAddresses { type Response = BlockLogsTraces; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { let block_number = request.ethers_block_number()?; let block = source.fetcher.get_block(request.block_number()?).await?; let block = block.ok_or(CollectError::CollectError("block not found".to_string()))?; @@ -66,7 +70,11 @@ impl CollectByBlock for TransactionAddresses { impl CollectByTransaction for TransactionAddresses { type Response = BlockLogsTraces; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { let tx_hash = request.ethers_transaction_hash()?; let tx_data = source.fetcher.get_transaction(tx_hash).await?.ok_or_else(|| { diff --git a/crates/freeze/src/datasets/transactions.rs b/crates/freeze/src/datasets/transactions.rs index 8bcac7b4..2e651353 100644 --- a/crates/freeze/src/datasets/transactions.rs +++ b/crates/freeze/src/datasets/transactions.rs @@ -46,7 +46,11 @@ type Result = ::core::result::Result; impl CollectByBlock for Transactions { type Response = (Block, Option>); - async fn extract(request: Params, source: Source, schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + schemas: Schemas, + ) -> Result { let block = source .fetcher .get_block_with_txs(request.block_number()?) @@ -84,7 +88,11 @@ impl CollectByBlock for Transactions { impl CollectByTransaction for Transactions { type Response = (Transaction, Option); - async fn extract(request: Params, source: Source, schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + schemas: Schemas, + ) -> Result { let tx_hash = request.ethers_transaction_hash()?; let schema = schemas.get(&Datatype::Transactions).ok_or(err("schema not provided"))?; let transaction = source diff --git a/crates/freeze/src/datasets/vm_traces.rs b/crates/freeze/src/datasets/vm_traces.rs index 5d92a54c..72cab3e9 100644 --- a/crates/freeze/src/datasets/vm_traces.rs +++ b/crates/freeze/src/datasets/vm_traces.rs @@ -48,7 +48,11 @@ type Result = ::core::result::Result; impl CollectByBlock for VmTraces { type Response = (Option, Option>, Vec); - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { source.fetcher.trace_block_vm_traces(request.block_number()? as u32).await } @@ -61,7 +65,11 @@ impl CollectByBlock for VmTraces { impl CollectByTransaction for VmTraces { type Response = (Option, Option>, Vec); - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { source.fetcher.trace_transaction_vm_traces(request.transaction_hash()?).await } diff --git a/crates/freeze/src/freeze.rs b/crates/freeze/src/freeze.rs index 57592de5..2c372bbe 100644 --- a/crates/freeze/src/freeze.rs +++ b/crates/freeze/src/freeze.rs @@ -6,6 +6,7 @@ use futures::{stream::FuturesUnordered, StreamExt}; use std::{ collections::{HashMap, HashSet}, path::PathBuf, + sync::Arc, }; use tokio::sync::Semaphore; @@ -14,7 +15,7 @@ type PartitionPayload = ( Partition, MetaDatatype, HashMap, - Source, + Arc, FileOutput, HashMap, ExecutionEnv, @@ -84,6 +85,7 @@ fn get_payloads( let semaphore = source .max_concurrent_chunks .map(|x| std::sync::Arc::new(tokio::sync::Semaphore::new(x as usize))); + let source = Arc::new(source.clone()); let mut payloads = Vec::new(); let mut skipping = Vec::new(); let mut all_paths = HashSet::new(); @@ -128,6 +130,7 @@ async fn freeze_partitions( skipped: Vec, ) -> FreezeSummary { if let Some(bar) = &env.bar { + bar.set_length(payloads.len() as u64); bar.inc(0); } diff --git a/crates/freeze/src/multi_datasets/blocks_and_transactions.rs b/crates/freeze/src/multi_datasets/blocks_and_transactions.rs index 9e29e6c4..49582d87 100644 --- a/crates/freeze/src/multi_datasets/blocks_and_transactions.rs +++ b/crates/freeze/src/multi_datasets/blocks_and_transactions.rs @@ -26,7 +26,11 @@ impl ToDataFrames for BlocksAndTransactions { impl CollectByBlock for BlocksAndTransactions { type Response = ::Response; - async fn extract(request: Params, source: Source, schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + schemas: Schemas, + ) -> Result { ::extract(request, source, schemas).await } @@ -47,7 +51,11 @@ impl CollectByTransaction for BlocksAndTransactions { ::Response, ); - async fn extract(request: Params, source: Source, schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + schemas: Schemas, + ) -> Result { let (tx, gas_used) = ::extract(request, source.clone(), schemas) .await?; diff --git a/crates/freeze/src/multi_datasets/call_trace_derivatives.rs b/crates/freeze/src/multi_datasets/call_trace_derivatives.rs index b1767d44..d8817003 100644 --- a/crates/freeze/src/multi_datasets/call_trace_derivatives.rs +++ b/crates/freeze/src/multi_datasets/call_trace_derivatives.rs @@ -38,7 +38,11 @@ impl ToDataFrames for CallTraceDerivatives { impl CollectByBlock for CallTraceDerivatives { type Response = Vec; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { source.fetcher.trace_block(request.block_number()?.into()).await } @@ -52,7 +56,11 @@ impl CollectByBlock for CallTraceDerivatives { impl CollectByTransaction for CallTraceDerivatives { type Response = Vec; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { source.fetcher.trace_transaction(request.ethers_transaction_hash()?).await } diff --git a/crates/freeze/src/multi_datasets/state_diffs.rs b/crates/freeze/src/multi_datasets/state_diffs.rs index e7c0a666..029db2b6 100644 --- a/crates/freeze/src/multi_datasets/state_diffs.rs +++ b/crates/freeze/src/multi_datasets/state_diffs.rs @@ -34,7 +34,11 @@ impl ToDataFrames for StateDiffs { impl CollectByBlock for StateDiffs { type Response = BlockTxsTraces; - async fn extract(request: Params, source: Source, schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + schemas: Schemas, + ) -> Result { let include_txs = schemas.values().any(|x| x.has_column("transaction_hash")); source.fetcher.trace_block_state_diffs(request.block_number()? as u32, include_txs).await } @@ -48,7 +52,11 @@ impl CollectByBlock for StateDiffs { impl CollectByTransaction for StateDiffs { type Response = BlockTxsTraces; - async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result { + async fn extract( + request: Params, + source: Arc, + _schemas: Schemas, + ) -> Result { source.fetcher.trace_transaction_state_diffs(request.transaction_hash()?).await } diff --git a/crates/freeze/src/types/chunks/chunk_ops.rs b/crates/freeze/src/types/chunks/chunk_ops.rs index 72f4ad1a..7a9e1551 100644 --- a/crates/freeze/src/types/chunks/chunk_ops.rs +++ b/crates/freeze/src/types/chunks/chunk_ops.rs @@ -1,4 +1,5 @@ use crate::{ChunkError, Datatype, FileError, FileOutput}; +use thousands::Separable; /// Trait for common chunk methods pub trait ChunkData: Sized { @@ -107,7 +108,7 @@ pub trait ValueToString { impl ValueToString for u64 { fn to_value_string(&self) -> String { - self.to_string() + self.separate_with_commas() } } diff --git a/crates/freeze/src/types/collection/collect_by_block.rs b/crates/freeze/src/types/collection/collect_by_block.rs index 6421a68f..76af1bfd 100644 --- a/crates/freeze/src/types/collection/collect_by_block.rs +++ b/crates/freeze/src/types/collection/collect_by_block.rs @@ -13,7 +13,7 @@ pub trait CollectByBlock: 'static + Send + Default + ToDataFrames { type Response: Send; /// fetch dataset data by block - async fn extract(_request: Params, _: Source, _: Schemas) -> Result { + async fn extract(_request: Params, _: Arc, _: Schemas) -> Result { Err(CollectError::CollectError("CollectByBlock not implemented".to_string())) } @@ -25,7 +25,7 @@ pub trait CollectByBlock: 'static + Send + Default + ToDataFrames { /// collect data into DataFrame async fn collect_by_block( partition: Partition, - source: Source, + source: Arc, schemas: &HashMap, inner_request_size: Option, ) -> Result> { diff --git a/crates/freeze/src/types/collection/collect_by_transaction.rs b/crates/freeze/src/types/collection/collect_by_transaction.rs index 1b67d1ee..421865c8 100644 --- a/crates/freeze/src/types/collection/collect_by_transaction.rs +++ b/crates/freeze/src/types/collection/collect_by_transaction.rs @@ -13,7 +13,7 @@ pub trait CollectByTransaction: 'static + Send + Default + ToDataFrames { type Response: Send; /// fetch dataset data by transaction - async fn extract(_request: Params, _: Source, _: Schemas) -> Result { + async fn extract(_request: Params, _: Arc, _: Schemas) -> Result { Err(CollectError::CollectError("CollectByTransaction not implemented".to_string())) } @@ -25,7 +25,7 @@ pub trait CollectByTransaction: 'static + Send + Default + ToDataFrames { /// collect data into DataFrame async fn collect_by_transaction( partition: Partition, - source: Source, + source: Arc, schemas: &HashMap, inner_request_size: Option, ) -> Result> { diff --git a/crates/freeze/src/types/collection/collect_generic.rs b/crates/freeze/src/types/collection/collect_generic.rs index 0f8c1ca8..838bb99a 100644 --- a/crates/freeze/src/types/collection/collect_generic.rs +++ b/crates/freeze/src/types/collection/collect_generic.rs @@ -9,7 +9,7 @@ pub async fn collect_partition( time_dimension: TimeDimension, datatype: MetaDatatype, partition: Partition, - source: Source, + source: Arc, schemas: HashMap, ) -> Result, CollectError> { match time_dimension { @@ -24,7 +24,7 @@ pub async fn collect_partition( pub async fn fetch_partition( f_request: F, partition: Partition, - source: Source, + source: Arc, inner_request_size: Option, schemas: HashMap, sender: mpsc::Sender>, @@ -32,7 +32,7 @@ pub async fn fetch_partition( where F: Copy + Send - + for<'a> Fn(Params, Source, HashMap) -> Fut + + for<'a> Fn(Params, Arc, HashMap) -> Fut + std::marker::Sync + 'static, Fut: Future> + Send + 'static, diff --git a/crates/freeze/src/types/datatypes/datatype_macros.rs b/crates/freeze/src/types/datatypes/datatype_macros.rs index bebcf1a3..ed02631c 100644 --- a/crates/freeze/src/types/datatypes/datatype_macros.rs +++ b/crates/freeze/src/types/datatypes/datatype_macros.rs @@ -95,7 +95,7 @@ macro_rules! define_datatypes { pub async fn collect_by_block( datatype: MetaDatatype, partition: Partition, - source: Source, + source: Arc, schemas: HashMap, ) -> Result, CollectError> { let task = match datatype { @@ -130,7 +130,7 @@ macro_rules! define_datatypes { pub async fn collect_by_transaction( datatype: MetaDatatype, partition: Partition, - source: Source, + source: Arc, schemas: HashMap, ) -> Result, CollectError> { let task = match datatype { diff --git a/crates/freeze/src/types/sources.rs b/crates/freeze/src/types/sources.rs index 9fe9f3c8..8ad883fe 100644 --- a/crates/freeze/src/types/sources.rs +++ b/crates/freeze/src/types/sources.rs @@ -22,8 +22,14 @@ pub struct Source { pub chain_id: u64, /// number of blocks per log request pub inner_request_size: u64, + /// Maximum requests collected concurrently + pub max_concurrent_requests: Option, /// Maximum chunks collected concurrently pub max_concurrent_chunks: Option, + /// Maximum requests per second + pub max_requests_per_second: Option, + /// Rpc Url + pub rpc_url: String, } /// Wrapper over `Provider

` that adds concurrency and rate limiting controls diff --git a/crates/freeze/src/types/summaries.rs b/crates/freeze/src/types/summaries.rs index 68ed3d1d..d5dd46e2 100644 --- a/crates/freeze/src/types/summaries.rs +++ b/crates/freeze/src/types/summaries.rs @@ -67,37 +67,68 @@ pub(crate) fn print_cryo_intro( ) { print_header("cryo parameters"); let datatype_strs: Vec<_> = query.schemas.keys().map(|d| d.name()).collect(); - print_bullet("datatypes", datatype_strs.join(", ")); - print_bullet("network", &sink.prefix); + print_bullet("data", ""); + print_bullet_indent("datatypes", datatype_strs.join(", "), 4); // let rpc_url = cli::parse_rpc_url(args); // print_bullet("provider", rpc_url); print_chunks(&query.partitions); - print_bullet( - "chunks to collect", - format!( - "{} / {}", - n_chunks_remaining.separate_with_commas(), - query.partitions.len().separate_with_commas() + + print_bullet("source", ""); + print_bullet_indent("network", &sink.prefix, 4); + print_bullet_indent("rpc url", &source.rpc_url, 4); + match source.max_requests_per_second { + Some(max_requests_per_second) => print_bullet_indent( + "max requests per second", + max_requests_per_second.separate_with_commas(), + 4, ), - ); + None => print_bullet_indent("max requests per second", "unlimited", 4), + }; + match source.max_concurrent_requests { + Some(max_concurrent_requests) => print_bullet_indent( + "max concurrent requests", + max_concurrent_requests.separate_with_commas(), + 4, + ), + None => print_bullet_indent("max concurrent requests", "unlimited", 4), + }; match source.max_concurrent_chunks { - Some(max_concurrent_chunks) => { - print_bullet("max concurrent chunks", max_concurrent_chunks.separate_with_commas()) - } - None => print_bullet("max concurrent chunks:", "[none]"), + Some(max_concurrent_chunks) => print_bullet_indent( + "max concurrent chunks", + max_concurrent_chunks.separate_with_commas(), + 4, + ), + None => print_bullet_indent("max concurrent chunks:", "unlimited", 4), }; + if query.schemas.contains_key(&Datatype::Logs) { - print_bullet("inner request size", source.inner_request_size.to_string()); + print_bullet_indent("inner request size", source.inner_request_size.to_string(), 4); }; - print_bullet("output format", sink.format.as_str()); - print_bullet("output dir", sink.output_dir.to_string_lossy()); + + print_bullet("output", ""); + if let Some(partition) = query.partitions.first() { + let stats = partition.stats(); + if let Some(dim) = query.partitioned_by.first() { + if dim == &Dim::BlockNumber { + if let Some(block_numbers) = stats.block_numbers { + let chunk_size = block_numbers.chunk_size; + print_bullet_indent("chunk size", chunk_size.separate_with_commas(), 4); + } + } + } + } + print_bullet_indent("n chunks", query.partitions.len().separate_with_commas(), 4); + print_bullet_indent("chunks remaining", n_chunks_remaining.to_string(), 4); + print_bullet_indent("output format", sink.format.as_str(), 4); + print_bullet_indent("output dir", sink.output_dir.clone().to_string_lossy(), 4); // print report path let report_path = if env.report && n_chunks_remaining > 0 { match super::reports::get_report_path(env, sink, true) { Ok(report_path) => { - let stripped_path: PathBuf = match report_path.strip_prefix("./") { - Ok(stripped) => PathBuf::from(stripped), + let stripped_path: PathBuf = match report_path.strip_prefix(sink.output_dir.clone()) + { + Ok(stripped) => PathBuf::from("$OUTPUT_DIR").join(PathBuf::from(stripped)), Err(_) => report_path, }; Some(stripped_path) @@ -108,11 +139,10 @@ pub(crate) fn print_cryo_intro( None }; match report_path { - None => print_bullet("report file", "None"), - Some(path) => print_bullet("report file", path.to_str().unwrap_or("none")), + None => print_bullet_indent("report file", "None", 4), + Some(path) => print_bullet_indent("report file", path.to_str().unwrap_or("none"), 4), }; let dt_start: DateTime = env.t_start.into(); - print_bullet("t_start", dt_start.format("%Y-%m-%d %H:%M:%S%.3f").to_string()); // print schemas print_schemas(&query.schemas); @@ -123,28 +153,29 @@ pub(crate) fn print_cryo_intro( println!(); println!(); - print_header("collecting data") + print_header("collecting data"); + println!("started at {}", dt_start.format("%Y-%m-%d %H:%M:%S%.3f")); } fn print_chunks(chunks: &[Partition]) { let stats = crate::types::partitions::meta_chunks_stats(chunks); - for (dim, dim_stats) in [("block", stats.block_numbers)].iter() { + for (dim, dim_stats) in [(Dim::BlockNumber, stats.block_numbers)].iter() { if let Some(dim_stats) = dim_stats { print_chunk(dim, dim_stats) } } for (dim, dim_stats) in vec![ - ("transaction", stats.transactions), - ("call_data", stats.call_datas), - ("address", stats.addresses), - ("contract", stats.contracts), - ("to_address", stats.to_addresses), - ("slot", stats.slots), - ("topic0", stats.topic0s), - ("topic1", stats.topic1s), - ("topic2", stats.topic2s), - ("topic3", stats.topic3s), + (Dim::TransactionHash, stats.transactions), + (Dim::CallData, stats.call_datas), + (Dim::Address, stats.addresses), + (Dim::Contract, stats.contracts), + (Dim::ToAddress, stats.to_addresses), + (Dim::Slot, stats.slots), + (Dim::Topic0, stats.topic0s), + (Dim::Topic1, stats.topic1s), + (Dim::Topic2, stats.topic2s), + (Dim::Topic3, stats.topic3s), ] .iter() { @@ -154,20 +185,31 @@ fn print_chunks(chunks: &[Partition]) { } } -fn print_chunk(dim: &str, dim_stats: &ChunkStats) { +fn print_chunk(dim: &Dim, dim_stats: &ChunkStats) { if dim_stats.total_values == 1 { - print_bullet(dim, dim_stats.min_value_to_string().unwrap_or("none".to_string())); + print_bullet_indent( + format!("{}", dim), + dim_stats.min_value_to_string().unwrap_or("none".to_string()), + 4, + ); } else { - print_bullet(format!("{} values", dim), ""); - if let Some(min_value_string) = dim_stats.min_value_to_string() { - print_bullet_indent("min", min_value_string, 4); - }; - if let Some(max_value_string) = dim_stats.max_value_to_string() { - print_bullet_indent("max", max_value_string, 4); - }; - print_bullet_indent("n_values", dim_stats.total_values.to_string(), 4); - print_bullet_indent("n_chunks", dim_stats.n_chunks.to_string(), 4); - print_bullet_indent("chunk size", dim_stats.chunk_size.to_string(), 4); + match (dim_stats.min_value_to_string(), dim_stats.max_value_to_string()) { + (Some(min), Some(max)) => print_bullet_indent( + dim.plural_name(), + format!( + "n={} min={} max={}", + dim_stats.total_values.separate_with_commas(), + min, + max + ), + 4, + ), + _ => print_bullet_indent( + dim.plural_name(), + format!("n={}", dim_stats.total_values.separate_with_commas()), + 4, + ), + } } } @@ -213,8 +255,23 @@ pub(crate) fn print_cryo_conclusion( query: &Query, env: &ExecutionEnv, ) { + let new_env = match env.t_end { + None => Some(env.clone().set_end_time()), + Some(_) => None, + }; + let env: &ExecutionEnv = match &new_env { + Some(e) => e, + None => env, + }; + let t_end = match env.t_end { + Some(t_end) => t_end, + _ => return, + }; + let dt_data_done: DateTime = t_end.into(); + + println!(" done at {}", dt_data_done.format("%Y-%m-%d %H:%M:%S%.3f").to_string().as_str()); + if freeze_summary.errored.is_empty() { - println!("...done") } else { println!("...done (errors in {} chunks)", freeze_summary.errored.len()) }; @@ -223,6 +280,7 @@ pub(crate) fn print_cryo_conclusion( if !freeze_summary.errored.is_empty() { print_header_error("error summary"); + println!("(errors in {} chunks)", freeze_summary.errored.len()); let mut error_counts: HashMap = HashMap::new(); for (_partition, error) in freeze_summary.errored.iter() { *error_counts.entry(error.to_string()).or_insert(0) += 1; @@ -237,21 +295,6 @@ pub(crate) fn print_cryo_conclusion( println!(); } - let new_env = match env.t_end { - None => Some(env.clone().set_end_time()), - Some(_) => None, - }; - let env: &ExecutionEnv = match &new_env { - Some(e) => e, - None => env, - }; - - let dt_start: DateTime = env.t_start.into(); - let t_end = match env.t_end { - Some(t_end) => t_end, - _ => return, - }; - let dt_data_done: DateTime = t_end.into(); let duration = match t_end.duration_since(env.t_start) { Ok(duration) => duration, Err(_e) => { @@ -266,21 +309,17 @@ pub(crate) fn print_cryo_conclusion( print_header("collection summary"); print_bullet("total duration", duration_string); - print_bullet_indent("t_start", dt_start.format("%Y-%m-%d %H:%M:%S%.3f").to_string(), 4); - print_bullet_indent( - "t_end", - " ".to_string() + dt_data_done.format("%Y-%m-%d %H:%M:%S%.3f").to_string().as_str(), - 4, - ); - let n_chunks = query.partitions.len().separate_with_commas(); - let width = n_chunks.len(); - print_bullet("total chunks", n_chunks.clone()); + let n_chunks = query.partitions.len(); + let n_chunks_str = n_chunks.separate_with_commas(); + let width = n_chunks_str.len(); + print_bullet("total chunks", n_chunks_str.clone()); print_bullet_indent( "chunks errored", format!( - " {:>width$} / {}", + " {:>width$} / {} ({}%)", freeze_summary.errored.len().separate_with_commas(), - n_chunks, + &n_chunks_str, + format_float((100 * freeze_summary.errored.len() / n_chunks) as f64), width = width ), 4, @@ -288,9 +327,10 @@ pub(crate) fn print_cryo_conclusion( print_bullet_indent( "chunks skipped", format!( - " {:>width$} / {}", + " {:>width$} / {} ({}%)", freeze_summary.skipped.len().separate_with_commas(), - n_chunks, + n_chunks_str, + format_float((100 * freeze_summary.skipped.len() / n_chunks) as f64), width = width ), 4, @@ -298,15 +338,16 @@ pub(crate) fn print_cryo_conclusion( print_bullet_indent( "chunks collected", format!( - "{:>width$} / {}", + "{:>width$} / {} ({}%)", freeze_summary.completed.len().separate_with_commas(), - n_chunks, + n_chunks_str, + format_float((100 * freeze_summary.completed.len() / n_chunks) as f64), width = width ), 4, ); - print_chunks_speeds(query.partitions.clone(), &query.partitioned_by, total_time); + print_chunks_speeds(freeze_summary.completed.clone(), &query.partitioned_by, total_time); } macro_rules! print_dim_speed { @@ -344,22 +385,46 @@ fn print_unit_speeds(name: String, n_completed: u64, total_time: f64) { let per_minute = per_second * 60.0; let per_hour = per_minute * 60.0; let per_day = per_hour * 24.0; - print_bullet("total ".to_string() + name.as_str(), n_completed.separate_with_commas()); - print_bullet_indent(name.clone() + " per second", format_float(per_second), 4); - print_bullet_indent(name.clone() + " per minute", format_float(per_minute), 4); + + let per_day_str = format_float(per_day); + let per_hour_str = format_float(per_hour); + let per_minute_str = format_float(per_minute); + let per_second_str = format_float(per_second); + + let width = per_day_str.len(); + + print_bullet(name.clone() + " collected", n_completed.separate_with_commas()); + print_bullet_indent( + name.clone() + " per second", + format!("{:>width$}", per_second_str, width = width - 3), + 4, + ); + print_bullet_indent( + name.clone() + " per minute", + format!("{:>width$}", per_minute_str, width = width - 3), + 4, + ); print_bullet_indent( name.clone() + " per hour", - " ".to_string() + format_float(per_hour).as_str(), + format!("{:>width$}", per_hour_str, width = std::cmp::max(5, width - 1)), 4, ); - print_bullet_indent(name + " per day", " ".to_string() + format_float(per_day).as_str(), 4); + print_bullet_indent(name + " per day", format!("{:>width$}", per_day_str, width = 6), 4); } fn format_float(number: f64) -> String { - round_to_decimal_places(number, 1).separate_with_commas() -} + let decimal_places = 1; + + let int_part = number.trunc() as i64; + let frac_multiplier = 10f64.powi(decimal_places as i32); + let frac_part = (number.fract() * frac_multiplier).round() as usize; + + if frac_part == 0 { + return format!("{}.0", int_part.separate_with_commas()) + } + + let frac_str = + format!("{:0>width$}", frac_part, width = decimal_places).trim_end_matches('0').to_string(); -fn round_to_decimal_places(number: f64, dp: u32) -> f64 { - let multiplier = 10f64.powi(dp as i32); - (number * multiplier).round() / multiplier + format!("{}.{}", int_part.separate_with_commas(), frac_str) } diff --git a/crates/python/src/collect_adapter.rs b/crates/python/src/collect_adapter.rs index 0460f12c..4f6e7ca8 100644 --- a/crates/python/src/collect_adapter.rs +++ b/crates/python/src/collect_adapter.rs @@ -187,7 +187,7 @@ async fn run_collect(args: Args) -> PolarsResult { Ok(opts) => opts, Err(e) => panic!("error parsing opts {:?}", e), }; - match collect(query, source).await { + match collect(query, source.into()).await { Ok(df) => Ok(df), Err(e) => panic!("error collecting {:?}", e), }