Skip to content

Commit

Permalink
commit using Arc<Query>
Browse files Browse the repository at this point in the history
  • Loading branch information
sslivkoff committed Oct 21, 2023
1 parent a4747a4 commit dba7695
Show file tree
Hide file tree
Showing 44 changed files with 330 additions and 652 deletions.
5 changes: 2 additions & 3 deletions crates/freeze/src/collect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{collect_partition, CollectError, Query, Source};
use polars::prelude::*;

/// collect single dataframe
pub async fn collect(query: Query, source: Arc<Source>) -> Result<DataFrame, CollectError> {
pub async fn collect(query: Arc<Query>, source: Arc<Source>) -> Result<DataFrame, CollectError> {
query.is_valid()?;
let datatype = if query.datatypes.len() != 1 {
return Err(CollectError::CollectError(
Expand All @@ -18,8 +18,7 @@ pub async fn collect(query: Query, source: Arc<Source>) -> Result<DataFrame, Col
} else {
query.partitions[0].clone()
};
let results =
collect_partition(query.time_dimension, datatype, partition, source, query.schemas).await?;
let results = collect_partition(datatype, partition, query, source).await?;
if results.len() > 1 {
Err(CollectError::CollectError("collect() only returns single dataframes".to_string()))
} else {
Expand Down
26 changes: 7 additions & 19 deletions crates/freeze/src/datasets/address_appearances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,13 @@ impl Dataset for AddressAppearances {
}
}

type Result<T> = ::core::result::Result<T, CollectError>;

type BlockLogsTraces = (Block<TxHash>, Vec<Log>, Vec<Trace>);

#[async_trait::async_trait]
impl CollectByBlock for AddressAppearances {
type Response = BlockLogsTraces;

async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
async fn extract(request: Params, source: Arc<Source>, _: Arc<Query>) -> R<Self::Response> {
let block_number = request.ethers_block_number()?;
let block = source.fetcher.get_block(request.block_number()?).await?;
let block = block.ok_or(CollectError::CollectError("block not found".to_string()))?;
Expand All @@ -55,9 +49,8 @@ impl CollectByBlock for AddressAppearances {
Ok((block, logs, traces))
}

fn transform(response: Self::Response, columns: &mut Self, schemas: &Schemas) -> Result<()> {
let schema =
schemas.get(&Datatype::AddressAppearances).ok_or(err("schema not provided"))?;
fn transform(response: Self::Response, columns: &mut Self, query: &Arc<Query>) -> R<()> {
let schema = query.schemas.get_schema(&Datatype::AddressAppearances)?;
process_appearances(response, columns, schema)
}
}
Expand All @@ -66,11 +59,7 @@ impl CollectByBlock for AddressAppearances {
impl CollectByTransaction for AddressAppearances {
type Response = BlockLogsTraces;

async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
async fn extract(request: Params, source: Arc<Source>, _: Arc<Query>) -> R<Self::Response> {
let tx_hash = request.ethers_transaction_hash()?;

let tx_data = source.fetcher.get_transaction(tx_hash).await?.ok_or_else(|| {
Expand Down Expand Up @@ -101,9 +90,8 @@ impl CollectByTransaction for AddressAppearances {
Ok((block, logs, traces))
}

fn transform(response: Self::Response, columns: &mut Self, schemas: &Schemas) -> Result<()> {
let schema =
schemas.get(&Datatype::AddressAppearances).ok_or(err("schema not provided"))?;
fn transform(response: Self::Response, columns: &mut Self, query: &Arc<Query>) -> R<()> {
let schema = query.schemas.get_schema(&Datatype::AddressAppearances)?;
process_appearances(response, columns, schema)
}
}
Expand Down Expand Up @@ -220,7 +208,7 @@ fn process_appearances(
traces: BlockLogsTraces,
columns: &mut AddressAppearances,
schema: &Table,
) -> Result<()> {
) -> R<()> {
let (block, logs, traces) = traces;
let mut logs_by_tx: HashMap<H256, Vec<Log>> = HashMap::new();
for log in logs.into_iter() {
Expand Down
33 changes: 12 additions & 21 deletions crates/freeze/src/datasets/balance_diffs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::*;
use ethers::prelude::*;
use polars::prelude::*;
use std::collections::HashMap;

/// columns for transactions
#[cryo_to_df::to_df(Datatype::BalanceDiffs)]
Expand All @@ -17,57 +16,49 @@ pub struct BalanceDiffs {
pub(crate) chain_id: Vec<u64>,
}

type BlockTxsTraces = (Option<u32>, Vec<Option<Vec<u8>>>, Vec<ethers::types::BlockTrace>);
type Result<T> = ::core::result::Result<T, CollectError>;

#[async_trait::async_trait]
impl Dataset for BalanceDiffs {
fn default_sort() -> Vec<String> {
vec!["block_number".to_string(), "transaction_index".to_string()]
}
}

type BlockTxsTraces = (Option<u32>, Vec<Option<Vec<u8>>>, Vec<ethers::types::BlockTrace>);

#[async_trait::async_trait]
impl CollectByBlock for BalanceDiffs {
type Response = BlockTxsTraces;

async fn extract(
request: Params,
source: Arc<Source>,
schemas: Schemas,
) -> Result<Self::Response> {
let schema = schemas.get(&Datatype::BalanceDiffs).ok_or(err("schema not provided"))?;
async fn extract(request: Params, source: Arc<Source>, query: Arc<Query>) -> R<Self::Response> {
let schema =
query.schemas.get(&Datatype::BalanceDiffs).ok_or(err("schema not provided"))?;
let include_txs = schema.has_column("transaction_hash");
source.fetcher.trace_block_state_diffs(request.block_number()? as u32, include_txs).await
}

fn transform(response: Self::Response, columns: &mut Self, schemas: &Schemas) -> Result<()> {
process_balance_diffs(&response, columns, schemas)
fn transform(response: Self::Response, columns: &mut Self, query: &Arc<Query>) -> R<()> {
process_balance_diffs(&response, columns, &query.schemas)
}
}

#[async_trait::async_trait]
impl CollectByTransaction for BalanceDiffs {
type Response = (Option<u32>, Vec<Option<Vec<u8>>>, Vec<ethers::types::BlockTrace>);
type Response = BlockTxsTraces;

async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
async fn extract(request: Params, source: Arc<Source>, _: Arc<Query>) -> R<Self::Response> {
source.fetcher.trace_transaction_state_diffs(request.transaction_hash()?).await
}

fn transform(response: Self::Response, columns: &mut Self, schemas: &Schemas) -> Result<()> {
process_balance_diffs(&response, columns, schemas)
fn transform(response: Self::Response, columns: &mut Self, query: &Arc<Query>) -> R<()> {
process_balance_diffs(&response, columns, &query.schemas)
}
}

pub(crate) fn process_balance_diffs(
response: &BlockTxsTraces,
columns: &mut BalanceDiffs,
schemas: &Schemas,
) -> Result<()> {
) -> R<()> {
let schema = schemas.get(&Datatype::BalanceDiffs).ok_or(err("schema not provided"))?;
let (block_number, txs, traces) = response;
for (index, (trace, tx)) in traces.iter().zip(txs).enumerate() {
Expand Down
18 changes: 4 additions & 14 deletions crates/freeze/src/datasets/balances.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::*;
use ethers::prelude::*;
use polars::prelude::*;
use std::collections::HashMap;

/// columns for balances
#[cryo_to_df::to_df(Datatype::Balances)]
Expand Down Expand Up @@ -29,27 +28,22 @@ impl Dataset for Balances {
}
}

type Result<T> = ::core::result::Result<T, CollectError>;
type BlockTxAddressOutput = (u32, Option<Vec<u8>>, Vec<u8>, U256);

#[async_trait::async_trait]
impl CollectByBlock for Balances {
type Response = BlockTxAddressOutput;

async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
async fn extract(request: Params, source: Arc<Source>, _: Arc<Query>) -> R<Self::Response> {
let address = request.address()?;
let block_number = request.block_number()? as u32;
let balance =
source.fetcher.get_balance(H160::from_slice(&address), block_number.into()).await?;
Ok((block_number, None, address, balance))
}

fn transform(response: Self::Response, columns: &mut Self, schemas: &Schemas) -> Result<()> {
let schema = schemas.get(&Datatype::Balances).ok_or(err("schema not provided"))?;
fn transform(response: Self::Response, columns: &mut Self, query: &Arc<Query>) -> R<()> {
let schema = query.schemas.get(&Datatype::Balances).ok_or(err("schema not provided"))?;
process_balance(columns, response, schema)
}
}
Expand All @@ -59,11 +53,7 @@ impl CollectByTransaction for Balances {
type Response = ();
}

fn process_balance(
columns: &mut Balances,
data: BlockTxAddressOutput,
schema: &Table,
) -> Result<()> {
fn process_balance(columns: &mut Balances, data: BlockTxAddressOutput, schema: &Table) -> R<()> {
let (block, _tx, address, balance) = data;
columns.n_rows += 1;
store!(schema, columns, block_number, block);
Expand Down
29 changes: 7 additions & 22 deletions crates/freeze/src/datasets/blocks.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::*;
use ethers::prelude::*;
use polars::prelude::*;
use std::collections::HashMap;

/// columns for transactions
#[cryo_to_df::to_df(Datatype::Blocks)]
Expand Down Expand Up @@ -45,17 +44,11 @@ impl Dataset for Blocks {
}
}

type Result<T> = ::core::result::Result<T, CollectError>;

#[async_trait::async_trait]
impl CollectByBlock for Blocks {
type Response = Block<TxHash>;

async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
async fn extract(request: Params, source: Arc<Source>, _: Arc<Query>) -> R<Self::Response> {
let block = source
.fetcher
.get_block(request.block_number()?)
Expand All @@ -64,8 +57,8 @@ impl CollectByBlock for Blocks {
Ok(block)
}

fn transform(response: Self::Response, columns: &mut Self, schemas: &Schemas) -> Result<()> {
let schema = schemas.get(&Datatype::Blocks).ok_or(err("schema not provided"))?;
fn transform(response: Self::Response, columns: &mut Self, query: &Arc<Query>) -> R<()> {
let schema = query.schemas.get_schema(&Datatype::Blocks)?;
process_block(response, columns, schema)
}
}
Expand All @@ -74,11 +67,7 @@ impl CollectByBlock for Blocks {
impl CollectByTransaction for Blocks {
type Response = Block<TxHash>;

async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
async fn extract(request: Params, source: Arc<Source>, _: Arc<Query>) -> R<Self::Response> {
let transaction = source
.fetcher
.get_transaction(request.ethers_transaction_hash()?)
Expand All @@ -92,18 +81,14 @@ impl CollectByTransaction for Blocks {
Ok(block)
}

fn transform(response: Self::Response, columns: &mut Self, schemas: &Schemas) -> Result<()> {
let schema = schemas.get(&Datatype::Blocks).ok_or(err("schema not provided"))?;
fn transform(response: Self::Response, columns: &mut Self, query: &Arc<Query>) -> R<()> {
let schema = query.schemas.get_schema(&Datatype::Blocks)?;
process_block(response, columns, schema)
}
}

/// process block into columns
pub(crate) fn process_block<TX>(
block: Block<TX>,
columns: &mut Blocks,
schema: &Table,
) -> Result<()> {
pub(crate) fn process_block<TX>(block: Block<TX>, columns: &mut Blocks, schema: &Table) -> R<()> {
columns.n_rows += 1;

store!(schema, columns, block_hash, block.hash.map(|x| x.0.to_vec()));
Expand Down
34 changes: 12 additions & 22 deletions crates/freeze/src/datasets/code_diffs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::*;
use ethers::prelude::*;
use polars::prelude::*;
use std::collections::HashMap;

/// columns for transactions
#[cryo_to_df::to_df(Datatype::CodeDiffs)]
Expand All @@ -24,50 +23,41 @@ impl Dataset for CodeDiffs {
}
}

type BlockTxsTraces = (Option<u32>, Vec<Option<Vec<u8>>>, Vec<ethers::types::BlockTrace>);
type Result<T> = ::core::result::Result<T, CollectError>;
type BlockTxTraces = (Option<u32>, Vec<Option<Vec<u8>>>, Vec<ethers::types::BlockTrace>);

#[async_trait::async_trait]
impl CollectByBlock for CodeDiffs {
type Response = BlockTxsTraces;
type Response = BlockTxTraces;

async fn extract(
request: Params,
source: Arc<Source>,
schemas: Schemas,
) -> Result<Self::Response> {
let schema = schemas.get(&Datatype::CodeDiffs).ok_or(err("schema not provided"))?;
async fn extract(request: Params, source: Arc<Source>, query: Arc<Query>) -> R<Self::Response> {
let schema = query.schemas.get(&Datatype::CodeDiffs).ok_or(err("schema not provided"))?;
let include_txs = schema.has_column("transaction_hash");
source.fetcher.trace_block_state_diffs(request.block_number()? as u32, include_txs).await
}

fn transform(response: Self::Response, columns: &mut Self, schemas: &Schemas) -> Result<()> {
process_code_diffs(&response, columns, schemas)
fn transform(response: Self::Response, columns: &mut Self, query: &Arc<Query>) -> R<()> {
process_code_diffs(&response, columns, &query.schemas)
}
}

#[async_trait::async_trait]
impl CollectByTransaction for CodeDiffs {
type Response = (Option<u32>, Vec<Option<Vec<u8>>>, Vec<ethers::types::BlockTrace>);
type Response = BlockTxTraces;

async fn extract(
request: Params,
source: Arc<Source>,
_schemas: Schemas,
) -> Result<Self::Response> {
async fn extract(request: Params, source: Arc<Source>, _: Arc<Query>) -> R<Self::Response> {
source.fetcher.trace_transaction_state_diffs(request.transaction_hash()?).await
}

fn transform(response: Self::Response, columns: &mut Self, schemas: &Schemas) -> Result<()> {
process_code_diffs(&response, columns, schemas)
fn transform(response: Self::Response, columns: &mut Self, query: &Arc<Query>) -> R<()> {
process_code_diffs(&response, columns, &query.schemas)
}
}

pub(crate) fn process_code_diffs(
response: &BlockTxsTraces,
response: &BlockTxTraces,
columns: &mut CodeDiffs,
schemas: &Schemas,
) -> Result<()> {
) -> R<()> {
let schema = schemas.get(&Datatype::CodeDiffs).ok_or(err("schema not provided"))?;
let (block_number, txs, traces) = response;
for (index, (trace, tx)) in traces.iter().zip(txs).enumerate() {
Expand Down
Loading

0 comments on commit dba7695

Please sign in to comment.