From ebfc97b3585396c78a6ac40200b858442cf3e563 Mon Sep 17 00:00:00 2001 From: sslivkoff Date: Wed, 22 Nov 2023 12:16:57 -0600 Subject: [PATCH] Skip noncompliant logs (#136) * fix log datasets aliases and null calls * skip noncompliant logs when decoding --- crates/freeze/src/datasets/logs.rs | 36 +++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/crates/freeze/src/datasets/logs.rs b/crates/freeze/src/datasets/logs.rs index f3e7ac1a..322f3c7e 100644 --- a/crates/freeze/src/datasets/logs.rs +++ b/crates/freeze/src/datasets/logs.rs @@ -70,10 +70,38 @@ impl CollectByTransaction for Logs { /// process block into columns fn process_logs(logs: Vec, columns: &mut Logs, schema: &Table) -> R<()> { + let decode_keys = match &schema.log_decoder { + None => None, + Some(decoder) => { + let keys = decoder + .event + .inputs + .clone() + .into_iter() + .map(|i| i.name) + .collect::>(); + Some(keys) + } + }; + for log in logs.iter() { if let (Some(bn), Some(tx), Some(ti), Some(li)) = (log.block_number, log.transaction_hash, log.transaction_index, log.log_index) { + // decode event + if let (Some(decoder), Some(decode_keys)) = (&schema.log_decoder, &decode_keys) { + match decoder.event.parse_log(log.clone().into()) { + Ok(log) => { + for param in log.params { + if decode_keys.contains(param.name.as_str()) { + columns.event_cols.entry(param.name).or_default().push(param.value); + } + } + } + Err(_) => continue, + } + }; + columns.n_rows += 1; store!(schema, columns, block_number, bn.as_u32()); store!(schema, columns, transaction_index, ti.as_u32()); @@ -100,13 +128,5 @@ fn process_logs(logs: Vec, columns: &mut Logs, schema: &Table) -> R<()> { } } - // add decoded event logs - let decoder = schema.log_decoder.clone(); - if let Some(decoder) = decoder { - decoder.parse_log_from_event(logs).into_iter().for_each(|(k, v)| { - columns.event_cols.entry(k).or_default().extend(v); - }); - } - Ok(()) }