Skip to content

Commit

Permalink
Merge pull request #73 from paradigmxyz/revamp_multidatatypes
Browse files Browse the repository at this point in the history
revamp multidatatypes
  • Loading branch information
sslivkoff authored Oct 12, 2023
2 parents 2d7ff17 + 017b439 commit d1e3b00
Show file tree
Hide file tree
Showing 16 changed files with 208 additions and 81 deletions.
19 changes: 8 additions & 11 deletions crates/cli/src/parse/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,15 +222,12 @@ async fn parse_block_token<P: JsonRpcClient>(
}
};

let end_block = if second_ref != &"latest" &&
second_ref != &"" &&
!second_ref.starts_with('+') &&
!first_ref.starts_with('-')
{
end_block - 1
} else {
end_block
};
let end_block =
if second_ref != &"latest" && second_ref != &"" && !first_ref.starts_with('-') {
end_block - 1
} else {
end_block
};

let start_block =
if first_ref.starts_with('-') { start_block + 1 } else { start_block };
Expand Down Expand Up @@ -476,7 +473,7 @@ mod tests {
(BlockTokenTest::WithoutMock((r"1:2", BlockChunk::Range(1, 1))), true), /* Single block range */
(BlockTokenTest::WithoutMock((r"0:2", BlockChunk::Range(0, 1))), true), /* Implicit start */
(BlockTokenTest::WithoutMock((r"-10:100", BlockChunk::Range(91, 100))), true), /* Relative negative */
(BlockTokenTest::WithoutMock((r"10:+100", BlockChunk::Range(10, 110))), true), /* Relative positive */
(BlockTokenTest::WithoutMock((r"10:+100", BlockChunk::Range(10, 109))), true), /* Relative positive */
(BlockTokenTest::WithMock((r"1:latest", BlockChunk::Range(1, 12), 12)), true), /* Explicit latest */
(BlockTokenTest::WithMock((r"1:", BlockChunk::Range(1, 12), 12)), true), /* Implicit latest */
// Number type
Expand Down Expand Up @@ -517,7 +514,7 @@ mod tests {
BlockInputTest::WithoutMock((
&block_inputs_multiple_complex,
vec![
BlockChunk::Numbers(vec![15000000, 15000001]),
BlockChunk::Numbers(vec![15000000]),
BlockChunk::Numbers(vec![1000, 1001]),
BlockChunk::Numbers(vec![999999998, 999999999, 1000000000]),
BlockChunk::Numbers(vec![2000]),
Expand Down
16 changes: 10 additions & 6 deletions crates/freeze/src/datasets/contracts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ impl CollectByBlock for Contracts {
}

fn transform(response: Self::Response, columns: &mut Self, schemas: &Schemas) -> Result<()> {
let schema = schemas.get(&Datatype::Contracts).ok_or(err("schema not provided"))?;
process_contracts(response, columns, schema)
let traces = traces::filter_failed_traces(response);
process_contracts(&traces, columns, schemas)
}
}

Expand All @@ -59,14 +59,18 @@ impl CollectByTransaction for Contracts {
}

fn transform(response: Self::Response, columns: &mut Self, schemas: &Schemas) -> Result<()> {
let schema = schemas.get(&Datatype::Contracts).ok_or(err("schema not provided"))?;
process_contracts(response, columns, schema)
let traces = traces::filter_failed_traces(response);
process_contracts(&traces, columns, schemas)
}
}

/// process block into columns
fn process_contracts(traces: Vec<Trace>, columns: &mut Contracts, schema: &Table) -> Result<()> {
let traces = traces::filter_failed_traces(traces);
pub(crate) fn process_contracts(
traces: &[Trace],
columns: &mut Contracts,
schemas: &Schemas,
) -> Result<()> {
let schema = schemas.get(&Datatype::Contracts).ok_or(err("schema not provided"))?;
let mut deployer = H160([0; 20]);
let mut create_index = 0;
for trace in traces.iter() {
Expand Down
76 changes: 48 additions & 28 deletions crates/freeze/src/datasets/mod.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,55 @@
mod balance_diffs;
mod balances;
mod blocks;
mod blocks_and_transactions;
mod code_diffs;
mod codes;
mod contracts;
mod erc20_balances;
mod erc20_metadata;
mod erc20_supplies;
mod erc20_transfers;
mod erc721_metadata;
mod erc721_transfers;
mod eth_calls;
mod logs;
mod native_transfers;
mod nonce_diffs;
mod nonces;
mod state_diffs;
mod storage_diffs;
mod storages;
mod trace_calls;
mod traces;
mod transaction_addresses;
mod transactions;
mod vm_traces;
/// balance diffs
pub mod balance_diffs;
/// balances
pub mod balances;
/// blocks
pub mod blocks;
/// code diffs
pub mod code_diffs;
/// codes
pub mod codes;
/// contracts
pub mod contracts;
/// erc20 balances
pub mod erc20_balances;
/// erc20 metadata
pub mod erc20_metadata;
/// erc20 supplies
pub mod erc20_supplies;
/// erc20 transfers
pub mod erc20_transfers;
/// erc721 metadata
pub mod erc721_metadata;
/// erc721 transfers
pub mod erc721_transfers;
/// eth calls
pub mod eth_calls;
/// logs
pub mod logs;
/// native transfers
pub mod native_transfers;
/// nonce diffs
pub mod nonce_diffs;
/// nonces
pub mod nonces;
/// storage diffs
pub mod storage_diffs;
/// storages
pub mod storages;
/// trace calls
pub mod trace_calls;
/// traces
pub mod traces;
/// transaction addresses
pub mod transaction_addresses;
/// transactions
pub mod transactions;
/// vm traces
pub mod vm_traces;

pub use balance_diffs::*;
pub use balances::*;
pub use blocks::*;
pub use blocks_and_transactions::*;
pub use code_diffs::*;
pub use codes::*;
pub use contracts::*;
Expand All @@ -43,7 +64,6 @@ pub use logs::*;
pub use native_transfers::*;
pub use nonce_diffs::*;
pub use nonces::*;
pub use state_diffs::*;
pub use storage_diffs::*;
pub use storages::*;
pub use trace_calls::*;
Expand Down
15 changes: 8 additions & 7 deletions crates/freeze/src/datasets/native_transfers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ impl CollectByBlock for NativeTransfers {
}

fn transform(response: Self::Response, columns: &mut Self, schemas: &Schemas) -> Result<()> {
let schema = schemas.get(&Datatype::NativeTransfers).ok_or(err("schema not provided"))?;
process_native_transfers(response, columns, schema)
let traces = traces::filter_failed_traces(response);
process_native_transfers(&traces, columns, schemas)
}
}

Expand All @@ -54,17 +54,18 @@ impl CollectByTransaction for NativeTransfers {
}

fn transform(response: Self::Response, columns: &mut Self, schemas: &Schemas) -> Result<()> {
let schema = schemas.get(&Datatype::NativeTransfers).ok_or(err("schema not provided"))?;
process_native_transfers(response, columns, schema)
let traces = traces::filter_failed_traces(response);
process_native_transfers(&traces, columns, schemas)
}
}

/// process block into columns
fn process_native_transfers(
traces: Vec<Trace>,
pub(crate) fn process_native_transfers(
traces: &[Trace],
columns: &mut NativeTransfers,
schema: &Table,
schemas: &Schemas,
) -> Result<()> {
let schema = schemas.get(&Datatype::NativeTransfers).ok_or(err("schema not provided"))?;
for (transfer_index, trace) in traces.iter().enumerate() {
columns.n_rows += 1;
store!(schema, columns, block_number, trace.block_number as u32);
Expand Down
15 changes: 10 additions & 5 deletions crates/freeze/src/datasets/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ impl CollectByBlock for Traces {
}

fn transform(response: Self::Response, columns: &mut Self, schemas: &Schemas) -> Result<()> {
let schema = schemas.get(&Datatype::Traces).ok_or(err("schema not provided"))?;
process_traces(response, columns, schema)
let traces = traces::filter_failed_traces(response);
process_traces(&traces, columns, schemas)
}
}

Expand All @@ -67,12 +67,17 @@ impl CollectByTransaction for Traces {
}

fn transform(response: Self::Response, columns: &mut Self, schemas: &Schemas) -> Result<()> {
let schema = schemas.get(&Datatype::Traces).ok_or(err("schema not provided"))?;
process_traces(response, columns, schema)
let traces = traces::filter_failed_traces(response);
process_traces(&traces, columns, schemas)
}
}
/// process block into columns
fn process_traces(traces: Vec<Trace>, columns: &mut Traces, schema: &Table) -> Result<()> {
pub(crate) fn process_traces(
traces: &[Trace],
columns: &mut Traces,
schemas: &Schemas,
) -> Result<()> {
let schema = schemas.get(&Datatype::Traces).ok_or(err("schema not provided"))?;
for trace in traces.iter() {
columns.n_rows += 1;
process_action(&trace.action, columns, schema);
Expand Down
2 changes: 1 addition & 1 deletion crates/freeze/src/freeze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ fn get_payloads(
let mut all_paths = HashSet::new();
for datatype in query.datatypes.clone().into_iter() {
for partition in query.partitions.clone().into_iter() {
let paths = sink.get_paths(query, &partition)?;
let paths = sink.get_paths(query, &partition, Some(vec![datatype.clone()]))?;
if !sink.overwrite && paths.values().all(|path| path.exists()) {
skipping.push(partition);
continue
Expand Down
2 changes: 2 additions & 0 deletions crates/freeze/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
mod collect;
mod datasets;
mod freeze;
mod multi_datasets;
mod types;

pub use collect::collect;
pub use datasets::*;
pub use freeze::freeze;
pub use multi_datasets::*;
pub use types::*;
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl CollectByBlock for BlocksAndTransactions {
let BlocksAndTransactions(blocks, transactions) = columns;
let (block, _) = response.clone();
let schema = schemas.get(&Datatype::Blocks).ok_or(err("schema not provided"))?;
super::blocks::process_block(block, blocks, schema)?;
blocks::process_block(block, blocks, schema)?;
<Transactions as CollectByBlock>::transform(response, transactions, schemas)?;
Ok(())
}
Expand Down Expand Up @@ -64,7 +64,7 @@ impl CollectByTransaction for BlocksAndTransactions {
let BlocksAndTransactions(blocks, transactions) = columns;
let (block, (tx, gas_used)) = response;
let schema = schemas.get(&Datatype::Blocks).ok_or(err("schema not provided"))?;
super::blocks::process_block(block, blocks, schema)?;
blocks::process_block(block, blocks, schema)?;
let schema = schemas.get(&Datatype::Transactions).ok_or(err("schema not provided"))?;
transactions::process_transaction(tx, gas_used, transactions, schema);
Ok(())
Expand Down
81 changes: 81 additions & 0 deletions crates/freeze/src/multi_datasets/call_trace_derivatives.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use crate::*;
use ethers::prelude::*;
use polars::prelude::*;
use std::collections::HashMap;

/// CallTraceDerivatives
#[derive(Default)]
pub struct CallTraceDerivatives(
contracts::Contracts,
native_transfers::NativeTransfers,
traces::Traces,
);

type Result<T> = ::core::result::Result<T, CollectError>;

impl ToDataFrames for CallTraceDerivatives {
fn create_dfs(
self,
schemas: &HashMap<Datatype, Table>,
chain_id: u64,
) -> Result<HashMap<Datatype, DataFrame>> {
let CallTraceDerivatives(contracts, native_transfers, traces) = self;
let mut output = HashMap::new();
if schemas.contains_key(&Datatype::Contracts) {
output.extend(contracts.create_dfs(schemas, chain_id)?);
}
if schemas.contains_key(&Datatype::NativeTransfers) {
output.extend(native_transfers.create_dfs(schemas, chain_id)?);
}
if schemas.contains_key(&Datatype::Traces) {
output.extend(traces.create_dfs(schemas, chain_id)?);
}
Ok(output)
}
}

#[async_trait::async_trait]
impl CollectByBlock for CallTraceDerivatives {
type Response = Vec<Trace>;

async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result<Self::Response> {
source.fetcher.trace_block(request.block_number()?.into()).await
}

fn transform(response: Self::Response, columns: &mut Self, schemas: &Schemas) -> Result<()> {
let traces = traces::filter_failed_traces(response);
process_call_trace_derivatives(traces, columns, schemas)
}
}

#[async_trait::async_trait]
impl CollectByTransaction for CallTraceDerivatives {
type Response = Vec<Trace>;

async fn extract(request: Params, source: Source, _schemas: Schemas) -> Result<Self::Response> {
source.fetcher.trace_transaction(request.ethers_transaction_hash()?).await
}

fn transform(response: Self::Response, columns: &mut Self, schemas: &Schemas) -> Result<()> {
let traces = traces::filter_failed_traces(response);
process_call_trace_derivatives(traces, columns, schemas)
}
}

fn process_call_trace_derivatives(
response: Vec<Trace>,
columns: &mut CallTraceDerivatives,
schemas: &HashMap<Datatype, Table>,
) -> Result<()> {
let CallTraceDerivatives(contracts, native_transfers, traces) = columns;
if schemas.contains_key(&Datatype::Contracts) {
contracts::process_contracts(&response, contracts, schemas)?;
}
if schemas.contains_key(&Datatype::NativeTransfers) {
native_transfers::process_native_transfers(&response, native_transfers, schemas)?;
}
if schemas.contains_key(&Datatype::Traces) {
traces::process_traces(&response, traces, schemas)?;
}
Ok(())
}
7 changes: 7 additions & 0 deletions crates/freeze/src/multi_datasets/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
mod blocks_and_transactions;
mod call_trace_derivatives;
mod state_diffs;

pub use blocks_and_transactions::*;
pub use call_trace_derivatives::*;
pub use state_diffs::*;
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use super::{balance_diffs, code_diffs, nonce_diffs, storage_diffs};
use crate::*;
use polars::prelude::*;
use std::collections::HashMap;
Expand Down
6 changes: 6 additions & 0 deletions crates/freeze/src/types/datatypes/datatype_macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ macro_rules! define_datatypes {
MultiDatatype::BlocksAndTransactions => {
BlocksAndTransactions::collect_by_block(partition, source, &schemas, None)
}
MultiDatatype::CallTraceDerivatives => {
CallTraceDerivatives::collect_by_block(partition, source, &schemas, None)
}
MultiDatatype::StateDiffs => {
StateDiffs::collect_by_block(partition, source, &schemas, None)
},
Expand Down Expand Up @@ -149,6 +152,9 @@ macro_rules! define_datatypes {
MultiDatatype::BlocksAndTransactions => {
BlocksAndTransactions::collect_by_transaction(partition, source, &schemas, inner_request_size)
}
MultiDatatype::CallTraceDerivatives => {
CallTraceDerivatives::collect_by_transaction(partition, source, &schemas, None)
}
MultiDatatype::StateDiffs => {
StateDiffs::collect_by_transaction(partition, source, &schemas, inner_request_size)
}
Expand Down
Loading

0 comments on commit d1e3b00

Please sign in to comment.