More datasets #64

Merged: 10 commits, Sep 16, 2023
Changes from all commits
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
```diff
@@ -33,6 +33,7 @@ governor = "0.5.1"
 hex = "0.4.3"
 indexmap = "2.0.0"
 indicatif = "0.17.5"
+lazy_static = "1.4.0"
 polars = { version = "0.32.1", features = [
     "parquet",
     "string_encoding",
```
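The only manifest change is the new `lazy_static` dependency. For context, a minimal standalone sketch of the pattern it enables (a static initialized on first access); the `DATASET_ALIASES` map is hypothetical, since the PR's actual call sites are not shown in this diff:

```rust
use lazy_static::lazy_static;
use std::collections::HashMap;

lazy_static! {
    // Initialized once, on first access at runtime.
    static ref DATASET_ALIASES: HashMap<&'static str, &'static str> = {
        let mut m = HashMap::new();
        m.insert("txs", "transactions");
        m
    };
}

fn main() {
    assert_eq!(DATASET_ALIASES.get("txs"), Some(&"transactions"));
}
```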
3 changes: 2 additions & 1 deletion README.md
```diff
@@ -90,12 +90,13 @@ Many `cryo` cli options will affect output schemas by adding/removing columns or

 #### Schema Design Guide

 An attempt is made to ensure that the dataset schemas conform to a common set of design guidelines:
-- By default, rows should contain enough information be order-able
+- By default, rows should contain enough information in their columns to be order-able (unless the rows do not have an intrinsic order)
 - Columns should be named by their JSON-RPC or ethers.rs defaults, except in cases where a much more explicit name is available
 - To make joins across tables easier, a given piece of information should use the same datatype and column name across tables when possible
 - Large ints such as `u256` should allow multiple conversions. A `value` column of type `u256` should allow: `value_binary`, `value_string`, `value_f32`, `value_f64`, `value_u32`, `value_u64`, and `value_d128`
 - Columns related to non-identifying cryptographic signatures are omitted by default. For example, `state_root` of a block or `v`/`r`/`s` of a transaction
 - Integer values that can never be negative should be stored as unsigned integers
+- Every table should allow an optional `chain_id` column so that data from multiple chains can be easily stored in the same table.

 Standard types across tables:
 - `block_number`: `u32`
```
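To make the multiple-conversions guideline concrete, here is a hedged sketch using `ethers::types::U256` (the function name and return shape are illustrative, not from the PR):

```rust
use ethers::types::U256;

// One u256 quantity exposed under several of the representations
// named in the schema guideline: lossless binary and string forms,
// plus lossy float and truncated integer forms.
fn u256_representations(value: U256) -> (Vec<u8>, String, f64, u64) {
    let mut value_binary = [0u8; 32];
    value.to_big_endian(&mut value_binary); // lossless 32-byte big-endian
    let value_string = value.to_string();   // lossless decimal string
    let value_f64 = value.as_u128() as f64; // lossy; panics above u128::MAX
    let value_u64 = value.low_u64();        // truncated to the low 64 bits
    (value_binary.to_vec(), value_string, value_f64, value_u64)
}
```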
34 changes: 31 additions & 3 deletions crates/cli/src/args.rs
```diff
@@ -147,9 +147,37 @@ pub struct Args {
     #[arg(long, help_heading = "Output Options")]
     pub no_report: bool,

+    /// Address
+    #[arg(long, help_heading = "Dataset-specific Options", num_args(1..))]
+    pub address: Option<Vec<String>>,
+
+    /// To Address
+    #[arg(long, help_heading = "Dataset-specific Options", num_args(1..), value_name = "TO")]
+    pub to_address: Option<Vec<String>>,
+
+    /// From Address
+    #[arg(long, help_heading = "Dataset-specific Options", num_args(1..), value_name = "FROM")]
+    pub from_address: Option<Vec<String>>,
+
+    /// [eth_calls] Call data to use for eth_calls
+    #[arg(long, help_heading = "Dataset-specific Options", num_args(1..))]
+    pub call_data: Option<Vec<String>>,
+
+    /// [eth_calls] Function to use for eth_calls
+    #[arg(long, help_heading = "Dataset-specific Options", num_args(1..))]
+    pub function: Option<Vec<String>>,
+
+    /// [eth_calls] Inputs to use for eth_calls
+    #[arg(long, help_heading = "Dataset-specific Options", num_args(1..))]
+    pub inputs: Option<Vec<String>>,
+
+    /// [slots] Slots
+    #[arg(long, help_heading = "Dataset-specific Options", num_args(1..))]
+    pub slots: Option<Vec<String>>,
+
     /// [logs] filter logs by contract address
     #[arg(long, help_heading = "Dataset-specific Options")]
-    pub contract: Option<String>,
+    pub contract: Option<Vec<String>>,

     /// [logs] filter logs by topic0
     #[arg(long, visible_alias = "event", help_heading = "Dataset-specific Options")]
@@ -170,14 +198,14 @@ pub struct Args {
     /// [logs] Blocks per request
     #[arg(
         long,
-        value_name = "BLOCKS",
+        value_name = "SIZE",
         default_value_t = 1,
         help_heading = "Dataset-specific Options"
     )]
     pub inner_request_size: u64,

     /// [logs] event signature to parse
-    #[arg(long, help_heading = "Dataset-specific Options")]
+    #[arg(long, value_name = "SIGNATURE", help_heading = "Dataset-specific Options")]
    pub event_signature: Option<String>,
 }
```
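All of the new fields follow the same clap pattern: `num_args(1..)` lets a single flag accept one or more space-separated values, collected into `Option<Vec<String>>`. A minimal standalone sketch of that behavior (assuming clap 4 with the `derive` feature; the `Demo` struct is hypothetical):

```rust
use clap::Parser;

#[derive(Parser, Debug)]
struct Demo {
    /// Accepts one or more values, e.g. `--address 0xabc 0xdef`
    #[arg(long, num_args(1..))]
    address: Option<Vec<String>>,
}

fn main() {
    let demo = Demo::parse_from(["demo", "--address", "0xabc", "0xdef"]);
    assert_eq!(demo.address, Some(vec!["0xabc".to_string(), "0xdef".to_string()]));
}
```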
12 changes: 10 additions & 2 deletions crates/cli/src/parse/blocks.rs
```diff
@@ -1,7 +1,8 @@
 use ethers::prelude::*;
 use polars::prelude::*;
+use std::collections::HashMap;

-use cryo_freeze::{BlockChunk, Chunk, ChunkData, Fetcher, ParseError, Subchunk};
+use cryo_freeze::{BlockChunk, Chunk, ChunkData, Datatype, Fetcher, ParseError, Subchunk, Table};

 use crate::args::Args;

@@ -123,8 +124,15 @@ async fn postprocess_block_chunks<P: JsonRpcClient>(
 pub(crate) async fn get_default_block_chunks<P: JsonRpcClient>(
     args: &Args,
     fetcher: Arc<Fetcher<P>>,
+    schemas: &HashMap<Datatype, Table>,
 ) -> Result<Vec<(Chunk, Option<String>)>, ParseError> {
-    let block_chunks = parse_block_inputs(&String::from(r"0:latest"), &fetcher).await?;
+    let default_blocks = schemas
+        .keys()
+        .map(|datatype| datatype.dataset().default_blocks())
+        .find(|blocks| !blocks.is_none())
+        .unwrap_or(Some("0:latest".to_string()))
+        .unwrap();
+    let block_chunks = parse_block_inputs(&default_blocks, &fetcher).await?;
     postprocess_block_chunks(block_chunks, args, fetcher).await
 }
```
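The rewritten function now asks each requested dataset for a default block range and falls back to `0:latest` when none declares one. A standalone sketch of just that selection step (types simplified; `pick_default_blocks` is hypothetical):

```rust
fn pick_default_blocks(dataset_defaults: &[Option<String>]) -> String {
    dataset_defaults
        .iter()
        .find(|blocks| blocks.is_some()) // first dataset with an opinion
        .cloned()
        .unwrap_or(Some("0:latest".to_string())) // global fallback
        .unwrap()
}

fn main() {
    let defaults = [None, Some("15537393:latest".to_string())];
    assert_eq!(pick_default_blocks(&defaults), "15537393:latest");
    assert_eq!(pick_default_blocks(&[]), "0:latest");
}
```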
7 changes: 3 additions & 4 deletions crates/cli/src/parse/mod.rs
```diff
@@ -1,12 +1,11 @@
 mod args;
 mod blocks;
 mod file_output;
+mod parse_utils;
 mod query;
+mod schemas;
 mod source;
 mod transactions;

 pub use args::*;
-// use blocks::*;
-// use file_output::*;
-// use query::*;
-// use source::*;
+use schemas::*;
```
83 changes: 83 additions & 0 deletions crates/cli/src/parse/parse_utils.rs
```diff
@@ -0,0 +1,83 @@
+use cryo_freeze::ParseError;
+use std::collections::HashMap;
+
+pub(crate) fn hex_string_to_binary(hex_string: &String) -> Result<Vec<u8>, ParseError> {
+    let hex_string = hex_string.strip_prefix("0x").unwrap_or(hex_string);
+    hex::decode(hex_string)
+        .map_err(|_| ParseError::ParseError("could not parse data as hex".to_string()))
+}
+
+pub(crate) fn hex_strings_to_binary(hex_strings: &[String]) -> Result<Vec<Vec<u8>>, ParseError> {
+    hex_strings
+        .iter()
+        .map(|x| {
+            hex::decode(x.strip_prefix("0x").unwrap_or(x))
+                .map_err(|_| ParseError::ParseError("could not parse data as hex".to_string()))
+        })
+        .collect::<Result<Vec<_>, _>>()
+}
+
+#[derive(Eq, PartialEq, Hash)]
+pub(crate) enum BinaryInputList {
+    Explicit,
+    ParquetColumn(String, String),
+}
+
+type ParsedBinaryArg = HashMap<BinaryInputList, Vec<Vec<u8>>>;
+
+/// parse binary argument list
+/// each argument can be a hex string or a parquet column reference
+/// each parquet column is loaded into its own list, hex strings loaded into another
+pub(crate) fn parse_binary_arg(
+    inputs: &[String],
+    default_column: &str,
+) -> Result<ParsedBinaryArg, ParseError> {
+    let mut parsed = HashMap::new();
+
+    // separate into files vs explicit
+    let (files, hex_strings): (Vec<&String>, Vec<&String>) =
+        inputs.iter().partition(|tx| std::path::Path::new(tx).exists());
+
+    // files columns
+    for path in files {
+        let reference = parse_file_column_reference(path, default_column)?;
+        let values = cryo_freeze::read_binary_column(&reference.path, &reference.column)
+            .map_err(|_e| ParseError::ParseError("could not read input".to_string()))?;
+        let key = BinaryInputList::ParquetColumn(reference.path, reference.column);
+        parsed.insert(key, values);
+    }
+
+    // explicit binary strings
+    if !hex_strings.is_empty() {
+        let hex_strings: Vec<String> = hex_strings.into_iter().cloned().collect();
+        let binary_vec = hex_strings_to_binary(&hex_strings)?;
+        parsed.insert(BinaryInputList::Explicit, binary_vec);
+    };
+
+    Ok(parsed)
+}
+
+struct FileColumnReference {
+    path: String,
+    column: String,
+}
+
+fn parse_file_column_reference(
+    path: &str,
+    default_column: &str,
+) -> Result<FileColumnReference, ParseError> {
+    let (path, column) = if path.contains(':') {
+        let pieces: Vec<&str> = path.split(':').collect();
+        if pieces.len() == 2 {
+            (pieces[0], pieces[1])
+        } else {
+            return Err(ParseError::ParseError("could not parse path column".to_string()))
+        }
+    } else {
+        (path, default_column)
+    };
+
+    let parsed = FileColumnReference { path: path.to_string(), column: column.to_string() };
+
+    Ok(parsed)
+}
```
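A hedged usage sketch of the two input conventions these helpers implement: hex strings may carry an optional `0x` prefix, and file arguments may carry an optional `:column` suffix (a standalone approximation, not the PR's code; the file and column names are made up):

```rust
fn main() {
    // Optional "0x" prefix, as in hex_string_to_binary above.
    let input = "0xdeadbeef";
    let stripped = input.strip_prefix("0x").unwrap_or(input);
    assert_eq!(hex::decode(stripped).unwrap(), vec![0xde, 0xad, 0xbe, 0xef]);

    // Optional ":column" suffix, a simplified analogue of
    // parse_file_column_reference above.
    let (path, column) = "txs.parquet:transaction_hash"
        .split_once(':')
        .unwrap_or(("txs.parquet", "default_column"));
    assert_eq!((path, column), ("txs.parquet", "transaction_hash"));
}
```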