Skip to content

Commit

Permalink
add FromAddres dimension & take it from cli parameter, filter txs
Browse files Browse the repository at this point in the history
… by `from_address` & `to_address` (#111)

* add FromAddress dimension and add corresponding implementations

* implement from_address & to_address filter for txs

* take from_address from cli params, filter tx and receipts by from & to address

* remove unnecessary vec definition

* fmt

* remove timestamp param

* review: fetch less receipts if transaction filter is applied
  • Loading branch information
cool-mestorf authored Nov 12, 2023
1 parent 2c803b6 commit 7da85e8
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 36 deletions.
5 changes: 5 additions & 0 deletions crates/cli/src/parse/partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub(crate) async fn parse_partitions<P: JsonRpcClient>(
schemas: &HashMap<Datatype, Table>,
) -> Result<(Vec<Partition>, Vec<Dim>, TimeDimension), ParseError> {
// TODO: if wanting to chunk these non-block dimensions, do it in parse_binary_arg()
// TODO: map from args to dim is not exhaustive

// parse chunk data
let (block_number_labels, block_numbers) = blocks::parse_blocks(args, fetcher.clone()).await?;
Expand All @@ -28,6 +29,8 @@ pub(crate) async fn parse_partitions<P: JsonRpcClient>(
let call_data_labels = None;
let (address_labels, addresses) = parse_address_chunks(&args.address, "address")?;
let (contract_labels, contracts) = parse_address_chunks(&args.contract, "contract_address")?;
let (from_address_labels, from_addresses) =
parse_address_chunks(&args.from_address, "from_address")?;
let (to_address_labels, to_addresses) = parse_address_chunks(&args.to_address, "to_address")?;
let (slot_labels, slots) = parse_slot_chunks(&args.slot, "slot")?;
let (topic0_labels, topic0s) = parse_topic(&args.topic0, "topic0")?;
Expand All @@ -49,6 +52,7 @@ pub(crate) async fn parse_partitions<P: JsonRpcClient>(
transactions,
addresses,
contracts,
from_addresses,
to_addresses,
slots,
call_datas,
Expand All @@ -63,6 +67,7 @@ pub(crate) async fn parse_partitions<P: JsonRpcClient>(
call_data_labels,
address_labels,
contract_labels,
from_address_labels,
to_address_labels,
slot_labels,
topic0_labels,
Expand Down
2 changes: 2 additions & 0 deletions crates/cli/src/parse/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ impl DimIsNone for Args {
Dim::BlockNumber => self.blocks.is_some(),
Dim::TransactionHash => self.txs.is_some(),
Dim::Address => self.address.is_some(),
Dim::FromAddress => self.from_address.is_some(),
Dim::ToAddress => self.to_address.is_some(),
Dim::Contract => self.contract.is_some(),
Dim::CallData => self.call_data.is_some(),
Expand All @@ -79,6 +80,7 @@ impl DimIsNone for Args {
Dim::BlockNumber => self.blocks.is_none(),
Dim::TransactionHash => self.txs.is_none(),
Dim::Address => self.address.is_none(),
Dim::FromAddress => self.from_address.is_none(),
Dim::ToAddress => self.to_address.is_none(),
Dim::Contract => self.contract.is_none(),
Dim::CallData => self.call_data.is_none(),
Expand Down
90 changes: 59 additions & 31 deletions crates/freeze/src/datasets/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,18 @@ impl Dataset for Transactions {
"chain_id",
])
}

fn optional_parameters() -> Vec<Dim> {
vec![Dim::FromAddress, Dim::ToAddress]
}
}

/// tuple representing transaction and optional receipt
pub type TransactionAndReceipt = (Transaction, Option<TransactionReceipt>);

#[async_trait::async_trait]
impl CollectByBlock for Transactions {
type Response = (Block<Transaction>, Option<Vec<TransactionReceipt>>, bool);
type Response = (Block<Transaction>, Vec<TransactionAndReceipt>, bool);

async fn extract(request: Params, source: Arc<Source>, query: Arc<Query>) -> R<Self::Response> {
let block = source
Expand All @@ -66,44 +73,65 @@ impl CollectByBlock for Transactions {
.await?
.ok_or(CollectError::CollectError("block not found".to_string()))?;
let schema = query.schemas.get_schema(&Datatype::Transactions)?;
let receipt = if schema.has_column("gas_used") | schema.has_column("success") {
Some(source.get_tx_receipts_in_block(&block).await?)
} else {
None
};
Ok((block, receipt, query.exclude_failed))

// 1. collect transactions and filter them if optional parameters are supplied
// filter by from_address
let from_filter: Box<dyn Fn(&Transaction) -> bool + Send> =
if let Some(from_address) = &request.from_address {
Box::new(move |tx| tx.from.as_bytes() == from_address)
} else {
Box::new(|_| true)
};
// filter by to_address
let to_filter: Box<dyn Fn(&Transaction) -> bool + Send> =
if let Some(to_address) = &request.to_address {
Box::new(move |tx| tx.to.as_ref().map_or(false, |x| x.as_bytes() == to_address))
} else {
Box::new(|_| true)
};
let transactions =
block.transactions.clone().into_iter().filter(from_filter).filter(to_filter).collect();

// 2. collect receipts if necessary
// if transactions are filtered fetch by set of transaction hashes, else fetch all receipts
// in block
let receipts: Vec<Option<_>> =
if schema.has_column("gas_used") | schema.has_column("success") {
// receipts required
let receipts = if request.from_address.is_some() || request.to_address.is_some() {
source.get_tx_receipts(&transactions).await?
} else {
source.get_tx_receipts_in_block(&block).await?
};
receipts.into_iter().map(Some).collect()
} else {
vec![None; block.transactions.len()]
};

let transactions_with_receips = transactions.into_iter().zip(receipts).collect();
Ok((block, transactions_with_receips, query.exclude_failed))
}

fn transform(response: Self::Response, columns: &mut Self, query: &Arc<Query>) -> R<()> {
let schema = query.schemas.get_schema(&Datatype::Transactions)?;
let (block, receipts, exclude_failed) = response;
let timestamp = block.timestamp.as_u32();
match receipts {
Some(receipts) => {
for (tx, receipt) in block.transactions.into_iter().zip(receipts) {
process_transaction(
tx,
Some(receipt.clone()),
columns,
schema,
exclude_failed,
timestamp,
)?;
}
}
None => {
for tx in block.transactions.into_iter() {
process_transaction(tx, None, columns, schema, exclude_failed, timestamp)?;
}
}
let (block, transactions_with_receipts, exclude_failed) = response;
for (tx, receipt) in transactions_with_receipts.into_iter() {
process_transaction(
tx,
receipt,
columns,
schema,
exclude_failed,
block.timestamp.as_u32(),
)?;
}
Ok(())
}
}

#[async_trait::async_trait]
impl CollectByTransaction for Transactions {
type Response = (Transaction, Option<TransactionReceipt>, bool, u32);
type Response = (TransactionAndReceipt, bool, u32);

async fn extract(request: Params, source: Arc<Source>, query: Arc<Query>) -> R<Self::Response> {
let tx_hash = request.ethers_transaction_hash()?;
Expand All @@ -113,7 +141,7 @@ impl CollectByTransaction for Transactions {
.get_transaction(tx_hash)
.await?
.ok_or(CollectError::CollectError("transaction not found".to_string()))?;
let gas_used = if schema.has_column("gas_used") {
let receipt = if schema.has_column("gas_used") {
source.fetcher.get_transaction_receipt(tx_hash).await?
} else {
None
Expand All @@ -131,12 +159,12 @@ impl CollectByTransaction for Transactions {

let timestamp = block.timestamp.as_u32();

Ok((transaction, gas_used, query.exclude_failed, timestamp))
Ok(((transaction, receipt), query.exclude_failed, timestamp))
}

fn transform(response: Self::Response, columns: &mut Self, query: &Arc<Query>) -> R<()> {
let schema = query.schemas.get_schema(&Datatype::Transactions)?;
let (transaction, receipt, exclude_failed, timestamp) = response;
let ((transaction, receipt), exclude_failed, timestamp) = response;
process_transaction(transaction, receipt, columns, schema, exclude_failed, timestamp)?;
Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions crates/freeze/src/multi_datasets/blocks_and_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,20 @@ impl CollectByTransaction for BlocksAndTransactions {
);

async fn extract(request: Params, source: Arc<Source>, query: Arc<Query>) -> R<Self::Response> {
let (tx, receipt, exclude_failed, timestamp) =
let ((tx, receipt), exclude_failed, timestamp) =
<Transactions as CollectByTransaction>::extract(request, source.clone(), query).await?;
let block_number = tx.block_number.ok_or(err("no block number for tx"))?.as_u64();
let block = source
.fetcher
.get_block(block_number)
.await?
.ok_or(CollectError::CollectError("block not found".to_string()))?;
Ok((block, (tx, receipt, exclude_failed, timestamp)))
Ok((block, ((tx, receipt), exclude_failed, timestamp)))
}

fn transform(response: Self::Response, columns: &mut Self, query: &Arc<Query>) -> R<()> {
let BlocksAndTransactions(blocks, transactions) = columns;
let (block, (tx, receipt, exclude_failed, timestamp)) = response;
let (block, ((tx, receipt), exclude_failed, timestamp)) = response;
let schema = query.schemas.get_schema(&Datatype::Blocks)?;
blocks::process_block(block, blocks, schema)?;
let schema = query.schemas.get_schema(&Datatype::Transactions)?;
Expand Down
29 changes: 29 additions & 0 deletions crates/freeze/src/types/partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ pub enum Dim {
Address,
/// Contract dimension
Contract,
/// FromAddress dimension
FromAddress,
/// ToAddress dimension
ToAddress,
/// Slot dimension
Expand All @@ -32,13 +34,16 @@ pub enum Dim {

impl Dim {
/// list of all dimensions
// NOTE: this function is not exhaustive
// maybe generating iterator using macro (such as strum) would be better
pub fn all_dims() -> Vec<Dim> {
vec![
Dim::BlockNumber,
Dim::TransactionHash,
Dim::CallData,
Dim::Address,
Dim::Contract,
Dim::FromAddress,
Dim::ToAddress,
Dim::Slot,
Dim::Topic0,
Expand All @@ -56,6 +61,7 @@ impl Dim {
Dim::CallData => "call_datas",
Dim::Address => "addresses",
Dim::Contract => "contracts",
Dim::FromAddress => "from_addresses",
Dim::ToAddress => "to_addresses",
Dim::Slot => "slots",
Dim::Topic0 => "topic0s",
Expand All @@ -70,13 +76,16 @@ impl std::str::FromStr for Dim {
type Err = crate::ParseError;

/// convert str to Dim
// NOTE: this function is not exhaustive
// maybe generating FromStr trait using macro (such as strum) would be better
fn from_str(name: &str) -> Result<Dim, Self::Err> {
let dim = match name {
"block" => Dim::BlockNumber,
"transaction" => Dim::TransactionHash,
"call_data" => Dim::CallData,
"address" => Dim::Address,
"contract" => Dim::Contract,
"from_address" => Dim::FromAddress,
"to_address" => Dim::ToAddress,
"slot" => Dim::Slot,
"topic0" => Dim::Topic0,
Expand All @@ -97,6 +106,7 @@ impl std::fmt::Display for Dim {
Dim::CallData => "call_data",
Dim::Address => "address",
Dim::Contract => "contract",
Dim::FromAddress => "from_address",
Dim::ToAddress => "to_address",
Dim::Slot => "slot",
Dim::Topic0 => "topic0",
Expand All @@ -123,6 +133,8 @@ pub struct Partition {
pub addresses: Option<Vec<AddressChunk>>,
/// contracts
pub contracts: Option<Vec<AddressChunk>>,
/// from addresses
pub from_addresses: Option<Vec<AddressChunk>>,
/// to addresses
pub to_addresses: Option<Vec<AddressChunk>>,
/// slots
Expand Down Expand Up @@ -255,6 +267,7 @@ impl Partition {
Dim::CallData => chunks_to_name(&self.call_datas)?,
Dim::Address => chunks_to_name(&self.addresses)?,
Dim::Contract => chunks_to_name(&self.contracts)?,
Dim::FromAddress => chunks_to_name(&self.from_addresses)?,
Dim::ToAddress => chunks_to_name(&self.to_addresses)?,
Dim::Slot => chunks_to_name(&self.slots)?,
Dim::Topic0 => chunks_to_name(&self.topic0s)?,
Expand Down Expand Up @@ -282,6 +295,7 @@ impl Partition {
Dim::TransactionHash => partition!(outputs, transactions)?,
Dim::Address => partition!(outputs, addresses)?,
Dim::Contract => partition!(outputs, contracts)?,
Dim::FromAddress => partition!(outputs, from_addresses)?,
Dim::ToAddress => partition!(outputs, to_addresses)?,
Dim::CallData => partition!(outputs, call_datas)?,
Dim::Slot => partition!(outputs, slots)?,
Expand Down Expand Up @@ -309,6 +323,7 @@ impl Partition {
Dim::TransactionHash => label_partition!(outputs, dim_labels, transactions)?,
Dim::Address => label_partition!(outputs, dim_labels, addresses)?,
Dim::Contract => label_partition!(outputs, dim_labels, contracts)?,
Dim::FromAddress => label_partition!(outputs, dim_labels, from_addresses)?,
Dim::ToAddress => label_partition!(outputs, dim_labels, to_addresses)?,
Dim::CallData => label_partition!(outputs, dim_labels, call_datas)?,
Dim::Slot => label_partition!(outputs, dim_labels, slots)?,
Expand Down Expand Up @@ -342,6 +357,7 @@ impl Partition {
}
Dim::Address => parametrize!(outputs, new, self.addresses, address),
Dim::Contract => parametrize!(outputs, new, self.contracts, contract),
Dim::FromAddress => parametrize!(outputs, new, self.from_addresses, from_address),
Dim::ToAddress => parametrize!(outputs, new, self.to_addresses, to_address),
Dim::CallData => parametrize!(outputs, new, self.call_datas, call_data),
Dim::Slot => parametrize!(outputs, new, self.slots, slot),
Expand Down Expand Up @@ -388,6 +404,7 @@ impl Partition {
}

/// return Vec of dimensions defined in partitions
// NOTE: this function is not exhaustive
pub fn dims(&self) -> Vec<Dim> {
let mut dims = Vec::new();
if self.block_numbers.is_some() {
Expand All @@ -402,6 +419,9 @@ impl Partition {
if self.contracts.is_some() {
dims.push(Dim::Contract)
};
if self.from_addresses.is_some() {
dims.push(Dim::FromAddress)
};
if self.to_addresses.is_some() {
dims.push(Dim::ToAddress)
};
Expand Down Expand Up @@ -433,6 +453,7 @@ impl Partition {
Dim::TransactionHash => self.transactions.as_ref().map(|x| x.len()).unwrap_or(0),
Dim::Address => self.addresses.as_ref().map(|x| x.len()).unwrap_or(0),
Dim::Contract => self.contracts.as_ref().map(|x| x.len()).unwrap_or(0),
Dim::FromAddress => self.from_addresses.as_ref().map(|x| x.len()).unwrap_or(0),
Dim::ToAddress => self.to_addresses.as_ref().map(|x| x.len()).unwrap_or(0),
Dim::CallData => self.call_datas.as_ref().map(|x| x.len()).unwrap_or(0),
Dim::Slot => self.slots.as_ref().map(|x| x.len()).unwrap_or(0),
Expand All @@ -444,6 +465,7 @@ impl Partition {
}

/// get statistics for partition
// NOTE: this function is not exhaustive
pub fn stats(&self) -> PartitionStats {
let chunk = self.clone();
PartitionStats {
Expand All @@ -452,6 +474,7 @@ impl Partition {
call_datas: chunk.call_datas.map(|c| c.stats()),
addresses: chunk.addresses.map(|c| c.stats()),
contracts: chunk.contracts.map(|c| c.stats()),
from_addresses: chunk.from_addresses.map(|c| c.stats()),
to_addresses: chunk.to_addresses.map(|c| c.stats()),
slots: chunk.slots.map(|c| c.stats()),
topic0s: chunk.topic0s.map(|c| c.stats()),
Expand Down Expand Up @@ -483,6 +506,8 @@ pub struct PartitionStats {
pub addresses: Option<ChunkStats<Vec<u8>>>,
/// contracts stats
pub contracts: Option<ChunkStats<Vec<u8>>>,
/// from_addresses stats
pub from_addresses: Option<ChunkStats<Vec<u8>>>,
/// to_addresses stats
pub to_addresses: Option<ChunkStats<Vec<u8>>>,
/// slots stats
Expand Down Expand Up @@ -517,6 +542,7 @@ impl PartitionStats {
call_datas: fold(self.call_datas, other.call_datas),
addresses: fold(self.addresses, other.addresses),
contracts: fold(self.contracts, other.contracts),
from_addresses: fold(self.from_addresses, other.from_addresses),
to_addresses: fold(self.to_addresses, other.to_addresses),
slots: fold(self.slots, other.slots),
topic0s: fold(self.topic0s, other.topic0s),
Expand All @@ -539,6 +565,8 @@ pub struct PartitionLabels {
pub address_labels: Option<Vec<Option<String>>>,
/// contract labels
pub contract_labels: Option<Vec<Option<String>>>,
/// from address labels
pub from_address_labels: Option<Vec<Option<String>>>,
/// to address labels
pub to_address_labels: Option<Vec<Option<String>>>,
/// slot labels
Expand All @@ -561,6 +589,7 @@ impl PartitionLabels {
Dim::CallData => self.call_data_labels.clone(),
Dim::Address => self.address_labels.clone(),
Dim::Contract => self.contract_labels.clone(),
Dim::FromAddress => self.from_address_labels.clone(),
Dim::ToAddress => self.to_address_labels.clone(),
Dim::Slot => self.slot_labels.clone(),
Dim::Topic0 => self.topic0_labels.clone(),
Expand Down
2 changes: 2 additions & 0 deletions crates/freeze/src/types/rpc_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ pub struct Params {
pub address: Option<Vec<u8>>,
/// contract
pub contract: Option<Vec<u8>>,
/// from address
pub from_address: Option<Vec<u8>>,
/// to address
pub to_address: Option<Vec<u8>>,
/// slot
Expand Down
Loading

0 comments on commit 7da85e8

Please sign in to comment.