Skip to content

Commit

Permalink
more info + readability for outputs
Browse files Browse the repository at this point in the history
  • Loading branch information
sslivkoff committed Oct 13, 2023
1 parent 726ab18 commit a247da8
Show file tree
Hide file tree
Showing 39 changed files with 400 additions and 145 deletions.
3 changes: 3 additions & 0 deletions crates/cli/src/parse/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ pub(crate) async fn parse_source(args: &Args) -> Result<Source, ParseError> {
fetcher: Arc::new(fetcher),
chain_id,
inner_request_size: args.inner_request_size,
max_concurrent_requests: args.requests_per_second.map(|x| x as u64),
max_concurrent_chunks,
max_requests_per_second: args.requests_per_second.map(|x| x as u64),
rpc_url,
};

Ok(output)
Expand Down
3 changes: 2 additions & 1 deletion crates/cli/src/run.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{args, parse};
use cryo_freeze::{CollectError, ExecutionEnv, FreezeSummary};
use std::time::SystemTime;
use std::{sync::Arc, time::SystemTime};

/// run cli
pub async fn run(args: args::Args) -> Result<Option<FreezeSummary>, CollectError> {
Expand All @@ -9,6 +9,7 @@ pub async fn run(args: args::Args) -> Result<Option<FreezeSummary>, CollectError
Ok(opts) => opts,
Err(e) => return Err(e.into()),
};
let source = Arc::new(source);
let env = ExecutionEnv { t_start_parse, ..env };
let env = env.set_start_time();
cryo_freeze::freeze(&query, &source, &sink, &env).await
Expand Down
2 changes: 1 addition & 1 deletion crates/freeze/src/collect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{collect_partition, CollectError, Query, Source};
use polars::prelude::*;

/// collect single dataframe
pub async fn collect(query: Query, source: Source) -> Result<DataFrame, CollectError> {
pub async fn collect(query: Query, source: Arc<Source>) -> Result<DataFrame, CollectError> {
query.is_valid()?;
let datatype = if query.datatypes.len() != 1 {
return Err(CollectError::CollectError(
Expand Down
12 changes: 10 additions & 2 deletions crates/freeze/src/datasets/balance_diffs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ impl Dataset for BalanceDiffs {
impl CollectByBlock for BalanceDiffs {
type Response = BlockTxsTraces;

async fn extract(request: Params, source: Source, schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
schemas: Schemas,
) -> Result<Self::Response> {
let schema = schemas.get(&Datatype::BalanceDiffs).ok_or(err("schema not provided"))?;
let include_txs = schema.has_column("transaction_hash");
source.fetcher.trace_block_state_diffs(request.block_number()? as u32, include_txs).await
Expand All @@ -50,7 +54,11 @@ impl CollectByBlock for BalanceDiffs {
impl CollectByTransaction for BalanceDiffs {
type Response = (Option<u32>, Vec<Option<Vec<u8>>>, Vec<ethers::types::BlockTrace>);

async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
source.fetcher.trace_transaction_state_diffs(request.transaction_hash()?).await
}

Expand Down
6 changes: 5 additions & 1 deletion crates/freeze/src/datasets/balances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ type BlockTxAddressOutput = (u32, Option<Vec<u8>>, Vec<u8>, U256);
impl CollectByBlock for Balances {
type Response = BlockTxAddressOutput;

async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
let address = request.address()?;
let block_number = request.block_number()? as u32;
let balance =
Expand Down
12 changes: 10 additions & 2 deletions crates/freeze/src/datasets/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ type Result<T> = ::core::result::Result<T, CollectError>;
impl CollectByBlock for Blocks {
type Response = Block<TxHash>;

async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
let block = source
.fetcher
.get_block(request.block_number()?)
Expand All @@ -74,7 +78,11 @@ impl CollectByBlock for Blocks {
impl CollectByTransaction for Blocks {
type Response = Block<TxHash>;

async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
let transaction = source
.fetcher
.get_transaction(request.ethers_transaction_hash()?)
Expand Down
12 changes: 10 additions & 2 deletions crates/freeze/src/datasets/code_diffs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ type Result<T> = ::core::result::Result<T, CollectError>;
impl CollectByBlock for CodeDiffs {
type Response = BlockTxsTraces;

async fn extract(request: Params, source: Source, schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
schemas: Schemas,
) -> Result<Self::Response> {
let schema = schemas.get(&Datatype::CodeDiffs).ok_or(err("schema not provided"))?;
let include_txs = schema.has_column("transaction_hash");
source.fetcher.trace_block_state_diffs(request.block_number()? as u32, include_txs).await
Expand All @@ -50,7 +54,11 @@ impl CollectByBlock for CodeDiffs {
impl CollectByTransaction for CodeDiffs {
type Response = (Option<u32>, Vec<Option<Vec<u8>>>, Vec<ethers::types::BlockTrace>);

async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
source.fetcher.trace_transaction_state_diffs(request.transaction_hash()?).await
}

Expand Down
6 changes: 5 additions & 1 deletion crates/freeze/src/datasets/codes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ type BlockTxAddressOutput = (u32, Option<Vec<u8>>, Vec<u8>, Vec<u8>);
impl CollectByBlock for Codes {
type Response = BlockTxAddressOutput;

async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
let address = request.address()?;
let block_number = request.block_number()? as u32;
let output =
Expand Down
12 changes: 10 additions & 2 deletions crates/freeze/src/datasets/contracts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ type Result<T> = ::core::result::Result<T, CollectError>;
impl CollectByBlock for Contracts {
type Response = Vec<Trace>;

async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
source.fetcher.trace_block(request.ethers_block_number()?).await
}

Expand All @@ -54,7 +58,11 @@ impl CollectByBlock for Contracts {
impl CollectByTransaction for Contracts {
type Response = Vec<Trace>;

async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
source.fetcher.trace_transaction(request.ethers_transaction_hash()?).await
}

Expand Down
6 changes: 5 additions & 1 deletion crates/freeze/src/datasets/erc20_balances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ type BlockErc20AddressBalance = (u32, Vec<u8>, Vec<u8>, Option<U256>);
impl CollectByBlock for Erc20Balances {
type Response = BlockErc20AddressBalance;

async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
let signature = FUNCTION_ERC20_BALANCE_OF.clone();
let mut call_data = signature.clone();
call_data.extend(vec![0; 12]);
Expand Down
6 changes: 5 additions & 1 deletion crates/freeze/src/datasets/erc20_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ pub(crate) fn remove_control_characters(s: &str) -> String {
impl CollectByBlock for Erc20Metadata {
type Response = BlockAddressNameSymbolDecimals;

async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
let block_number = request.ethers_block_number()?;
let address = request.ethers_address()?;

Expand Down
6 changes: 5 additions & 1 deletion crates/freeze/src/datasets/erc20_supplies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ type BlockErc20Supply = (u32, Vec<u8>, Option<U256>);
impl CollectByBlock for Erc20Supplies {
type Response = BlockErc20Supply;

async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
let signature: Vec<u8> = FUNCTION_ERC20_TOTAL_SUPPLY.clone();
let mut call_data = signature.clone();
call_data.extend(request.contract()?);
Expand Down
12 changes: 10 additions & 2 deletions crates/freeze/src/datasets/erc20_transfers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ type Result<T> = ::core::result::Result<T, CollectError>;
impl CollectByBlock for Erc20Transfers {
type Response = Vec<Log>;

async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
let topics = [Some(ValueOrArray::Value(Some(*EVENT_ERC20_TRANSFER))), None, None, None];
let filter = Filter { topics, ..request.ethers_log_filter()? };
let logs = source.fetcher.get_logs(&filter).await?;
Expand All @@ -61,7 +65,11 @@ impl CollectByBlock for Erc20Transfers {
impl CollectByTransaction for Erc20Transfers {
type Response = Vec<Log>;

async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
let logs = source.fetcher.get_transaction_logs(request.transaction_hash()?).await?;
Ok(logs.into_iter().filter(is_erc20_transfer).collect())
}
Expand Down
6 changes: 5 additions & 1 deletion crates/freeze/src/datasets/erc721_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ type BlockAddressNameSymbol = (u32, Vec<u8>, Option<String>, Option<String>);
impl CollectByBlock for Erc721Metadata {
type Response = BlockAddressNameSymbol;

async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
let block_number = request.ethers_block_number()?;
let address = request.ethers_contract()?;

Expand Down
12 changes: 10 additions & 2 deletions crates/freeze/src/datasets/erc721_transfers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ type Result<T> = ::core::result::Result<T, CollectError>;
impl CollectByBlock for Erc721Transfers {
type Response = Vec<Log>;

async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
let topics = [Some(ValueOrArray::Value(Some(*EVENT_ERC721_TRANSFER))), None, None, None];
let filter = Filter { topics, ..request.ethers_log_filter()? };
let logs = source.fetcher.get_logs(&filter).await?;
Expand All @@ -61,7 +65,11 @@ impl CollectByBlock for Erc721Transfers {
impl CollectByTransaction for Erc721Transfers {
type Response = Vec<Log>;

async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
let logs = source.fetcher.get_transaction_logs(request.transaction_hash()?).await?;
Ok(logs.into_iter().filter(is_erc721_transfer).collect())
}
Expand Down
6 changes: 5 additions & 1 deletion crates/freeze/src/datasets/eth_calls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ type EthCallsResponse = (u32, Vec<u8>, Vec<u8>, Vec<u8>);
impl CollectByBlock for EthCalls {
type Response = EthCallsResponse;

async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
let transaction = TransactionRequest {
to: Some(request.ethers_contract()?.into()),
data: Some(request.call_data()?.into()),
Expand Down
12 changes: 10 additions & 2 deletions crates/freeze/src/datasets/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ type Result<T> = ::core::result::Result<T, CollectError>;
impl CollectByBlock for Logs {
type Response = Vec<Log>;

async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
source.fetcher.get_logs(&request.ethers_log_filter()?).await
}

Expand All @@ -66,7 +70,11 @@ impl CollectByBlock for Logs {
impl CollectByTransaction for Logs {
type Response = Vec<Log>;

async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
source.fetcher.get_transaction_logs(request.transaction_hash()?).await
}

Expand Down
12 changes: 10 additions & 2 deletions crates/freeze/src/datasets/native_transfers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ type Result<T> = ::core::result::Result<T, CollectError>;
impl CollectByBlock for NativeTransfers {
type Response = Vec<Trace>;

async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
source.fetcher.trace_block(request.block_number()?.into()).await
}

Expand All @@ -49,7 +53,11 @@ impl CollectByBlock for NativeTransfers {
impl CollectByTransaction for NativeTransfers {
type Response = Vec<Trace>;

async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
source.fetcher.trace_transaction(request.ethers_transaction_hash()?).await
}

Expand Down
12 changes: 10 additions & 2 deletions crates/freeze/src/datasets/nonce_diffs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ type Result<T> = ::core::result::Result<T, CollectError>;
impl CollectByBlock for NonceDiffs {
type Response = BlockTxsTraces;

async fn extract(request: Params, source: Source, schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
schemas: Schemas,
) -> Result<Self::Response> {
let schema = schemas.get(&Datatype::NonceDiffs).ok_or(err("schema not provided"))?;
let include_txs = schema.has_column("transaction_hash");
source.fetcher.trace_block_state_diffs(request.block_number()? as u32, include_txs).await
Expand All @@ -50,7 +54,11 @@ impl CollectByBlock for NonceDiffs {
impl CollectByTransaction for NonceDiffs {
type Response = BlockTxsTraces;

async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
source.fetcher.trace_transaction_state_diffs(request.transaction_hash()?).await
}

Expand Down
6 changes: 5 additions & 1 deletion crates/freeze/src/datasets/nonces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ type BlockTxAddressOutput = (u32, Option<Vec<u8>>, Vec<u8>, u64);
impl CollectByBlock for Nonces {
type Response = BlockTxAddressOutput;

async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
let address = request.address()?;
let block_number = request.block_number()? as u32;
let output = source
Expand Down
12 changes: 10 additions & 2 deletions crates/freeze/src/datasets/storage_diffs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ type Result<T> = ::core::result::Result<T, CollectError>;
impl CollectByBlock for StorageDiffs {
type Response = BlockTxsTraces;

async fn extract(request: Params, source: Source, schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
schemas: Schemas,
) -> Result<Self::Response> {
let schema = schemas.get(&Datatype::StorageDiffs).ok_or(err("schema not provided"))?;
let include_txs = schema.has_column("transaction_hash");
source.fetcher.trace_block_state_diffs(request.block_number()? as u32, include_txs).await
Expand All @@ -51,7 +55,11 @@ impl CollectByBlock for StorageDiffs {
impl CollectByTransaction for StorageDiffs {
type Response = BlockTxsTraces;

async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result<Self::Response> {
async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
source.fetcher.trace_transaction_state_diffs(request.transaction_hash()?).await
}

Expand Down
Loading

0 comments on commit a247da8

Please sign in to comment.