From bff84e3a5fdc28fe1b66c6d3a87774ed23b9d2e9 Mon Sep 17 00:00:00 2001 From: Kamil Skalski Date: Mon, 21 Aug 2023 11:27:45 +0200 Subject: [PATCH 01/16] More permissive include/exclude column specifier interpretation. --- crates/freeze/src/types/schemas.rs | 43 ++++++++++++++++-------------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/crates/freeze/src/types/schemas.rs b/crates/freeze/src/types/schemas.rs index 83cbb5ff..b4d4af65 100644 --- a/crates/freeze/src/types/schemas.rs +++ b/crates/freeze/src/types/schemas.rs @@ -93,9 +93,15 @@ impl Datatype { sort: Option>, ) -> Result { let column_types = self.dataset().column_types(); + let all_columns = column_types.keys().map(|k| k.to_string()).collect(); let default_columns = self.dataset().default_columns(); - let used_columns = - compute_used_columns(default_columns, include_columns, exclude_columns, columns, self); + let used_columns = compute_used_columns( + all_columns, + default_columns, + include_columns, + exclude_columns, + columns, + ); let mut columns = IndexMap::new(); for column in used_columns { let mut ctype = column_types.get(column.as_str()).ok_or(SchemaError::InvalidColumn)?; @@ -110,38 +116,35 @@ impl Datatype { } fn compute_used_columns( + all_columns: HashSet, default_columns: Vec<&str>, include_columns: &Option>, exclude_columns: &Option>, columns: &Option>, - datatype: &Datatype, -) -> Vec { +) -> HashSet { match (columns, include_columns, exclude_columns) { (Some(columns), _, _) if ((columns.len() == 1) & columns.contains(&"all".to_string())) => { - datatype.dataset().column_types().keys().map(|k| k.to_string()).collect() + all_columns } (Some(columns), _, _) => columns.iter().map(|x| x.to_string()).collect(), (_, Some(include), _) if ((include.len() == 1) & include.contains(&"all".to_string())) => { - datatype.dataset().column_types().keys().map(|k| k.to_string()).collect() + all_columns } (_, Some(include), Some(exclude)) => { - let mut result: Vec = default_columns.iter().map(|s| s.to_string()).collect(); - let mut result_set: HashSet = result.iter().cloned().collect(); + let mut result_set: HashSet = + default_columns.iter().map(|s| s.to_string()).collect(); let exclude_set: HashSet = exclude.iter().cloned().collect(); - include - .iter() - .filter(|item| !exclude_set.contains(*item) && result_set.insert(item.to_string())) - .for_each(|item| result.push(item.clone())); - result + result_set.extend(include.iter().cloned()); + result_set = result_set.difference(&exclude_set).cloned().collect(); + // Permissively skip `include` columns that are not in this dataset (they might apply to other dataset) + result_set.intersection(&all_columns).cloned().collect() } (_, Some(include), None) => { - let mut result: Vec = default_columns.iter().map(|s| s.to_string()).collect(); - let mut result_set: HashSet = result.iter().cloned().collect(); - include - .iter() - .filter(|item| result_set.insert(item.to_string())) - .for_each(|item| result.push(item.clone())); - result + let mut result_set: HashSet = + default_columns.iter().map(|s| s.to_string()).collect(); + result_set.extend(include.iter().cloned()); + // Permissively skip `include` columns that are not in this dataset (they might apply to other dataset) + result_set.intersection(&all_columns).cloned().collect() } (_, None, Some(exclude)) => { let exclude_set: HashSet<_> = exclude.iter().collect(); From ab70444a40b8d8e2a9f501f48055b5c44ac22936 Mon Sep 17 00:00:00 2001 From: Kamil Skalski Date: Mon, 21 Aug 2023 12:38:08 +0200 Subject: [PATCH 02/16] Refactor, add tests and update docs. --- crates/cli/src/args.rs | 4 +- crates/freeze/src/types/schemas.rs | 145 ++++++++++++++++++++++------- 2 files changed, 112 insertions(+), 37 deletions(-) diff --git a/crates/cli/src/args.rs b/crates/cli/src/args.rs index 82f46928..3582b84c 100644 --- a/crates/cli/src/args.rs +++ b/crates/cli/src/args.rs @@ -40,7 +40,9 @@ pub struct Args { pub reorg_buffer: u64, /// Columns to include alongside the default output, - /// use `all` to include all available columns + /// use `all` to include all available columns. + /// Unknown columns are ignored per dataset, so a superset of columns can be specified for + /// multiset download. #[arg(short, long, value_name="COLS", num_args(0..), verbatim_doc_comment, help_heading="Content Options")] pub include_columns: Option>, diff --git a/crates/freeze/src/types/schemas.rs b/crates/freeze/src/types/schemas.rs index b4d4af65..f9f560d4 100644 --- a/crates/freeze/src/types/schemas.rs +++ b/crates/freeze/src/types/schemas.rs @@ -1,6 +1,4 @@ -use std::collections::HashSet; - -use indexmap::IndexMap; +use indexmap::{IndexMap, IndexSet}; use thiserror::Error; use crate::types::{ColumnEncoding, Datatype}; @@ -116,44 +114,119 @@ impl Datatype { } fn compute_used_columns( - all_columns: HashSet, + all_columns: IndexSet, default_columns: Vec<&str>, include_columns: &Option>, exclude_columns: &Option>, columns: &Option>, -) -> HashSet { - match (columns, include_columns, exclude_columns) { - (Some(columns), _, _) if ((columns.len() == 1) & columns.contains(&"all".to_string())) => { - all_columns - } - (Some(columns), _, _) => columns.iter().map(|x| x.to_string()).collect(), - (_, Some(include), _) if ((include.len() == 1) & include.contains(&"all".to_string())) => { - all_columns - } - (_, Some(include), Some(exclude)) => { - let mut result_set: HashSet = - default_columns.iter().map(|s| s.to_string()).collect(); - let exclude_set: HashSet = exclude.iter().cloned().collect(); - result_set.extend(include.iter().cloned()); - result_set = result_set.difference(&exclude_set).cloned().collect(); - // Permissively skip `include` columns that are not in this dataset (they might apply to other dataset) - result_set.intersection(&all_columns).cloned().collect() +) -> IndexSet { + if let Some(columns) = columns { + if (columns.len() == 1) & columns.contains(&"all".to_string()) { + return all_columns } - (_, Some(include), None) => { - let mut result_set: HashSet = - default_columns.iter().map(|s| s.to_string()).collect(); - result_set.extend(include.iter().cloned()); - // Permissively skip `include` columns that are not in this dataset (they might apply to other dataset) - result_set.intersection(&all_columns).cloned().collect() - } - (_, None, Some(exclude)) => { - let exclude_set: HashSet<_> = exclude.iter().collect(); - default_columns - .into_iter() - .filter(|s| !exclude_set.contains(&s.to_string())) - .map(|s| s.to_string()) - .collect() + return columns.iter().map(|x| x.to_string()).collect() + } + let mut result_set = IndexSet::from_iter(default_columns.iter().map(|s| s.to_string())); + if let Some(include) = include_columns { + if (include.len() == 1) & include.contains(&"all".to_string()) { + return all_columns } - (_, None, None) => default_columns.iter().map(|s| s.to_string()).collect(), + // Permissively skip `include` columns that are not in this dataset (they might apply to + // other dataset) + result_set.extend(include.iter().cloned()); + result_set = result_set.intersection(&all_columns).cloned().collect() + } + if let Some(exclude) = exclude_columns { + let exclude_set = IndexSet::::from_iter(exclude.iter().cloned()); + result_set = result_set.difference(&exclude_set).cloned().collect() + } + result_set +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_table_schema_explicit_cols() { + let cols = Some(vec!["number".to_string(), "hash".to_string()]); + let table = + Datatype::Blocks.table_schema(&ColumnEncoding::Hex, &None, &None, &cols, None).unwrap(); + assert_eq!(vec!["number", "hash"], table.columns()); + + // "all" marker support + let cols = Some(vec!["all".to_string()]); + let table = + Datatype::Blocks.table_schema(&ColumnEncoding::Hex, &None, &None, &cols, None).unwrap(); + assert_eq!(15, table.columns().len()); + assert!(table.columns().contains(&"hash")); + assert!(table.columns().contains(&"transactions_root")); + } + + #[test] + fn test_table_schema_include_cols() { + let inc_cols = Some(vec!["chain_id".to_string(), "receipts_root".to_string()]); + let table = Datatype::Blocks + .table_schema(&ColumnEncoding::Hex, &inc_cols, &None, &None, None) + .unwrap(); + assert_eq!(9, table.columns().len()); + assert_eq!(["chain_id", "receipts_root"], table.columns()[7..9]); + + // Non-existing include is skipped + let inc_cols = Some(vec!["chain_id".to_string(), "foo_bar".to_string()]); + let table = Datatype::Blocks + .table_schema(&ColumnEncoding::Hex, &inc_cols, &None, &None, None) + .unwrap(); + assert_eq!(Some(&"chain_id"), table.columns().last()); + assert!(!table.columns().contains(&"foo_bar")); + + // "all" marker support + let inc_cols = Some(vec!["all".to_string()]); + let table = Datatype::Blocks + .table_schema(&ColumnEncoding::Hex, &inc_cols, &None, &None, None) + .unwrap(); + assert_eq!(15, table.columns().len()); + assert!(table.columns().contains(&"hash")); + assert!(table.columns().contains(&"transactions_root")); + } + + #[test] + fn test_table_schema_exclude_cols() { + // defaults + let table = + Datatype::Blocks.table_schema(&ColumnEncoding::Hex, &None, &None, &None, None).unwrap(); + assert_eq!(7, table.columns().len()); + assert!(table.columns().contains(&"author")); + assert!(table.columns().contains(&"extra_data")); + + let ex_cols = Some(vec!["author".to_string(), "extra_data".to_string()]); + let table = Datatype::Blocks + .table_schema(&ColumnEncoding::Hex, &None, &ex_cols, &None, None) + .unwrap(); + assert_eq!(5, table.columns().len()); + assert!(!table.columns().contains(&"author")); + assert!(!table.columns().contains(&"extra_data")); + + // Non-existing exclude is ignored + let ex_cols = Some(vec!["timestamp".to_string(), "foo_bar".to_string()]); + let table = Datatype::Blocks + .table_schema(&ColumnEncoding::Hex, &None, &ex_cols, &None, None) + .unwrap(); + assert_eq!(6, table.columns().len()); + assert!(!table.columns().contains(&"timestamp")); + assert!(!table.columns().contains(&"foo_bar")); + } + + #[test] + fn test_table_schema_include_and_exclude_cols() { + let inc_cols = Some(vec!["chain_id".to_string(), "receipts_root".to_string()]); + let ex_cols = Some(vec!["author".to_string(), "extra_data".to_string()]); + let table = Datatype::Blocks + .table_schema(&ColumnEncoding::Hex, &inc_cols, &ex_cols, &None, None) + .unwrap(); + assert!(!table.columns().contains(&"author")); + assert!(!table.columns().contains(&"extra_data")); + assert_eq!(7, table.columns().len()); + assert_eq!(["chain_id", "receipts_root"], table.columns()[5..7]); } } From 5834bcd0c418646449c9a49b3b68fbf5a6698dde Mon Sep 17 00:00:00 2001 From: Kamil Skalski Date: Tue, 22 Aug 2023 07:52:57 +0200 Subject: [PATCH 03/16] Better error propagation / display. (#38) --- crates/freeze/src/types/errors.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/freeze/src/types/errors.rs b/crates/freeze/src/types/errors.rs index 64516d3c..e3f697bf 100644 --- a/crates/freeze/src/types/errors.rs +++ b/crates/freeze/src/types/errors.rs @@ -7,7 +7,7 @@ use thiserror::Error; #[derive(Error, Debug)] pub enum FreezeError { /// Error related to processing file path - #[error("Failed to create file path")] + #[error(transparent)] FilePathError(#[from] FileError), /// Error related to joining a tokio task @@ -15,7 +15,7 @@ pub enum FreezeError { TaskFailed(#[source] tokio::task::JoinError), /// Error related to collecting data - #[error("Collect error")] + #[error(transparent)] CollectError(#[from] CollectError), /// Error related to progress bar @@ -23,7 +23,7 @@ pub enum FreezeError { ProgressBarError(#[from] indicatif::style::TemplateError), /// Parse error - #[error("Parsing error")] + #[error(transparent)] ParseError(#[from] ParseError), /// Error from serializing report @@ -79,7 +79,7 @@ pub enum CollectError { #[derive(Error, Debug)] pub enum ParseError { /// Error related to parsing - #[error("Parsing error")] + #[error("Parsing error {0:?}")] ParseError(String), /// Error related to provider operations From 51c8450aa3732c73d701a047b0a74853b15c2f96 Mon Sep 17 00:00:00 2001 From: Kamil Skalski Date: Tue, 22 Aug 2023 08:05:41 +0200 Subject: [PATCH 04/16] Return error from main when some chunks failed. (#42) --- crates/cli/src/main.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index d7ab8e71..e1fad694 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -15,7 +15,8 @@ use eyre::Result; async fn main() -> Result<()> { let args = Args::parse(); match run::run(args).await { - Ok(Some(_freeze_summary)) => Ok(()), + Ok(Some(freeze_summary)) if freeze_summary.n_errored == 0 => Ok(()), + Ok(Some(_freeze_summary)) => Err(eyre::Error::msg("Some chunks failed")), Ok(None) => Ok(()), Err(e) => Err(eyre::Report::from(e)), } From 3b37635f7a5b8784f58e9f1e66e84ae9bdb06c9c Mon Sep 17 00:00:00 2001 From: Kamil Skalski Date: Tue, 29 Aug 2023 22:34:50 +0200 Subject: [PATCH 05/16] Factor out dependency definitions into workspace Cargo.toml. (#36) * Factor out dependency definitions into workspace Cargo.toml. * Cleanup dependencies and fix warnings. --- Cargo.lock | 72 ++++++++++++++++++---------------------- Cargo.toml | 50 +++++++++++++++++++++++----- crates/cli/Cargo.toml | 31 ++++++++--------- crates/freeze/Cargo.toml | 27 ++++++++------- crates/python/Cargo.toml | 13 ++++---- 5 files changed, 109 insertions(+), 84 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 06531cb3..7a28ce13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -564,6 +564,7 @@ dependencies = [ "iana-time-zone", "js-sys", "num-traits", + "serde", "time 0.1.45", "wasm-bindgen", "winapi", @@ -899,7 +900,7 @@ dependencies = [ "polars", "pyo3", "pyo3-asyncio", - "pyo3-build-config 0.18.3", + "pyo3-build-config", "pyo3-polars", "tokio", ] @@ -1227,9 +1228,9 @@ dependencies = [ [[package]] name = "ethers" -version = "2.0.7" +version = "2.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a58ce802c65cf3d0756dee5a61094a92cde53c1583b246e9ee5b37226c7fc15" +checksum = "96b4026b97da8281276744741fac7eb385da905f6093c583331fa2953fdd4253" dependencies = [ "ethers-addressbook", "ethers-contract", @@ -1243,9 +1244,9 @@ dependencies = [ [[package]] name = "ethers-addressbook" -version = "2.0.7" +version = "2.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b856b7b8ff5c961093cb8efe151fbcce724b451941ce20781de11a531ccd578" +checksum = "edcb6ffefc230d8c42874c51b28dc11dbb8de50b27a8fdf92648439d6baa68dc" dependencies = [ "ethers-core", "once_cell", @@ -1255,14 +1256,15 @@ dependencies = [ [[package]] name = "ethers-contract" -version = "2.0.7" +version = "2.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e066a0d9cfc70c454672bf16bb433b0243427420076dc5b2f49c448fb5a10628" +checksum = "0d4719a44c3d37ab07c6dea99ab174068d8c35e441b60b6c20ce4e48357273e8" dependencies = [ "ethers-contract-abigen", "ethers-contract-derive", "ethers-core", "ethers-providers", + "ethers-signers", "futures-util", "hex", "once_cell", @@ -1274,9 +1276,9 @@ dependencies = [ [[package]] name = "ethers-contract-abigen" -version = "2.0.7" +version = "2.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c113e3e86b6bc16d98484b2c3bb2d01d6fed9f489fe2e592e5cc87c3024d616b" +checksum = "155ea1b84d169d231317ed86e307af6f2bed6b40dd17e5e94bc84da21cadb21c" dependencies = [ "Inflector", "dunce", @@ -1298,9 +1300,9 @@ dependencies = [ [[package]] name = "ethers-contract-derive" -version = "2.0.7" +version = "2.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c3fb5adee25701c79ec58fcf2c63594cd8829bc9ad6037ff862d5a111101ed2" +checksum = "8567ff196c4a37c1a8c90ec73bda0ad2062e191e4f0a6dc4d943e2ec4830fc88" dependencies = [ "Inflector", "ethers-contract-abigen", @@ -1344,9 +1346,9 @@ dependencies = [ [[package]] name = "ethers-etherscan" -version = "2.0.7" +version = "2.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84ebb401ba97c6f5af278c2c9936c4546cad75dec464b439ae6df249906f4caa" +checksum = "22b3a8269d3df0ed6364bc05b4735b95f4bf830ce3aef87d5e760fb0e93e5b91" dependencies = [ "ethers-core", "reqwest", @@ -1359,9 +1361,9 @@ dependencies = [ [[package]] name = "ethers-middleware" -version = "2.0.7" +version = "2.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "740f4a773c19dd6d6a68c8c2e0996c096488d38997d524e21dc612c55da3bd24" +checksum = "e0c339aad74ae5c451d27e0e49c7a3c7d22620b119b4f9291d7aa21f72d7f366" dependencies = [ "async-trait", "auto_impl", @@ -1386,9 +1388,9 @@ dependencies = [ [[package]] name = "ethers-providers" -version = "2.0.7" +version = "2.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56b498fd2a6c019d023e43e83488cd1fb0721f299055975aa6bac8dbf1e95f2c" +checksum = "b411b119f1cf0efb69e2190883dee731251882bb21270f893ee9513b3a697c48" dependencies = [ "async-trait", "auto_impl", @@ -1424,9 +1426,9 @@ dependencies = [ [[package]] name = "ethers-signers" -version = "2.0.7" +version = "2.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02c4b7e15f212fa7cc2e1251868320221d4ff77a3d48068e69f47ce1c491df2d" +checksum = "4864d387456a9c09a1157fa10e1528b29d90f1d859443acf06a1b23365fb518c" dependencies = [ "async-trait", "coins-bip32", @@ -1443,9 +1445,9 @@ dependencies = [ [[package]] name = "ethers-solc" -version = "2.0.7" +version = "2.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a81c89f121595cf8959e746045bb8b25a6a38d72588561e1a3b7992fc213f674" +checksum = "7a6c2b9625a2c639d46625f88acc2092a3cb35786c37f7c2128b3ca20f639b3c" dependencies = [ "cfg-if", "dunce", @@ -2186,9 +2188,9 @@ dependencies = [ [[package]] name = "lalrpop" -version = "0.19.12" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a1cbf952127589f2851ab2046af368fd20645491bb4b376f04b7f94d7a9837b" +checksum = "da4081d44f4611b66c6dd725e6de3169f9f63905421e8626fcb86b6a898998b8" dependencies = [ "ascii-canvas", "bit-set", @@ -2199,7 +2201,7 @@ dependencies = [ "lalrpop-util", "petgraph", "regex", - "regex-syntax 0.6.29", + "regex-syntax 0.7.2", "string_cache", "term", "tiny-keccak", @@ -2208,9 +2210,9 @@ dependencies = [ [[package]] name = "lalrpop-util" -version = "0.19.12" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3c48237b9604c5a4702de6b824e02006c3214327564636aef27c1028a8fa0ed" +checksum = "3f35c735096c0293d313e8f2a641627472b83d01b937177fe76e5e2708d31e0d" [[package]] name = "lazy_static" @@ -3215,7 +3217,7 @@ dependencies = [ "libc", "memoffset", "parking_lot", - "pyo3-build-config 0.19.2", + "pyo3-build-config", "pyo3-ffi", "pyo3-macros", "unindent", @@ -3234,16 +3236,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "pyo3-build-config" -version = "0.18.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cb946f5ac61bb61a5014924910d936ebd2b23b705f7a4a3c40b05c720b079a3" -dependencies = [ - "once_cell", - "target-lexicon", -] - [[package]] name = "pyo3-build-config" version = "0.19.2" @@ -3261,7 +3253,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e53cee42e77ebe256066ba8aa77eff722b3bb91f3419177cf4cd0f304d3284d9" dependencies = [ "libc", - "pyo3-build-config 0.19.2", + "pyo3-build-config", ] [[package]] @@ -3966,9 +3958,9 @@ dependencies = [ [[package]] name = "solang-parser" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a94494913728908efa7a25a2dd2e4f037e714897985c24273c40596638ed909" +checksum = "9c792fe9fae2a2f716846f214ca10d5a1e21133e0bf36cef34bcc4a852467b21" dependencies = [ "itertools", "lalrpop", diff --git a/Cargo.toml b/Cargo.toml index 87eb217f..080ea8b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,10 +1,6 @@ [workspace] -members = [ - "crates/cli", - "crates/freeze", - "crates/python", -] +members = ["crates/cli", "crates/freeze", "crates/python"] # Explicitly set the resolver to version 2, which is the default for packages with edition >= 2021 but not virtual workspaces. # https://doc.rust-lang.org/edition-guide/rust-2021/default-cargo-resolver.html @@ -16,12 +12,48 @@ edition = "2021" license = "MIT OR Apache-2.0" homepage = "https://github.com/paradigmxyz/cryo" repository = "https://github.com/paradigmxyz/cryo" -exclude = [ - ".github/", -] +exclude = [".github/"] + +[workspace.dependencies] +anstyle = "1.0.1" +async-trait = "0.1.68" +chrono = { version = "0.4.26", features = ["serde"] } +clap_cryo = { version = "4.3.21-cryo", features = [ + "derive", + "color", + "unstable-styles", +] } +colored = "2.0.0" +color-print = "0.3.4" +ethers = { version = "2.0.8", features = ["rustls", "ws", "ipc"] } +ethers-core = "2.0.8" +eyre = "0.6.8" +futures = "0.3.28" +governor = "0.5.1" +hex = "0.4.3" +indexmap = "2.0.0" +indicatif = "0.17.5" +polars = { version = "0.32.1", features = [ + "parquet", + "string_encoding", + "polars-lazy", + "lazy", + "binary_encoding", + "json", + "dtype-struct", +] } +prefix-hex = "0.7.0" +pyo3 = { version = "0.19.2", features = ["extension-module"] } +pyo3-build-config = "0.19.0" +pyo3-asyncio = { version = "0.19.0", features = ["tokio-runtime"] } +pyo3-polars = "0.6.0" +serde = { version = "1.0.183", features = ["derive"] } +serde_json = "1.0.104" +thiserror = "1.0.40" +thousands = "0.2.0" +tokio = { version = "1.29.0", features = ["macros", "rt-multi-thread", "sync"] } [profile.dev] incremental = true debug = 1 codegen-units = 32 - diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 9613978a..8e136a09 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -19,19 +19,20 @@ path = "src/main.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -clap_cryo = { version = "4.3.21-cryo", features = ["derive", "color", "unstable-styles"] } -color-print = "0.3.4" -ethers = "2.0.7" -hex = "0.4.3" -indicatif = "0.17.5" -polars = "0.32.1" -tokio = "1.29.0" cryo_freeze = { version = "0.2.0", path = "../freeze" } -colored = "2.0.0" -thousands = "0.2.0" -chrono = "0.4.26" -anstyle = "1.0.1" -eyre = "0.6.8" -governor = "0.5.1" -serde = { version = "1.0.183", features = ["derive"] } -serde_json = "1.0.104" + +anstyle = { workspace = true } +chrono = { workspace = true } +clap_cryo = { workspace = true } +colored = { workspace = true } +color-print = { workspace = true } +ethers = { workspace = true } +eyre = { workspace = true } +governor = { workspace = true } +hex = { workspace = true } +indicatif = { workspace = true } +polars = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +thousands = { workspace = true } +tokio = { workspace = true } diff --git a/crates/freeze/Cargo.toml b/crates/freeze/Cargo.toml index da06c6f6..4a6c51fa 100644 --- a/crates/freeze/Cargo.toml +++ b/crates/freeze/Cargo.toml @@ -10,17 +10,16 @@ repository.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -async-trait = "0.1.68" -ethers = { version = "2.0.7", features = ["rustls", "ws", "ipc"] } -ethers-core = "2.0.8" -futures = "0.3.28" -governor = "0.5.1" -indexmap = "2.0.0" -indicatif = "0.17.5" -polars = { version = "0.32.1", features = ["parquet", "string_encoding", "polars-lazy", "lazy", "binary_encoding", "json", "dtype-struct"] } -prefix-hex = "0.7.0" -serde = { version = "1.0.183", features = ["derive"] } -serde_json = "1.0.104" -thiserror = "1.0.40" -tokio = { version = "1.28.2", features = ["macros", "rt-multi-thread", "sync"] } - +async-trait = { workspace = true } +ethers = { workspace = true } +ethers-core = { workspace = true } +futures = { workspace = true } +governor = { workspace = true } +indexmap = { workspace = true } +indicatif = { workspace = true } +polars = { workspace = true } +prefix-hex = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true } diff --git a/crates/python/Cargo.toml b/crates/python/Cargo.toml index f988d0df..ab249f2a 100644 --- a/crates/python/Cargo.toml +++ b/crates/python/Cargo.toml @@ -12,11 +12,12 @@ crate-type = ["cdylib"] [dependencies] cryo_cli = { version = "0.2.0", path = "../cli" } cryo_freeze = { version = "0.2.0", path = "../freeze" } -polars = { version = "0.32.1", features = ["parquet", "string_encoding", "polars-lazy", "lazy", "binary_encoding", "json", "dtype-struct"] } -pyo3 = { version = "0.19.2", features = ["extension-module"] } -pyo3-asyncio = { version = "0.19.0", features = ["tokio-runtime"] } -pyo3-polars = "0.6.0" -tokio = "1.29.0" + +polars = { workspace = true } +pyo3 = { workspace = true } +pyo3-asyncio = { workspace = true } +pyo3-polars = { workspace = true } +tokio = { workspace = true } [build-dependencies] -pyo3-build-config = "0.18.0" +pyo3-build-config = { workspace = true } From 856c4aeef926856f8a19d5a67afceb56742cf8af Mon Sep 17 00:00:00 2001 From: Kamil Skalski Date: Tue, 29 Aug 2023 22:58:52 +0200 Subject: [PATCH 06/16] Use Path instead of String for output dir. (#37) * Use Path instead of String for output dir. * Use repo's cargo fmt. --- crates/cli/src/parse/file_output.rs | 9 +++---- crates/cli/src/summaries.rs | 2 +- crates/freeze/src/types/chunks/chunk.rs | 6 ++--- crates/freeze/src/types/chunks/chunk_ops.rs | 7 ++---- crates/freeze/src/types/dataframes/export.rs | 26 +++++++++++--------- crates/freeze/src/types/files.rs | 2 +- crates/freeze/src/types/queries.rs | 13 +++++----- crates/freeze/src/types/summaries.rs | 12 ++++----- crates/python/src/freeze_adapter.rs | 2 +- 9 files changed, 38 insertions(+), 41 deletions(-) diff --git a/crates/cli/src/parse/file_output.rs b/crates/cli/src/parse/file_output.rs index cb6010ed..2ef4f009 100644 --- a/crates/cli/src/parse/file_output.rs +++ b/crates/cli/src/parse/file_output.rs @@ -8,12 +8,9 @@ use crate::args::Args; pub(crate) fn parse_file_output(args: &Args, source: &Source) -> Result { // process output directory - let output_dir = std::fs::canonicalize(args.output_dir.clone()) - .map_err(|_e| { - ParseError::ParseError("Failed to canonicalize output directory".to_string()) - })? - .to_string_lossy() - .into_owned(); + let output_dir = std::fs::canonicalize(args.output_dir.clone()).map_err(|_e| { + ParseError::ParseError("Failed to canonicalize output directory".to_string()) + })?; match fs::create_dir_all(&output_dir) { Ok(_) => {} Err(e) => return Err(ParseError::ParseError(format!("Error creating directory: {}", e))), diff --git a/crates/cli/src/summaries.rs b/crates/cli/src/summaries.rs index 7bd496ec..20521fbb 100644 --- a/crates/cli/src/summaries.rs +++ b/crates/cli/src/summaries.rs @@ -57,7 +57,7 @@ pub(crate) fn print_cryo_summary( print_bullet("inner request size", source.inner_request_size.to_string()); }; print_bullet("output format", sink.format.as_str()); - print_bullet("output dir", &sink.output_dir); + print_bullet("output dir", sink.output_dir.to_string_lossy()); match report_path { None => print_bullet("report file", "None"), Some(path) => print_bullet("report file", path), diff --git a/crates/freeze/src/types/chunks/chunk.rs b/crates/freeze/src/types/chunks/chunk.rs index eda432dc..5699f3b7 100644 --- a/crates/freeze/src/types/chunks/chunk.rs +++ b/crates/freeze/src/types/chunks/chunk.rs @@ -1,5 +1,5 @@ use crate::types::{Datatype, FileError, FileOutput}; -use std::collections::HashMap; +use std::{collections::HashMap, path::PathBuf}; use super::{binary_chunk::BinaryChunk, chunk_ops::ChunkData, number_chunk::NumberChunk}; @@ -33,7 +33,7 @@ impl Chunk { datatype: &Datatype, file_output: &FileOutput, chunk_label: &Option, - ) -> Result { + ) -> Result { match self { Chunk::Block(chunk) => chunk.filepath(datatype, file_output, chunk_label), Chunk::Transaction(chunk) => chunk.filepath(datatype, file_output, chunk_label), @@ -47,7 +47,7 @@ impl Chunk { datatypes: Vec<&Datatype>, file_output: &FileOutput, chunk_label: &Option, - ) -> Result, FileError> { + ) -> Result, FileError> { let mut paths = HashMap::new(); for datatype in datatypes { let path = self.filepath(datatype, file_output, chunk_label)?; diff --git a/crates/freeze/src/types/chunks/chunk_ops.rs b/crates/freeze/src/types/chunks/chunk_ops.rs index 65e4e3de..2e239c0e 100644 --- a/crates/freeze/src/types/chunks/chunk_ops.rs +++ b/crates/freeze/src/types/chunks/chunk_ops.rs @@ -33,7 +33,7 @@ pub trait ChunkData: Sized { datatype: &Datatype, file_output: &FileOutput, chunk_label: &Option, - ) -> Result { + ) -> Result { let network_name = file_output.prefix.clone(); let stub = match chunk_label { Some(chunk_label) => chunk_label.clone(), @@ -46,10 +46,7 @@ pub trait ChunkData: Sized { None => vec![network_name, datatype.dataset().name().to_string(), stub], }; let filename = format!("{}.{}", pieces.join("__"), file_output.format.as_str()); - match file_output.output_dir.as_str() { - "." => Ok(filename), - output_dir => Ok(output_dir.to_string() + "/" + filename.as_str()), - } + Ok(file_output.output_dir.join(filename)) } } diff --git a/crates/freeze/src/types/dataframes/export.rs b/crates/freeze/src/types/dataframes/export.rs index a2ba38dc..e8b1d1fe 100644 --- a/crates/freeze/src/types/dataframes/export.rs +++ b/crates/freeze/src/types/dataframes/export.rs @@ -1,4 +1,7 @@ -use std::collections::HashMap; +use std::{ + collections::HashMap, + path::{Path, PathBuf}, +}; use polars::prelude::*; @@ -6,7 +9,7 @@ use crate::types::{FileError, FileOutput}; pub(crate) fn dfs_to_files( dfs: &mut HashMap, - filenames: &HashMap, + filenames: &HashMap, file_output: &FileOutput, ) -> Result<(), FileError> where @@ -28,15 +31,14 @@ where /// write polars dataframe to file pub(crate) fn df_to_file( df: &mut DataFrame, - filename: &str, + filename: &Path, file_output: &FileOutput, ) -> Result<(), FileError> { - let binding = filename.to_string() + "_tmp"; - let tmp_filename = binding.as_str(); - let result = match filename { - _ if filename.ends_with(".parquet") => df_to_parquet(df, tmp_filename, file_output), - _ if filename.ends_with(".csv") => df_to_csv(df, tmp_filename), - _ if filename.ends_with(".json") => df_to_json(df, tmp_filename), + let tmp_filename = filename.with_extension("_tmp"); + let result = match filename.extension().and_then(|ex| ex.to_str()) { + Some("parquet") => df_to_parquet(df, &tmp_filename, file_output), + Some("csv") => df_to_csv(df, &tmp_filename), + Some("json") => df_to_json(df, &tmp_filename), _ => return Err(FileError::FileWriteError), }; match result { @@ -48,7 +50,7 @@ pub(crate) fn df_to_file( /// write polars dataframe to parquet file fn df_to_parquet( df: &mut DataFrame, - filename: &str, + filename: &Path, file_output: &FileOutput, ) -> Result<(), FileError> { let file = std::fs::File::create(filename).map_err(|_e| FileError::FileWriteError)?; @@ -64,7 +66,7 @@ fn df_to_parquet( } /// write polars dataframe to csv file -fn df_to_csv(df: &mut DataFrame, filename: &str) -> Result<(), FileError> { +fn df_to_csv(df: &mut DataFrame, filename: &Path) -> Result<(), FileError> { let file = std::fs::File::create(filename).map_err(|_e| FileError::FileWriteError)?; let result = CsvWriter::new(file).finish(df); match result { @@ -74,7 +76,7 @@ fn df_to_csv(df: &mut DataFrame, filename: &str) -> Result<(), FileError> { } /// write polars dataframe to json file -fn df_to_json(df: &mut DataFrame, filename: &str) -> Result<(), FileError> { +fn df_to_json(df: &mut DataFrame, filename: &Path) -> Result<(), FileError> { let file = std::fs::File::create(filename).map_err(|_e| FileError::FileWriteError)?; let result = JsonWriter::new(file).with_json_format(JsonFormat::Json).finish(df); match result { diff --git a/crates/freeze/src/types/files.rs b/crates/freeze/src/types/files.rs index 0866719b..eff709cf 100644 --- a/crates/freeze/src/types/files.rs +++ b/crates/freeze/src/types/files.rs @@ -4,7 +4,7 @@ use polars::prelude::*; #[derive(Clone)] pub struct FileOutput { /// Path of directory where to save files - pub output_dir: String, + pub output_dir: std::path::PathBuf, /// Prefix of file name pub prefix: String, /// Suffix to use at the end of file names diff --git a/crates/freeze/src/types/queries.rs b/crates/freeze/src/types/queries.rs index ccc5afa8..6943a60c 100644 --- a/crates/freeze/src/types/queries.rs +++ b/crates/freeze/src/types/queries.rs @@ -1,4 +1,7 @@ -use std::collections::{HashMap, HashSet}; +use std::{ + collections::{HashMap, HashSet}, + path::{Path, PathBuf}, +}; use ethers::prelude::*; @@ -34,7 +37,7 @@ pub struct MultiQuery { impl MultiQuery { /// get number of chunks that have not yet been collected pub fn get_n_chunks_remaining(&self, sink: &FileOutput) -> Result { - let actual_files: HashSet = list_files(&sink.output_dir) + let actual_files: HashSet = list_files(&sink.output_dir) .map_err(|_e| { FreezeError::CollectError(CollectError::CollectError( "could not list files in output dir".to_string(), @@ -53,13 +56,11 @@ impl MultiQuery { } } -fn list_files(dir: &str) -> Result, std::io::Error> { +fn list_files(dir: &Path) -> Result, std::io::Error> { let mut file_list = Vec::new(); for entry in std::fs::read_dir(dir)? { let entry = entry?; - if let Some(filename) = entry.path().to_str() { - file_list.push(filename.to_string()); - } + file_list.push(entry.path()); } Ok(file_list) } diff --git a/crates/freeze/src/types/summaries.rs b/crates/freeze/src/types/summaries.rs index 5f8c72d8..4be793c0 100644 --- a/crates/freeze/src/types/summaries.rs +++ b/crates/freeze/src/types/summaries.rs @@ -1,5 +1,5 @@ use crate::types::Datatype; -use std::collections::HashMap; +use std::{collections::HashMap, path::PathBuf}; /// Summary of freeze operation #[derive(serde::Serialize, Debug)] @@ -11,7 +11,7 @@ pub struct FreezeSummary { /// number of chunks that encountered an error pub n_errored: u64, /// paths - pub paths: HashMap>, + pub paths: HashMap>, } pub(crate) trait FreezeSummaryAgg { @@ -49,19 +49,19 @@ pub struct FreezeChunkSummary { /// whether chunk encountered an error pub errored: bool, /// output paths - pub paths: HashMap, + pub paths: HashMap, } impl FreezeChunkSummary { - pub(crate) fn success(paths: HashMap) -> FreezeChunkSummary { + pub(crate) fn success(paths: HashMap) -> FreezeChunkSummary { FreezeChunkSummary { skipped: false, errored: false, paths } } - pub(crate) fn error(paths: HashMap) -> FreezeChunkSummary { + pub(crate) fn error(paths: HashMap) -> FreezeChunkSummary { FreezeChunkSummary { skipped: false, errored: true, paths } } - pub(crate) fn skip(paths: HashMap) -> FreezeChunkSummary { + pub(crate) fn skip(paths: HashMap) -> FreezeChunkSummary { FreezeChunkSummary { skipped: true, errored: false, paths } } } diff --git a/crates/python/src/freeze_adapter.rs b/crates/python/src/freeze_adapter.rs index 0d6d88d3..85a9c161 100644 --- a/crates/python/src/freeze_adapter.rs +++ b/crates/python/src/freeze_adapter.rs @@ -132,7 +132,7 @@ pub fn _freeze( let paths = PyDict::new(py); for (key, values) in &result.paths { let key = key.dataset().name(); - let values: Vec<&str> = values.iter().map(AsRef::as_ref).collect(); + let values: Vec<&str> = values.iter().filter_map(|p| p.to_str()).collect(); paths.set_item(key, values).unwrap(); } let paths = paths.to_object(py); From 0d4501e61cf732eb1aa6482eca6cc60d172e91e5 Mon Sep 17 00:00:00 2001 From: Kamil Skalski Date: Tue, 29 Aug 2023 23:11:09 +0200 Subject: [PATCH 07/16] Fix warnings. (#39) * Fix warnings. * Use repo's cargo fmt. --- crates/cli/src/parse/blocks.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/cli/src/parse/blocks.rs b/crates/cli/src/parse/blocks.rs index 87148aaf..bbe77b89 100644 --- a/crates/cli/src/parse/blocks.rs +++ b/crates/cli/src/parse/blocks.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use ethers::prelude::*; use polars::prelude::*; From 451fb235d0d14b8ae4d6b48bd715cfdada3394dc Mon Sep 17 00:00:00 2001 From: sslivkoff Date: Tue, 29 Aug 2023 15:07:32 -0700 Subject: [PATCH 08/16] fix n_chunks_remaining when using --overwrite --- crates/freeze/src/types/files.rs | 4 ++-- crates/freeze/src/types/queries.rs | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/crates/freeze/src/types/files.rs b/crates/freeze/src/types/files.rs index eff709cf..bb6ec00e 100644 --- a/crates/freeze/src/types/files.rs +++ b/crates/freeze/src/types/files.rs @@ -1,7 +1,7 @@ use polars::prelude::*; /// Options for file output -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct FileOutput { /// Path of directory where to save files pub output_dir: std::path::PathBuf, @@ -22,7 +22,7 @@ pub struct FileOutput { } /// File format -#[derive(Clone, Eq, PartialEq)] +#[derive(Clone, Eq, PartialEq, Debug)] pub enum FileFormat { /// Parquet file format Parquet, diff --git a/crates/freeze/src/types/queries.rs b/crates/freeze/src/types/queries.rs index 6943a60c..e8982da6 100644 --- a/crates/freeze/src/types/queries.rs +++ b/crates/freeze/src/types/queries.rs @@ -37,6 +37,9 @@ pub struct MultiQuery { impl MultiQuery { /// get number of chunks that have not yet been collected pub fn get_n_chunks_remaining(&self, sink: &FileOutput) -> Result { + if sink.overwrite { + return Ok(self.chunks.len() as u64) + }; let actual_files: HashSet = list_files(&sink.output_dir) .map_err(|_e| { FreezeError::CollectError(CollectError::CollectError( From 623fe1660f9acbbc904f97512828635dee7a15b8 Mon Sep 17 00:00:00 2001 From: sslivkoff Date: Tue, 29 Aug 2023 15:29:10 -0700 Subject: [PATCH 09/16] add check that each included column ends up in at least one schema --- crates/cli/src/parse/query.rs | 26 ++++++++++++++++++++++++++ crates/freeze/src/types/errors.rs | 2 +- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/crates/cli/src/parse/query.rs b/crates/cli/src/parse/query.rs index 4504822b..f5c55c12 100644 --- a/crates/cli/src/parse/query.rs +++ b/crates/cli/src/parse/query.rs @@ -107,6 +107,32 @@ fn parse_schemas(args: &Args) -> Result, ParseError> { }) }) .collect(); + + // make sure all included columns ended up in at least one schema + if let (Ok(schemas), Some(include_columns)) = (&schemas, &args.include_columns) { + let mut unknown_columns = Vec::new(); + for column in include_columns.iter() { + let mut in_a_schema = false; + + for schema in schemas.values() { + if schema.has_column(column) { + in_a_schema = true; + break + } + } + + if !in_a_schema { + unknown_columns.push(column); + } + } + if !unknown_columns.is_empty() { + return Err(ParseError::ParseError(format!( + "datatypes do not support these columns: {:?}", + unknown_columns + ))) + } + }; + schemas } diff --git a/crates/freeze/src/types/errors.rs b/crates/freeze/src/types/errors.rs index 64516d3c..cf477fda 100644 --- a/crates/freeze/src/types/errors.rs +++ b/crates/freeze/src/types/errors.rs @@ -79,7 +79,7 @@ pub enum CollectError { #[derive(Error, Debug)] pub enum ParseError { /// Error related to parsing - #[error("Parsing error")] + #[error("Parsing error: {0}")] ParseError(String), /// Error related to provider operations From 50b81c5f905391d976cdb2611623367041540bcf Mon Sep 17 00:00:00 2001 From: sslivkoff Date: Tue, 29 Aug 2023 22:05:18 -0700 Subject: [PATCH 10/16] add Schema Design Guide to README.md --- README.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/README.md b/README.md index ef149caa..d357f3a3 100644 --- a/README.md +++ b/README.md @@ -87,6 +87,16 @@ Many `cryo` cli options will affect output schemas by adding/removing columns or `cryo` will always print out data schemas before collecting any data. To view these schemas without collecting data, use `--dry` to perform a dry run. +#### Schema Design Guide + +An attempt is made to ensure that the dataset schemas conform to a common set of design guidelines: +- By default, rows should contain enough information be order-able +- Columns should be named by their JSON-RPC or ethers.rs defaults, except in cases where a much more explicit name is available +- To make joins across tables easier, a given piece of information should use the same datatype and column name across tables when possible +- Large ints such as `u256` should allow multiple conversions. A `value` column of type `u256` should allow: `value_binary`, `value_string`, `value_f64`, `value_decimal128`, `value_u64_high`, and `value_u64_low` +- By default, columns related to non-identifying cryptographic signatures are omitted by default. For example, `state_root` of a block or `v`/`r`/`s` of a transaction +- Integer values that can never be negative should be stored as unsigned integers + #### JSON-RPC `cryo` currently obtains all of its data using the [JSON-RPC](https://ethereum.org/en/developers/docs/apis/json-rpc/) protocol standard. From ff67c15fddf361ed1f4d8a19f016807936202af4 Mon Sep 17 00:00:00 2001 From: sslivkoff Date: Tue, 29 Aug 2023 22:13:55 -0700 Subject: [PATCH 11/16] change .cryo_reports to .cryo --- crates/cli/src/args.rs | 2 +- crates/cli/src/reports.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/cli/src/args.rs b/crates/cli/src/args.rs index 82f46928..d9b474a3 100644 --- a/crates/cli/src/args.rs +++ b/crates/cli/src/args.rs @@ -134,7 +134,7 @@ pub struct Args { pub compression: Vec, /// Directory to save summary report - /// [default: {output_dir}/.cryo_reports] + /// [default: {output_dir}/.cryo/reports] #[arg(long, help_heading = "Output Options")] pub report_dir: Option, diff --git a/crates/cli/src/reports.rs b/crates/cli/src/reports.rs index dcf1121c..e62e4b7d 100644 --- a/crates/cli/src/reports.rs +++ b/crates/cli/src/reports.rs @@ -19,7 +19,7 @@ pub(crate) fn get_report_path( ) -> Result { let report_dir = match &args.report_dir { Some(report_dir) => Path::new(&report_dir).into(), - None => Path::new(&args.output_dir).join(".cryo_reports"), + None => Path::new(&args.output_dir).join(".cryo/reports"), }; std::fs::create_dir_all(&report_dir)?; let t_start: DateTime = t_start.into(); From fdfe7f6befaa5e76676fdd47b48b40b0056bd991 Mon Sep 17 00:00:00 2001 From: sslivkoff Date: Tue, 29 Aug 2023 22:23:59 -0700 Subject: [PATCH 12/16] format help text so that it fits in 80 column terminal --- crates/cli/src/args.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/crates/cli/src/args.rs b/crates/cli/src/args.rs index d9b474a3..90b7b19f 100644 --- a/crates/cli/src/args.rs +++ b/crates/cli/src/args.rs @@ -23,12 +23,12 @@ pub struct Args { )] pub txs: Option>, - /// Align block chunk boundaries to regular intervals, - /// e.g. (1000, 2000, 3000) instead of (1106, 2106, 3106) + /// Align chunk boundaries to regular intervals, + /// e.g. (1000 2000 3000), not (1106 2106 3106) #[arg(short, long, help_heading = "Content Options", verbatim_doc_comment)] pub align: bool, - /// Reorg buffer, save blocks only when they are this old, + /// Reorg buffer, save blocks only when this old, /// can be a number of blocks #[arg( long, @@ -39,7 +39,7 @@ pub struct Args { )] pub reorg_buffer: u64, - /// Columns to include alongside the default output, + /// Columns to include alongside the defaults, /// use `all` to include all available columns #[arg(short, long, value_name="COLS", num_args(0..), verbatim_doc_comment, help_heading="Content Options")] pub include_columns: Option>, @@ -57,7 +57,7 @@ pub struct Args { #[arg(long, help_heading = "Content Options")] pub hex: bool, - /// Columns(s) to sort by, `none` to disable sorting + /// Columns(s) to sort by, `none` for unordered #[arg(short, long, num_args(0..), help_heading="Content Options")] pub sort: Option>, @@ -65,7 +65,7 @@ pub struct Args { #[arg(short, long, help_heading = "Source Options")] pub rpc: Option, - /// Network name [default: use name of eth_getChainId] + /// Network name [default: name of eth_getChainId] #[arg(long, help_heading = "Source Options")] pub network_name: Option, @@ -105,7 +105,7 @@ pub struct Args { #[arg(long, help_heading = "Output Options")] pub file_suffix: Option, - /// Overwrite existing files instead of skipping them + /// Overwrite existing files instead of skipping #[arg(long, help_heading = "Output Options")] pub overwrite: bool, @@ -129,13 +129,13 @@ pub struct Args { #[arg(long, help_heading = "Output Options")] pub no_stats: bool, - /// Set compression algorithm and level + /// Compression algorithm and level #[arg(long, help_heading="Output Options", value_name="NAME [#]", num_args(1..=2), default_value = "lz4")] pub compression: Vec, /// Directory to save summary report /// [default: {output_dir}/.cryo/reports] - #[arg(long, help_heading = "Output Options")] + #[arg(long, help_heading = "Output Options", verbatim_doc_comment)] pub report_dir: Option, /// Avoid saving a summary report @@ -162,7 +162,7 @@ pub struct Args { #[arg(long, help_heading = "Dataset-specific Options")] pub topic3: Option, - /// [logs] Number of blocks per log request + /// [logs] Blocks per request #[arg( long, value_name = "BLOCKS", From be6de0c1b0db8e944c9b04d1e5cbf43921255e96 Mon Sep 17 00:00:00 2001 From: sslivkoff Date: Tue, 29 Aug 2023 23:54:59 -0700 Subject: [PATCH 13/16] add standard types across tables to schema design guide --- README.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/README.md b/README.md index d357f3a3..5c40bb45 100644 --- a/README.md +++ b/README.md @@ -97,6 +97,14 @@ An attempt is made to ensure that the dataset schemas conform to a common set of - By default, columns related to non-identifying cryptographic signatures are omitted by default. For example, `state_root` of a block or `v`/`r`/`s` of a transaction - Integer values that can never be negative should be stored as unsigned integers +Standard types across tables: +- `block_number`: `u32` +- `transaction_index`: `u32` +- `nonce`: `u32` +- `gas_used`: `u32` +- `gas_limit`: `u32` +- `chain_id`: `u64` + #### JSON-RPC `cryo` currently obtains all of its data using the [JSON-RPC](https://ethereum.org/en/developers/docs/apis/json-rpc/) protocol standard. From ac8f201b03a459c20dec18ba6a28f68e42e8e224 Mon Sep 17 00:00:00 2001 From: sslivkoff Date: Wed, 30 Aug 2023 00:04:07 -0700 Subject: [PATCH 14/16] add check that each excluded column excluded from at least one schema --- crates/cli/src/parse/query.rs | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/crates/cli/src/parse/query.rs b/crates/cli/src/parse/query.rs index f5c55c12..6d1dba65 100644 --- a/crates/cli/src/parse/query.rs +++ b/crates/cli/src/parse/query.rs @@ -133,6 +133,31 @@ fn parse_schemas(args: &Args) -> Result, ParseError> { } }; + // make sure all excluded columns are excluded from at least one schema + if let (Ok(schemas), Some(exclude_columns)) = (&schemas, &args.exclude_columns) { + let mut unknown_columns = Vec::new(); + for column in exclude_columns.iter() { + let mut in_a_schema = false; + + for datatype in schemas.keys() { + if datatype.dataset().column_types().contains_key(&column.as_str()) { + in_a_schema = true; + break + } + } + + if !in_a_schema { + unknown_columns.push(column); + } + } + if !unknown_columns.is_empty() { + return Err(ParseError::ParseError(format!( + "datatypes do not support these columns: {:?}", + unknown_columns + ))) + } + }; + schemas } From 519e3d72bc6e6a0585e15633e8761ad3ffa8be7a Mon Sep 17 00:00:00 2001 From: sslivkoff Date: Wed, 30 Aug 2023 17:54:37 -0700 Subject: [PATCH 15/16] Alternative u256 representations (#53) * initial u256 conversions commit * basic u256 input parsing * display u256 conversions in print_schema() * add raw data conversions from u256 * update readme to reflect possible u256 conversions * add comment to produce diff on schemas.rs * fix table_schema tests * form table_schema tests --- README.md | 2 +- crates/cli/src/args.rs | 5 + crates/cli/src/parse/query.rs | 34 +++++- crates/cli/src/summaries.rs | 15 ++- crates/freeze/src/datasets/balance_diffs.rs | 4 +- crates/freeze/src/datasets/blocks.rs | 10 +- .../freeze/src/datasets/native_transfers.rs | 2 +- crates/freeze/src/datasets/traces.rs | 2 +- crates/freeze/src/datasets/transactions.rs | 4 +- .../freeze/src/types/dataframes/creation.rs | 73 +++++++++++++ crates/freeze/src/types/files.rs | 2 +- crates/freeze/src/types/mod.rs | 2 +- crates/freeze/src/types/schemas.rs | 102 +++++++++++++++--- crates/python/src/collect_adapter.rs | 3 + crates/python/src/freeze_adapter.rs | 3 + 15 files changed, 231 insertions(+), 32 deletions(-) diff --git a/README.md b/README.md index 5c40bb45..5614a3c6 100644 --- a/README.md +++ b/README.md @@ -93,7 +93,7 @@ An attempt is made to ensure that the dataset schemas conform to a common set of - By default, rows should contain enough information be order-able - Columns should be named by their JSON-RPC or ethers.rs defaults, except in cases where a much more explicit name is available - To make joins across tables easier, a given piece of information should use the same datatype and column name across tables when possible -- Large ints such as `u256` should allow multiple conversions. A `value` column of type `u256` should allow: `value_binary`, `value_string`, `value_f64`, `value_decimal128`, `value_u64_high`, and `value_u64_low` +- Large ints such as `u256` should allow multiple conversions. A `value` column of type `u256` should allow: `value_binary`, `value_string`, `value_f32`, `value_f64`, `value_u32`, `value_u64`, and `value_d128` - By default, columns related to non-identifying cryptographic signatures are omitted by default. For example, `state_root` of a block or `v`/`r`/`s` of a transaction - Integer values that can never be negative should be stored as unsigned integers diff --git a/crates/cli/src/args.rs b/crates/cli/src/args.rs index 90b7b19f..a35a13cc 100644 --- a/crates/cli/src/args.rs +++ b/crates/cli/src/args.rs @@ -53,6 +53,11 @@ pub struct Args { #[arg(long, value_name="COLS", num_args(0..), verbatim_doc_comment, help_heading="Content Options")] pub columns: Option>, + /// Set output datatype(s) of U256 integers + /// [default: binary, string, f64] + #[arg(long, num_args(1..), help_heading = "Content Options", verbatim_doc_comment)] + pub u256_types: Option>, + /// Use hex string encoding for binary columns #[arg(long, help_heading = "Content Options")] pub hex: bool, diff --git a/crates/cli/src/parse/query.rs b/crates/cli/src/parse/query.rs index 6d1dba65..0c2f6562 100644 --- a/crates/cli/src/parse/query.rs +++ b/crates/cli/src/parse/query.rs @@ -1,4 +1,7 @@ -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; use ethers::prelude::*; use hex::FromHex; @@ -7,6 +10,7 @@ use cryo_freeze::{ColumnEncoding, Datatype, FileFormat, MultiQuery, ParseError, use super::{blocks, file_output, transactions}; use crate::args::Args; +use cryo_freeze::U256Type; pub(crate) async fn parse_query( args: &Args, @@ -81,6 +85,33 @@ fn parse_datatypes(raw_inputs: &Vec) -> Result, ParseError fn parse_schemas(args: &Args) -> Result, ParseError> { let datatypes = parse_datatypes(&args.datatype)?; let output_format = file_output::parse_output_format(args)?; + + let u256_types = if let Some(raw_u256_types) = &args.u256_types { + let mut u256_types: HashSet = HashSet::new(); + for raw in raw_u256_types.iter() { + let u256_type = match raw.to_lowercase() { + raw if raw == "binary" => U256Type::Binary, + raw if raw == "string" => U256Type::String, + raw if raw == "str" => U256Type::String, + raw if raw == "f32" => U256Type::F32, + raw if raw == "float32" => U256Type::F32, + raw if raw == "f64" => U256Type::F64, + raw if raw == "float64" => U256Type::F64, + raw if raw == "float" => U256Type::F64, + raw if raw == "u32" => U256Type::U32, + raw if raw == "uint32" => U256Type::U32, + raw if raw == "u64" => U256Type::U64, + raw if raw == "uint64" => U256Type::U64, + raw if raw == "decimal128" => U256Type::Decimal128, + raw if raw == "d128" => U256Type::Decimal128, + _ => return Err(ParseError::ParseError("bad u256 type".to_string())), + }; + u256_types.insert(u256_type); + } + u256_types + } else { + HashSet::from_iter(vec![U256Type::Binary, U256Type::String, U256Type::F64]) + }; let binary_column_format = match args.hex | (output_format != FileFormat::Parquet) { true => ColumnEncoding::Hex, false => ColumnEncoding::Binary, @@ -92,6 +123,7 @@ fn parse_schemas(args: &Args) -> Result, ParseError> { .map(|datatype| { datatype .table_schema( + &u256_types, &binary_column_format, &args.include_columns, &args.exclude_columns, diff --git a/crates/cli/src/summaries.rs b/crates/cli/src/summaries.rs index 20521fbb..c83dde80 100644 --- a/crates/cli/src/summaries.rs +++ b/crates/cli/src/summaries.rs @@ -6,8 +6,8 @@ use std::time::SystemTime; use thousands::Separable; use cryo_freeze::{ - BlockChunk, Chunk, ChunkData, Datatype, FileOutput, FreezeSummary, MultiQuery, Source, Table, - TransactionChunk, + BlockChunk, Chunk, ChunkData, ColumnType, Datatype, FileOutput, FreezeSummary, MultiQuery, + Source, Table, TransactionChunk, }; const TITLE_R: u8 = 0; @@ -123,7 +123,16 @@ fn print_schema(name: &Datatype, schema: &Table) { print_header("schema for ".to_string() + name.dataset().name()); for column in schema.columns() { if let Some(column_type) = schema.column_type(column) { - print_bullet(column, column_type.as_str()); + if column_type == ColumnType::UInt256 { + for uint256_type in schema.u256_types.iter() { + print_bullet( + column.to_owned() + uint256_type.suffix().as_str(), + uint256_type.to_columntype().as_str(), + ); + } + } else { + print_bullet(column, column_type.as_str()); + } } } println!(); diff --git a/crates/freeze/src/datasets/balance_diffs.rs b/crates/freeze/src/datasets/balance_diffs.rs index a7962045..7809f25b 100644 --- a/crates/freeze/src/datasets/balance_diffs.rs +++ b/crates/freeze/src/datasets/balance_diffs.rs @@ -24,8 +24,8 @@ impl Dataset for BalanceDiffs { ("transaction_index", ColumnType::Binary), ("transaction_hash", ColumnType::Binary), ("address", ColumnType::Binary), - ("from_value", ColumnType::Binary), - ("to_value", ColumnType::Binary), + ("from_value", ColumnType::UInt256), + ("to_value", ColumnType::UInt256), ("chain_id", ColumnType::UInt64), ]) } diff --git a/crates/freeze/src/datasets/blocks.rs b/crates/freeze/src/datasets/blocks.rs index 3856b53f..13c09436 100644 --- a/crates/freeze/src/datasets/blocks.rs +++ b/crates/freeze/src/datasets/blocks.rs @@ -9,9 +9,9 @@ use crate::{ types::{ conversions::{ToVecHex, ToVecU8}, BlockChunk, Blocks, CollectError, ColumnType, Dataset, Datatype, RowFilter, Source, Table, - TransactionChunk, + TransactionChunk, U256Type, }, - with_series, with_series_binary, + with_series, with_series_binary, with_series_u256, }; pub(crate) type BlockTxGasTuple = Result<(Block, Option>), CollectError>; @@ -321,7 +321,7 @@ pub(crate) struct TransactionColumns { nonce: Vec, from_address: Vec>, to_address: Vec>>, - value: Vec, + value: Vec, input: Vec>, gas_limit: Vec, gas_used: Vec, @@ -364,7 +364,7 @@ impl TransactionColumns { with_series!(cols, "nonce", self.nonce, schema); with_series_binary!(cols, "from_address", self.from_address, schema); with_series_binary!(cols, "to_address", self.to_address, schema); - with_series!(cols, "value", self.value, schema); + with_series_u256!(cols, "value", self.value, schema); with_series_binary!(cols, "input", self.input, schema); with_series!(cols, "gas_limit", self.gas_limit, schema); with_series!(cols, "gas_used", self.gas_used, schema); @@ -471,7 +471,7 @@ fn process_transaction( columns.nonce.push(tx.nonce.as_u64()); } if schema.has_column("value") { - columns.value.push(tx.value.to_string()); + columns.value.push(tx.value); } if schema.has_column("input") { columns.input.push(tx.input.to_vec()); diff --git a/crates/freeze/src/datasets/native_transfers.rs b/crates/freeze/src/datasets/native_transfers.rs index 5b9ed8c4..94115477 100644 --- a/crates/freeze/src/datasets/native_transfers.rs +++ b/crates/freeze/src/datasets/native_transfers.rs @@ -31,7 +31,7 @@ impl Dataset for NativeTransfers { ("transaction_hash", ColumnType::Binary), ("from_address", ColumnType::Binary), ("to_address", ColumnType::Binary), - ("value", ColumnType::Binary), + ("value", ColumnType::UInt256), ("chain_id", ColumnType::UInt64), ]) } diff --git a/crates/freeze/src/datasets/traces.rs b/crates/freeze/src/datasets/traces.rs index efa136ee..b7b33786 100644 --- a/crates/freeze/src/datasets/traces.rs +++ b/crates/freeze/src/datasets/traces.rs @@ -27,7 +27,7 @@ impl Dataset for Traces { HashMap::from_iter(vec![ ("action_from", ColumnType::Binary), ("action_to", ColumnType::Binary), - ("action_value", ColumnType::String), + ("action_value", ColumnType::UInt256), ("action_gas", ColumnType::UInt32), ("action_input", ColumnType::Binary), ("action_call_type", ColumnType::String), diff --git a/crates/freeze/src/datasets/transactions.rs b/crates/freeze/src/datasets/transactions.rs index c42bece8..67a788f7 100644 --- a/crates/freeze/src/datasets/transactions.rs +++ b/crates/freeze/src/datasets/transactions.rs @@ -28,9 +28,7 @@ impl Dataset for Transactions { ("nonce", ColumnType::Int32), ("from_address", ColumnType::Binary), ("to_address", ColumnType::Binary), - ("value", ColumnType::Decimal128), - ("value_str", ColumnType::String), - ("value_float", ColumnType::Float64), + ("value", ColumnType::UInt256), ("input", ColumnType::Binary), ("gas_limit", ColumnType::UInt32), ("gas_used", ColumnType::UInt32), diff --git a/crates/freeze/src/types/dataframes/creation.rs b/crates/freeze/src/types/dataframes/creation.rs index 0f099e6c..a239e7e8 100644 --- a/crates/freeze/src/types/dataframes/creation.rs +++ b/crates/freeze/src/types/dataframes/creation.rs @@ -21,3 +21,76 @@ macro_rules! with_series_binary { } }; } + +/// convert a Vec to variety of u256 Series representations +#[macro_export] +macro_rules! with_series_u256 { + ($all_series:expr, $name:expr, $value:expr, $schema:expr) => { + if $schema.has_column($name) { + // binary + if $schema.u256_types.contains(&U256Type::Binary) { + let name = $name.to_string() + U256Type::Binary.suffix().as_str(); + let name = name.as_str(); + + let converted: Vec> = $value.iter().map(|v| v.to_vec_u8()).collect(); + if let Some(ColumnType::Hex) = $schema.column_type($name) { + $all_series.push(Series::new(name, converted.to_vec_hex())); + } else { + $all_series.push(Series::new(name, converted)); + } + } + + // string + if $schema.u256_types.contains(&U256Type::String) { + let name = $name.to_string() + U256Type::String.suffix().as_str(); + let name = name.as_str(); + + let converted: Vec = $value.iter().map(|v| v.to_string()).collect(); + $all_series.push(Series::new(name, converted)); + } + + // float32 + if $schema.u256_types.contains(&U256Type::F32) { + let name = $name.to_string() + U256Type::F32.suffix().as_str(); + let name = name.as_str(); + + let converted: Vec> = + $value.iter().map(|v| v.to_string().parse::().ok()).collect(); + $all_series.push(Series::new(name, converted)); + } + + // float64 + if $schema.u256_types.contains(&U256Type::F64) { + let name = $name.to_string() + U256Type::F64.suffix().as_str(); + let name = name.as_str(); + + let converted: Vec> = + $value.iter().map(|v| v.to_string().parse::().ok()).collect(); + $all_series.push(Series::new(name, converted)); + } + + // u32 + if $schema.u256_types.contains(&U256Type::U32) { + let name = $name.to_string() + U256Type::U32.suffix().as_str(); + let name = name.as_str(); + + let converted: Vec = $value.iter().map(|v| v.as_u32()).collect(); + $all_series.push(Series::new(name, converted)); + } + + // u64 + if $schema.u256_types.contains(&U256Type::U64) { + let name = $name.to_string() + U256Type::U64.suffix().as_str(); + let name = name.as_str(); + + let converted: Vec = $value.iter().map(|v| v.as_u64()).collect(); + $all_series.push(Series::new(name, converted)); + } + + // decimal128 + if $schema.u256_types.contains(&U256Type::Decimal128) { + panic!("DECIMAL128 not implemented") + } + } + }; +} diff --git a/crates/freeze/src/types/files.rs b/crates/freeze/src/types/files.rs index bb6ec00e..17f09f89 100644 --- a/crates/freeze/src/types/files.rs +++ b/crates/freeze/src/types/files.rs @@ -44,7 +44,7 @@ impl FileFormat { } /// Encoding for binary data in a column -#[derive(Clone, Eq, PartialEq)] +#[derive(Clone, Eq, PartialEq, Debug)] pub enum ColumnEncoding { /// Raw binary encoding Binary, diff --git a/crates/freeze/src/types/mod.rs b/crates/freeze/src/types/mod.rs index 0d369f8e..a1dfcc82 100644 --- a/crates/freeze/src/types/mod.rs +++ b/crates/freeze/src/types/mod.rs @@ -29,7 +29,7 @@ pub use conversions::{ToVecHex, ToVecU8}; pub use datatypes::*; pub use files::{ColumnEncoding, FileFormat, FileOutput}; pub use queries::{MultiQuery, RowFilter, SingleQuery}; -pub use schemas::{ColumnType, Table}; +pub use schemas::{ColumnType, Table, U256Type}; pub use sources::{RateLimiter, Source}; pub(crate) use summaries::FreezeSummaryAgg; pub use summaries::{FreezeChunkSummary, FreezeSummary}; diff --git a/crates/freeze/src/types/schemas.rs b/crates/freeze/src/types/schemas.rs index f9f560d4..020dbe07 100644 --- a/crates/freeze/src/types/schemas.rs +++ b/crates/freeze/src/types/schemas.rs @@ -1,3 +1,6 @@ +/// types and functions related to schemas +use std::collections::HashSet; + use indexmap::{IndexMap, IndexSet}; use thiserror::Error; @@ -13,6 +16,12 @@ pub struct Table { /// sort order for rows pub sort_columns: Option>, + + /// representations to use for u256 columns + pub u256_types: HashSet, + + /// representation to use for binary columns + pub binary_type: ColumnEncoding, } impl Table { @@ -32,6 +41,53 @@ impl Table { } } +/// representation of a U256 datum +#[derive(Hash, Clone, Debug, Eq, PartialEq)] +pub enum U256Type { + /// Binary representation + Binary, + /// String representation + String, + /// F32 representation + F32, + /// F64 representation + F64, + /// U32 representation + U32, + /// U64 representation + U64, + /// Decimal128 representation + Decimal128, +} + +impl U256Type { + /// convert U256Type to Columntype + pub fn to_columntype(&self) -> ColumnType { + match self { + U256Type::Binary => ColumnType::Binary, + U256Type::String => ColumnType::String, + U256Type::F32 => ColumnType::Float32, + U256Type::F64 => ColumnType::Float64, + U256Type::U32 => ColumnType::UInt32, + U256Type::U64 => ColumnType::UInt64, + U256Type::Decimal128 => ColumnType::Decimal128, + } + } + + /// get column name suffix of U256Type + pub fn suffix(&self) -> String { + match self { + U256Type::Binary => "_binary".to_string(), + U256Type::String => "_string".to_string(), + U256Type::F32 => "_f32".to_string(), + U256Type::F64 => "_f64".to_string(), + U256Type::U32 => "_u32".to_string(), + U256Type::U64 => "_u64".to_string(), + U256Type::Decimal128 => "_d128".to_string(), + } + } +} + /// datatype of column #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum ColumnType { @@ -39,10 +95,14 @@ pub enum ColumnType { UInt32, /// UInt64 column type UInt64, + /// U256 column type + UInt256, /// Int32 column type Int32, /// Int64 column type Int64, + /// Float32 column type + Float32, /// Float64 column type Float64, /// Decimal128 column type @@ -61,8 +121,10 @@ impl ColumnType { match *self { ColumnType::UInt32 => "uint32", ColumnType::UInt64 => "uint64", + ColumnType::UInt256 => "uint256", ColumnType::Int32 => "int32", ColumnType::Int64 => "int64", + ColumnType::Float32 => "float32", ColumnType::Float64 => "float64", ColumnType::Decimal128 => "decimal128", ColumnType::String => "string", @@ -84,6 +146,7 @@ impl Datatype { /// get schema for a particular datatype pub fn table_schema( &self, + u256_types: &HashSet, binary_column_format: &ColumnEncoding, include_columns: &Option>, exclude_columns: &Option>, @@ -108,7 +171,13 @@ impl Datatype { } columns.insert((*column.clone()).to_string(), *ctype); } - let schema = Table { datatype: *self, sort_columns: sort, columns }; + let schema = Table { + datatype: *self, + sort_columns: sort, + columns, + u256_types: u256_types.clone(), + binary_type: binary_column_format.clone(), + }; Ok(schema) } } @@ -147,17 +216,23 @@ fn compute_used_columns( mod tests { use super::*; + fn get_u256_types() -> HashSet { + HashSet::from_iter(vec![U256Type::Binary, U256Type::String, U256Type::F64]) + } + #[test] fn test_table_schema_explicit_cols() { let cols = Some(vec!["number".to_string(), "hash".to_string()]); - let table = - Datatype::Blocks.table_schema(&ColumnEncoding::Hex, &None, &None, &cols, None).unwrap(); + let table = Datatype::Blocks + .table_schema(&get_u256_types(), &ColumnEncoding::Hex, &None, &None, &cols, None) + .unwrap(); assert_eq!(vec!["number", "hash"], table.columns()); // "all" marker support let cols = Some(vec!["all".to_string()]); - let table = - Datatype::Blocks.table_schema(&ColumnEncoding::Hex, &None, &None, &cols, None).unwrap(); + let table = Datatype::Blocks + .table_schema(&get_u256_types(), &ColumnEncoding::Hex, &None, &None, &cols, None) + .unwrap(); assert_eq!(15, table.columns().len()); assert!(table.columns().contains(&"hash")); assert!(table.columns().contains(&"transactions_root")); @@ -167,7 +242,7 @@ mod tests { fn test_table_schema_include_cols() { let inc_cols = Some(vec!["chain_id".to_string(), "receipts_root".to_string()]); let table = Datatype::Blocks - .table_schema(&ColumnEncoding::Hex, &inc_cols, &None, &None, None) + .table_schema(&get_u256_types(), &ColumnEncoding::Hex, &inc_cols, &None, &None, None) .unwrap(); assert_eq!(9, table.columns().len()); assert_eq!(["chain_id", "receipts_root"], table.columns()[7..9]); @@ -175,7 +250,7 @@ mod tests { // Non-existing include is skipped let inc_cols = Some(vec!["chain_id".to_string(), "foo_bar".to_string()]); let table = Datatype::Blocks - .table_schema(&ColumnEncoding::Hex, &inc_cols, &None, &None, None) + .table_schema(&get_u256_types(), &ColumnEncoding::Hex, &inc_cols, &None, &None, None) .unwrap(); assert_eq!(Some(&"chain_id"), table.columns().last()); assert!(!table.columns().contains(&"foo_bar")); @@ -183,7 +258,7 @@ mod tests { // "all" marker support let inc_cols = Some(vec!["all".to_string()]); let table = Datatype::Blocks - .table_schema(&ColumnEncoding::Hex, &inc_cols, &None, &None, None) + .table_schema(&get_u256_types(), &ColumnEncoding::Hex, &inc_cols, &None, &None, None) .unwrap(); assert_eq!(15, table.columns().len()); assert!(table.columns().contains(&"hash")); @@ -193,15 +268,16 @@ mod tests { #[test] fn test_table_schema_exclude_cols() { // defaults - let table = - Datatype::Blocks.table_schema(&ColumnEncoding::Hex, &None, &None, &None, None).unwrap(); + let table = Datatype::Blocks + .table_schema(&get_u256_types(), &ColumnEncoding::Hex, &None, &None, &None, None) + .unwrap(); assert_eq!(7, table.columns().len()); assert!(table.columns().contains(&"author")); assert!(table.columns().contains(&"extra_data")); let ex_cols = Some(vec!["author".to_string(), "extra_data".to_string()]); let table = Datatype::Blocks - .table_schema(&ColumnEncoding::Hex, &None, &ex_cols, &None, None) + .table_schema(&get_u256_types(), &ColumnEncoding::Hex, &None, &ex_cols, &None, None) .unwrap(); assert_eq!(5, table.columns().len()); assert!(!table.columns().contains(&"author")); @@ -210,7 +286,7 @@ mod tests { // Non-existing exclude is ignored let ex_cols = Some(vec!["timestamp".to_string(), "foo_bar".to_string()]); let table = Datatype::Blocks - .table_schema(&ColumnEncoding::Hex, &None, &ex_cols, &None, None) + .table_schema(&get_u256_types(), &ColumnEncoding::Hex, &None, &ex_cols, &None, None) .unwrap(); assert_eq!(6, table.columns().len()); assert!(!table.columns().contains(&"timestamp")); @@ -222,7 +298,7 @@ mod tests { let inc_cols = Some(vec!["chain_id".to_string(), "receipts_root".to_string()]); let ex_cols = Some(vec!["author".to_string(), "extra_data".to_string()]); let table = Datatype::Blocks - .table_schema(&ColumnEncoding::Hex, &inc_cols, &ex_cols, &None, None) + .table_schema(&get_u256_types(), &ColumnEncoding::Hex, &inc_cols, &ex_cols, &None, None) .unwrap(); assert!(!table.columns().contains(&"author")); assert!(!table.columns().contains(&"extra_data")); diff --git a/crates/python/src/collect_adapter.rs b/crates/python/src/collect_adapter.rs index a09176dd..75f9e204 100644 --- a/crates/python/src/collect_adapter.rs +++ b/crates/python/src/collect_adapter.rs @@ -16,6 +16,7 @@ use cryo_freeze::collect; include_columns = None, exclude_columns = None, columns = None, + u256_types = None, hex = false, sort = None, rpc = None, @@ -57,6 +58,7 @@ pub fn _collect( include_columns: Option>, exclude_columns: Option>, columns: Option>, + u256_types: Option>, hex: bool, sort: Option>, rpc: Option, @@ -95,6 +97,7 @@ pub fn _collect( include_columns, exclude_columns, columns, + u256_types, hex, sort, rpc, diff --git a/crates/python/src/freeze_adapter.rs b/crates/python/src/freeze_adapter.rs index 85a9c161..cdf2a75a 100644 --- a/crates/python/src/freeze_adapter.rs +++ b/crates/python/src/freeze_adapter.rs @@ -17,6 +17,7 @@ use cryo_cli::{run, Args}; include_columns = None, exclude_columns = None, columns = None, + u256_types = None, hex = false, sort = None, rpc = None, @@ -58,6 +59,7 @@ pub fn _freeze( include_columns: Option>, exclude_columns: Option>, columns: Option>, + u256_types: Option>, hex: bool, sort: Option>, rpc: Option, @@ -96,6 +98,7 @@ pub fn _freeze( include_columns, exclude_columns, columns, + u256_types, hex, sort, rpc, From eedcbeb60aeaaddfbeb807121a4a7b2d401c7b74 Mon Sep 17 00:00:00 2001 From: Kamil Skalski Date: Thu, 31 Aug 2023 19:02:10 +0200 Subject: [PATCH 16/16] refactor: cleanup collecting and managing column data (#54) * Create LogsColumns and refactor transaction columns to transaction's file. * Move counting n_rows to column objects. * Formatting. --- crates/freeze/src/datasets/blocks.rs | 296 +++++---------------- crates/freeze/src/datasets/logs.rs | 185 +++++++------ crates/freeze/src/datasets/transactions.rs | 130 ++++++++- 3 files changed, 288 insertions(+), 323 deletions(-) diff --git a/crates/freeze/src/datasets/blocks.rs b/crates/freeze/src/datasets/blocks.rs index 13c09436..22853482 100644 --- a/crates/freeze/src/datasets/blocks.rs +++ b/crates/freeze/src/datasets/blocks.rs @@ -9,11 +9,13 @@ use crate::{ types::{ conversions::{ToVecHex, ToVecU8}, BlockChunk, Blocks, CollectError, ColumnType, Dataset, Datatype, RowFilter, Source, Table, - TransactionChunk, U256Type, + TransactionChunk, }, - with_series, with_series_binary, with_series_u256, + with_series, with_series_binary, }; +use super::transactions::TransactionColumns; + pub(crate) type BlockTxGasTuple = Result<(Block, Option>), CollectError>; #[async_trait::async_trait] @@ -182,7 +184,7 @@ impl ProcessTransactions for TxHash { impl ProcessTransactions for Transaction { fn process(&self, schema: &Table, columns: &mut TransactionColumns, gas_used: Option) { - process_transaction(self, schema, columns, gas_used) + columns.process_transaction(self, schema, gas_used) } } @@ -193,35 +195,25 @@ pub(crate) async fn blocks_to_dfs( chain_id: u64, ) -> Result<(Option, Option), CollectError> { // initialize - let mut block_columns = - if blocks_schema.is_none() { BlockColumns::new(0) } else { BlockColumns::new(100) }; - let mut transaction_columns = if transactions_schema.is_none() { - TransactionColumns::new(0) - } else { - TransactionColumns::new(100) - }; + let mut block_columns = BlockColumns::default(); + let mut transaction_columns = TransactionColumns::default(); // parse stream of blocks - let mut n_blocks = 0; - let mut n_txs = 0; while let Some(message) = blocks.recv().await { match message { Ok((block, gas_used)) => { - n_blocks += 1; if let Some(schema) = blocks_schema { - process_block(&block, schema, &mut block_columns) + block_columns.process_block(&block, schema) } if let Some(schema) = transactions_schema { match gas_used { Some(gas_used) => { for (tx, gas_used) in block.transactions.iter().zip(gas_used) { - n_txs += 1; tx.process(schema, &mut transaction_columns, Some(gas_used)) } } None => { for tx in block.transactions.iter() { - n_txs += 1; tx.process(schema, &mut transaction_columns, None) } } @@ -237,17 +229,19 @@ pub(crate) async fn blocks_to_dfs( // convert to dataframes let blocks_df = match blocks_schema { - Some(schema) => Some(block_columns.create_df(schema, chain_id, n_blocks)?), + Some(schema) => Some(block_columns.create_df(schema, chain_id)?), None => None, }; let transactions_df = match transactions_schema { - Some(schema) => Some(transaction_columns.create_df(schema, chain_id, n_txs)?), + Some(schema) => Some(transaction_columns.create_df(schema, chain_id)?), None => None, }; Ok((blocks_df, transactions_df)) } +#[derive(Default)] struct BlockColumns { + n_rows: usize, hash: Vec>, parent_hash: Vec>, author: Vec>, @@ -265,32 +259,63 @@ struct BlockColumns { } impl BlockColumns { - fn new(n: usize) -> Self { - Self { - hash: Vec::with_capacity(n), - parent_hash: Vec::with_capacity(n), - author: Vec::with_capacity(n), - state_root: Vec::with_capacity(n), - transactions_root: Vec::with_capacity(n), - receipts_root: Vec::with_capacity(n), - number: Vec::with_capacity(n), - gas_used: Vec::with_capacity(n), - extra_data: Vec::with_capacity(n), - logs_bloom: Vec::with_capacity(n), - timestamp: Vec::with_capacity(n), - total_difficulty: Vec::with_capacity(n), - size: Vec::with_capacity(n), - base_fee_per_gas: Vec::with_capacity(n), + fn process_block(&mut self, block: &Block, schema: &Table) { + self.n_rows += 1; + if schema.has_column("hash") { + match block.hash { + Some(h) => self.hash.push(h.as_bytes().to_vec()), + _ => panic!("invalid block"), + } + } + if schema.has_column("parent_hash") { + self.parent_hash.push(block.parent_hash.as_bytes().to_vec()); + } + if schema.has_column("author") { + match block.author { + Some(a) => self.author.push(a.as_bytes().to_vec()), + _ => panic!("invalid block"), + } + } + if schema.has_column("state_root") { + self.state_root.push(block.state_root.as_bytes().to_vec()); + } + if schema.has_column("transactions_root") { + self.transactions_root.push(block.transactions_root.as_bytes().to_vec()); + } + if schema.has_column("receipts_root") { + self.receipts_root.push(block.receipts_root.as_bytes().to_vec()); + } + if schema.has_column("number") { + match block.number { + Some(n) => self.number.push(n.as_u32()), + _ => panic!("invalid block"), + } + } + if schema.has_column("gas_used") { + self.gas_used.push(block.gas_used.as_u32()); + } + if schema.has_column("extra_data") { + self.extra_data.push(block.extra_data.to_vec()); + } + if schema.has_column("logs_bloom") { + self.logs_bloom.push(block.logs_bloom.map(|x| x.0.to_vec())); + } + if schema.has_column("timestamp") { + self.timestamp.push(block.timestamp.as_u32()); + } + if schema.has_column("total_difficulty") { + self.total_difficulty.push(block.total_difficulty.map(|x| x.to_vec_u8())); + } + if schema.has_column("size") { + self.size.push(block.size.map(|x| x.as_u32())); + } + if schema.has_column("base_fee_per_gas") { + self.base_fee_per_gas.push(block.base_fee_per_gas.map(|value| value.as_u64())); } } - fn create_df( - self, - schema: &Table, - chain_id: u64, - n_rows: u64, - ) -> Result { - let mut cols = Vec::new(); + fn create_df(self, schema: &Table, chain_id: u64) -> Result { + let mut cols = Vec::with_capacity(schema.columns().len()); with_series_binary!(cols, "hash", self.hash, schema); with_series_binary!(cols, "parent_hash", self.parent_hash, schema); with_series_binary!(cols, "author", self.author, schema); @@ -305,195 +330,8 @@ impl BlockColumns { with_series_binary!(cols, "total_difficulty", self.total_difficulty, schema); with_series!(cols, "size", self.size, schema); with_series!(cols, "base_fee_per_gas", self.base_fee_per_gas, schema); - - if schema.has_column("chain_id") { - cols.push(Series::new("chain_id", vec![chain_id; n_rows as usize])); - } + with_series!(cols, "chain_id", vec![chain_id; self.n_rows], schema); DataFrame::new(cols).map_err(CollectError::PolarsError).sort_by_schema(schema) } } - -pub(crate) struct TransactionColumns { - block_number: Vec>, - transaction_index: Vec>, - transaction_hash: Vec>, - nonce: Vec, - from_address: Vec>, - to_address: Vec>>, - value: Vec, - input: Vec>, - gas_limit: Vec, - gas_used: Vec, - gas_price: Vec>, - transaction_type: Vec>, - max_priority_fee_per_gas: Vec>, - max_fee_per_gas: Vec>, -} - -impl TransactionColumns { - pub(crate) fn new(n: usize) -> Self { - Self { - block_number: Vec::with_capacity(n), - transaction_index: Vec::with_capacity(n), - transaction_hash: Vec::with_capacity(n), - nonce: Vec::with_capacity(n), - from_address: Vec::with_capacity(n), - to_address: Vec::with_capacity(n), - value: Vec::with_capacity(n), - input: Vec::with_capacity(n), - gas_limit: Vec::with_capacity(n), - gas_used: Vec::with_capacity(n), - gas_price: Vec::with_capacity(n), - transaction_type: Vec::with_capacity(n), - max_priority_fee_per_gas: Vec::with_capacity(n), - max_fee_per_gas: Vec::with_capacity(n), - } - } - - pub(crate) fn create_df( - self, - schema: &Table, - chain_id: u64, - n_rows: usize, - ) -> Result { - let mut cols = Vec::new(); - with_series!(cols, "block_number", self.block_number, schema); - with_series!(cols, "transaction_index", self.transaction_index, schema); - with_series_binary!(cols, "transaction_hash", self.transaction_hash, schema); - with_series!(cols, "nonce", self.nonce, schema); - with_series_binary!(cols, "from_address", self.from_address, schema); - with_series_binary!(cols, "to_address", self.to_address, schema); - with_series_u256!(cols, "value", self.value, schema); - with_series_binary!(cols, "input", self.input, schema); - with_series!(cols, "gas_limit", self.gas_limit, schema); - with_series!(cols, "gas_used", self.gas_used, schema); - with_series!(cols, "gas_price", self.gas_price, schema); - with_series!(cols, "transaction_type", self.transaction_type, schema); - with_series!(cols, "max_priority_fee_per_gas", self.max_priority_fee_per_gas, schema); - with_series!(cols, "max_fee_per_gas", self.max_fee_per_gas, schema); - - if schema.has_column("chain_id") { - cols.push(Series::new("chain_id", vec![chain_id; n_rows])); - } - - DataFrame::new(cols).map_err(CollectError::PolarsError).sort_by_schema(schema) - } -} - -fn process_block(block: &Block, schema: &Table, columns: &mut BlockColumns) { - if schema.has_column("hash") { - match block.hash { - Some(h) => columns.hash.push(h.as_bytes().to_vec()), - _ => panic!("invalid block"), - } - } - if schema.has_column("parent_hash") { - columns.parent_hash.push(block.parent_hash.as_bytes().to_vec()); - } - if schema.has_column("author") { - match block.author { - Some(a) => columns.author.push(a.as_bytes().to_vec()), - _ => panic!("invalid block"), - } - } - if schema.has_column("state_root") { - columns.state_root.push(block.state_root.as_bytes().to_vec()); - } - if schema.has_column("transactions_root") { - columns.transactions_root.push(block.transactions_root.as_bytes().to_vec()); - } - if schema.has_column("receipts_root") { - columns.receipts_root.push(block.receipts_root.as_bytes().to_vec()); - } - if schema.has_column("number") { - match block.number { - Some(n) => columns.number.push(n.as_u32()), - _ => panic!("invalid block"), - } - } - if schema.has_column("gas_used") { - columns.gas_used.push(block.gas_used.as_u32()); - } - if schema.has_column("extra_data") { - columns.extra_data.push(block.extra_data.to_vec()); - } - if schema.has_column("logs_bloom") { - columns.logs_bloom.push(block.logs_bloom.map(|x| x.0.to_vec())); - } - if schema.has_column("timestamp") { - columns.timestamp.push(block.timestamp.as_u32()); - } - if schema.has_column("total_difficulty") { - columns.total_difficulty.push(block.total_difficulty.map(|x| x.to_vec_u8())); - } - if schema.has_column("size") { - columns.size.push(block.size.map(|x| x.as_u32())); - } - if schema.has_column("base_fee_per_gas") { - columns.base_fee_per_gas.push(block.base_fee_per_gas.map(|value| value.as_u64())); - } -} - -fn process_transaction( - tx: &Transaction, - schema: &Table, - columns: &mut TransactionColumns, - gas_used: Option, -) { - if schema.has_column("block_number") { - match tx.block_number { - Some(block_number) => columns.block_number.push(Some(block_number.as_u64())), - None => columns.block_number.push(None), - } - } - if schema.has_column("transaction_index") { - match tx.transaction_index { - Some(transaction_index) => { - columns.transaction_index.push(Some(transaction_index.as_u64())) - } - None => columns.transaction_index.push(None), - } - } - if schema.has_column("transaction_hash") { - columns.transaction_hash.push(tx.hash.as_bytes().to_vec()); - } - if schema.has_column("from_address") { - columns.from_address.push(tx.from.as_bytes().to_vec()); - } - if schema.has_column("to_address") { - match tx.to { - Some(to_address) => columns.to_address.push(Some(to_address.as_bytes().to_vec())), - None => columns.to_address.push(None), - } - } - if schema.has_column("nonce") { - columns.nonce.push(tx.nonce.as_u64()); - } - if schema.has_column("value") { - columns.value.push(tx.value); - } - if schema.has_column("input") { - columns.input.push(tx.input.to_vec()); - } - if schema.has_column("gas_limit") { - columns.gas_limit.push(tx.gas.as_u32()); - } - if schema.has_column("gas_used") { - columns.gas_used.push(gas_used.unwrap()) - } - if schema.has_column("gas_price") { - columns.gas_price.push(tx.gas_price.map(|gas_price| gas_price.as_u64())); - } - if schema.has_column("transaction_type") { - columns.transaction_type.push(tx.transaction_type.map(|value| value.as_u32())); - } - if schema.has_column("max_priority_fee_per_gas") { - columns - .max_priority_fee_per_gas - .push(tx.max_priority_fee_per_gas.map(|value| value.as_u64())); - } - if schema.has_column("max_fee_per_gas") { - columns.max_fee_per_gas.push(tx.max_fee_per_gas.map(|value| value.as_u64())); - } -} diff --git a/crates/freeze/src/datasets/logs.rs b/crates/freeze/src/datasets/logs.rs index 7c135e28..06f884f3 100644 --- a/crates/freeze/src/datasets/logs.rs +++ b/crates/freeze/src/datasets/logs.rs @@ -190,99 +190,116 @@ async fn fetch_transaction_logs( } } -async fn logs_to_df( - mut logs: mpsc::Receiver, CollectError>>, - schema: &Table, - chain_id: u64, -) -> Result { - let mut block_number: Vec = Vec::new(); - let mut transaction_index: Vec = Vec::new(); - let mut log_index: Vec = Vec::new(); - let mut transaction_hash: Vec> = Vec::new(); - let mut address: Vec> = Vec::new(); - let mut topic0: Vec>> = Vec::new(); - let mut topic1: Vec>> = Vec::new(); - let mut topic2: Vec>> = Vec::new(); - let mut topic3: Vec>> = Vec::new(); - let mut data: Vec> = Vec::new(); +#[derive(Default)] +pub(crate) struct LogColumns { + n_rows: usize, + block_number: Vec, + transaction_index: Vec, + log_index: Vec, + transaction_hash: Vec>, + address: Vec>, + topic0: Vec>>, + topic1: Vec>>, + topic2: Vec>>, + topic3: Vec>>, + data: Vec>, +} - let mut n_rows = 0; - // while let Some(Ok(logs)) = logs.recv().await { - while let Some(message) = logs.recv().await { - match message { - Ok(logs) => { - for log in logs.iter() { - if let Some(true) = log.removed { - continue +impl LogColumns { + pub(crate) fn process_logs( + &mut self, + logs: Vec, + schema: &Table, + ) -> Result<(), CollectError> { + for log in logs { + if let Some(true) = log.removed { + continue + } + if let (Some(bn), Some(tx), Some(ti), Some(li)) = + (log.block_number, log.transaction_hash, log.transaction_index, log.log_index) + { + self.n_rows += 1; + self.address.push(log.address.as_bytes().to_vec()); + match log.topics.len() { + 0 => { + self.topic0.push(None); + self.topic1.push(None); + self.topic2.push(None); + self.topic3.push(None); } - if let (Some(bn), Some(tx), Some(ti), Some(li)) = ( - log.block_number, - log.transaction_hash, - log.transaction_index, - log.log_index, - ) { - n_rows += 1; - address.push(log.address.as_bytes().to_vec()); - match log.topics.len() { - 0 => { - topic0.push(None); - topic1.push(None); - topic2.push(None); - topic3.push(None); - } - 1 => { - topic0.push(Some(log.topics[0].as_bytes().to_vec())); - topic1.push(None); - topic2.push(None); - topic3.push(None); - } - 2 => { - topic0.push(Some(log.topics[0].as_bytes().to_vec())); - topic1.push(Some(log.topics[1].as_bytes().to_vec())); - topic2.push(None); - topic3.push(None); - } - 3 => { - topic0.push(Some(log.topics[0].as_bytes().to_vec())); - topic1.push(Some(log.topics[1].as_bytes().to_vec())); - topic2.push(Some(log.topics[2].as_bytes().to_vec())); - topic3.push(None); - } - 4 => { - topic0.push(Some(log.topics[0].as_bytes().to_vec())); - topic1.push(Some(log.topics[1].as_bytes().to_vec())); - topic2.push(Some(log.topics[2].as_bytes().to_vec())); - topic3.push(Some(log.topics[3].as_bytes().to_vec())); - } - _ => return Err(CollectError::InvalidNumberOfTopics), - } - data.push(log.data.clone().to_vec()); - block_number.push(bn.as_u32()); - transaction_hash.push(tx.as_bytes().to_vec()); - transaction_index.push(ti.as_u32()); - log_index.push(li.as_u32()); + 1 => { + self.topic0.push(Some(log.topics[0].as_bytes().to_vec())); + self.topic1.push(None); + self.topic2.push(None); + self.topic3.push(None); + } + 2 => { + self.topic0.push(Some(log.topics[0].as_bytes().to_vec())); + self.topic1.push(Some(log.topics[1].as_bytes().to_vec())); + self.topic2.push(None); + self.topic3.push(None); + } + 3 => { + self.topic0.push(Some(log.topics[0].as_bytes().to_vec())); + self.topic1.push(Some(log.topics[1].as_bytes().to_vec())); + self.topic2.push(Some(log.topics[2].as_bytes().to_vec())); + self.topic3.push(None); + } + 4 => { + self.topic0.push(Some(log.topics[0].as_bytes().to_vec())); + self.topic1.push(Some(log.topics[1].as_bytes().to_vec())); + self.topic2.push(Some(log.topics[2].as_bytes().to_vec())); + self.topic3.push(Some(log.topics[3].as_bytes().to_vec())); } + _ => return Err(CollectError::InvalidNumberOfTopics), } + if schema.has_column("data") { + self.data.push(log.data.to_vec()); + } + self.block_number.push(bn.as_u32()); + self.transaction_hash.push(tx.as_bytes().to_vec()); + self.transaction_index.push(ti.as_u32()); + self.log_index.push(li.as_u32()); } - _ => return Err(CollectError::TooManyRequestsError), } + Ok(()) } - let mut cols = Vec::new(); - with_series!(cols, "block_number", block_number, schema); - with_series!(cols, "transaction_index", transaction_index, schema); - with_series!(cols, "log_index", log_index, schema); - with_series_binary!(cols, "transaction_hash", transaction_hash, schema); - with_series_binary!(cols, "contract_address", address, schema); - with_series_binary!(cols, "topic0", topic0, schema); - with_series_binary!(cols, "topic1", topic1, schema); - with_series_binary!(cols, "topic2", topic2, schema); - with_series_binary!(cols, "topic3", topic3, schema); - with_series_binary!(cols, "data", data, schema); + pub(crate) fn create_df( + self, + schema: &Table, + chain_id: u64, + ) -> Result { + let mut cols = Vec::with_capacity(schema.columns().len()); + with_series!(cols, "block_number", self.block_number, schema); + with_series!(cols, "transaction_index", self.transaction_index, schema); + with_series!(cols, "log_index", self.log_index, schema); + with_series_binary!(cols, "transaction_hash", self.transaction_hash, schema); + with_series_binary!(cols, "contract_address", self.address, schema); + with_series_binary!(cols, "topic0", self.topic0, schema); + with_series_binary!(cols, "topic1", self.topic1, schema); + with_series_binary!(cols, "topic2", self.topic2, schema); + with_series_binary!(cols, "topic3", self.topic3, schema); + with_series_binary!(cols, "data", self.data, schema); + with_series!(cols, "chain_id", vec![chain_id; self.n_rows], schema); - if schema.has_column("chain_id") { - cols.push(Series::new("chain_id", vec![chain_id; n_rows])); + DataFrame::new(cols).map_err(CollectError::PolarsError).sort_by_schema(schema) } +} - DataFrame::new(cols).map_err(CollectError::PolarsError).sort_by_schema(schema) +async fn logs_to_df( + mut logs: mpsc::Receiver, CollectError>>, + schema: &Table, + chain_id: u64, +) -> Result { + let mut columns = LogColumns::default(); + + while let Some(message) = logs.recv().await { + if let Ok(logs) = message { + columns.process_logs(logs, schema)? + } else { + return Err(CollectError::TooManyRequestsError) + } + } + columns.create_df(schema, chain_id) } diff --git a/crates/freeze/src/datasets/transactions.rs b/crates/freeze/src/datasets/transactions.rs index 67a788f7..1dacf7f4 100644 --- a/crates/freeze/src/datasets/transactions.rs +++ b/crates/freeze/src/datasets/transactions.rs @@ -5,9 +5,14 @@ use polars::prelude::*; use tokio::{sync::mpsc, task}; use super::{blocks, blocks::ProcessTransactions, blocks_and_transactions}; -use crate::types::{ - BlockChunk, CollectError, ColumnType, Dataset, Datatype, RowFilter, Source, Table, - TransactionChunk, Transactions, +use crate::{ + dataframes::SortableDataFrame, + types::{ + conversions::{ToVecHex, ToVecU8}, + BlockChunk, CollectError, ColumnType, Dataset, Datatype, RowFilter, Source, Table, + TransactionChunk, Transactions, U256Type, + }, + with_series, with_series_binary, with_series_u256, }; #[async_trait::async_trait] @@ -167,21 +172,126 @@ async fn fetch_transactions( } } +#[derive(Default)] +pub(crate) struct TransactionColumns { + n_rows: usize, + block_number: Vec>, + transaction_index: Vec>, + transaction_hash: Vec>, + nonce: Vec, + from_address: Vec>, + to_address: Vec>>, + value: Vec, + input: Vec>, + gas_limit: Vec, + gas_used: Vec, + gas_price: Vec>, + transaction_type: Vec>, + max_priority_fee_per_gas: Vec>, + max_fee_per_gas: Vec>, +} + +impl TransactionColumns { + pub(crate) fn create_df( + self, + schema: &Table, + chain_id: u64, + ) -> Result { + let mut cols = Vec::with_capacity(schema.columns().len()); + with_series!(cols, "block_number", self.block_number, schema); + with_series!(cols, "transaction_index", self.transaction_index, schema); + with_series_binary!(cols, "transaction_hash", self.transaction_hash, schema); + with_series!(cols, "nonce", self.nonce, schema); + with_series_binary!(cols, "from_address", self.from_address, schema); + with_series_binary!(cols, "to_address", self.to_address, schema); + with_series_u256!(cols, "value", self.value, schema); + with_series_binary!(cols, "input", self.input, schema); + with_series!(cols, "gas_limit", self.gas_limit, schema); + with_series!(cols, "gas_used", self.gas_used, schema); + with_series!(cols, "gas_price", self.gas_price, schema); + with_series!(cols, "transaction_type", self.transaction_type, schema); + with_series!(cols, "max_priority_fee_per_gas", self.max_priority_fee_per_gas, schema); + with_series!(cols, "max_fee_per_gas", self.max_fee_per_gas, schema); + with_series!(cols, "chain_id", vec![chain_id; self.n_rows], schema); + + DataFrame::new(cols).map_err(CollectError::PolarsError).sort_by_schema(schema) + } + + pub(crate) fn process_transaction( + &mut self, + tx: &Transaction, + schema: &Table, + gas_used: Option, + ) { + self.n_rows += 1; + if schema.has_column("block_number") { + match tx.block_number { + Some(block_number) => self.block_number.push(Some(block_number.as_u64())), + None => self.block_number.push(None), + } + } + if schema.has_column("transaction_index") { + match tx.transaction_index { + Some(transaction_index) => { + self.transaction_index.push(Some(transaction_index.as_u64())) + } + None => self.transaction_index.push(None), + } + } + if schema.has_column("transaction_hash") { + self.transaction_hash.push(tx.hash.as_bytes().to_vec()); + } + if schema.has_column("from_address") { + self.from_address.push(tx.from.as_bytes().to_vec()); + } + if schema.has_column("to_address") { + match tx.to { + Some(to_address) => self.to_address.push(Some(to_address.as_bytes().to_vec())), + None => self.to_address.push(None), + } + } + if schema.has_column("nonce") { + self.nonce.push(tx.nonce.as_u64()); + } + if schema.has_column("value") { + self.value.push(tx.value); + } + if schema.has_column("input") { + self.input.push(tx.input.to_vec()); + } + if schema.has_column("gas_limit") { + self.gas_limit.push(tx.gas.as_u32()); + } + if schema.has_column("gas_used") { + self.gas_used.push(gas_used.unwrap()) + } + if schema.has_column("gas_price") { + self.gas_price.push(tx.gas_price.map(|gas_price| gas_price.as_u64())); + } + if schema.has_column("transaction_type") { + self.transaction_type.push(tx.transaction_type.map(|value| value.as_u32())); + } + if schema.has_column("max_priority_fee_per_gas") { + self.max_priority_fee_per_gas + .push(tx.max_priority_fee_per_gas.map(|value| value.as_u64())); + } + if schema.has_column("max_fee_per_gas") { + self.max_fee_per_gas.push(tx.max_fee_per_gas.map(|value| value.as_u64())); + } + } +} + async fn transactions_to_df( mut transactions: mpsc::Receiver), CollectError>>, schema: &Table, chain_id: u64, ) -> Result { - let mut columns = blocks::TransactionColumns::new(100); - let mut n_txs = 0; + let mut columns = TransactionColumns::default(); while let Some(message) = transactions.recv().await { match message { - Ok((transaction, gas_used)) => { - n_txs += 1; - transaction.process(schema, &mut columns, gas_used) - } + Ok((transaction, gas_used)) => transaction.process(schema, &mut columns, gas_used), Err(e) => return Err(e), } } - columns.create_df(schema, chain_id, n_txs) + columns.create_df(schema, chain_id) }