From 37b8cc238913b4ba15c6462236383cf79d620472 Mon Sep 17 00:00:00 2001 From: njgheorghita Date: Fri, 19 Jan 2024 12:10:37 -0500 Subject: [PATCH] feat: add test_providers script to test ability of various providers to build valid history content --- Cargo.lock | 2 + Cargo.toml | 2 + ethportal-api/src/types/execution/receipts.rs | 9 +- portal-bridge/src/api/execution.rs | 228 ++++++--- portal-bridge/src/bridge/history.rs | 147 ++---- portal-bridge/src/lib.rs | 4 +- portal-bridge/src/types/full_header.rs | 36 ++ src/bin/sample_range.rs | 48 +- src/bin/test_providers.rs | 444 ++++++++++++++++++ trin-validation/src/constants.rs | 12 +- 10 files changed, 744 insertions(+), 188 deletions(-) create mode 100644 src/bin/test_providers.rs diff --git a/Cargo.lock b/Cargo.lock index 89b71c423..94f907eee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7672,6 +7672,7 @@ dependencies = [ "jsonrpsee", "lazy_static", "parking_lot 0.11.2", + "portal-bridge", "portalnet", "prometheus_exporter", "rand 0.8.5", @@ -7682,6 +7683,7 @@ dependencies = [ "serde_yaml", "serial_test", "sha3 0.9.1", + "surf", "tempfile", "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 3d5ae3eaa..a903e3211 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ jsonrpsee = "0.20.0" lazy_static = "1.4.0" parking_lot = "0.11.2" portalnet = { path = "portalnet" } +portal-bridge = { path = "portal-bridge" } prometheus_exporter = "0.8.4" rand = "0.8.4" reth-ipc = { tag = "v0.1.0-alpha.10", git = "https://github.com/paradigmxyz/reth.git"} @@ -33,6 +34,7 @@ rlp = "0.5.0" rpc = { path = "rpc"} serde_json = {version = "1.0.89", features = ["preserve_order"]} sha3 = "0.9.1" +surf = "2.3.2" tempfile = "3.3.0" tokio = { version = "1.14.0", features = ["full"] } tracing = "0.1.36" diff --git a/ethportal-api/src/types/execution/receipts.rs b/ethportal-api/src/types/execution/receipts.rs index 98ff7266e..0e434b3e1 100644 --- a/ethportal-api/src/types/execution/receipts.rs +++ b/ethportal-api/src/types/execution/receipts.rs @@ -348,11 +348,13 @@ impl TryFrom for TransactionId { type Error = DecoderError; fn try_from(val: Value) -> Result { - let id = val.as_str().ok_or(DecoderError::Custom("Invalid tx id."))?; + let id = val.as_str().ok_or(DecoderError::Custom( + "Invalid tx id: unable to decode as string.", + ))?; let id = id.trim_start_matches("0x"); let id = id .parse::() - .map_err(|_| DecoderError::Custom("Invalid tx id."))?; + .map_err(|_| DecoderError::Custom("Invalid tx id: unable to parse u8"))?; Self::try_from(id) } } @@ -431,6 +433,9 @@ impl<'de> Deserialize<'de> for Receipt { D: Deserializer<'de>, { let obj: Value = Deserialize::deserialize(deserializer)?; + if obj.is_null() { + return Err(anyhow!("Null receipt found.")).map_err(serde::de::Error::custom); + } let tx_id = TransactionId::try_from(obj["type"].clone()).map_err(serde::de::Error::custom)?; match tx_id { diff --git a/portal-bridge/src/api/execution.rs b/portal-bridge/src/api/execution.rs index 00702174f..245966f93 100644 --- a/portal-bridge/src/api/execution.rs +++ b/portal-bridge/src/api/execution.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use anyhow::{anyhow, bail}; use ethereum_types::H256; use futures::future::join_all; @@ -7,38 +9,43 @@ use surf::{ Body, Client, Config, Request, Response, }; use tokio::time::{sleep, Duration}; -use tracing::{info, warn}; +use tracing::warn; use url::Url; use crate::{ cli::Provider, constants::{BASE_EL_ARCHIVE_ENDPOINT, BASE_EL_ENDPOINT}, - types::{ - full_header::{FullHeader, FullHeaderBatch}, - mode::BridgeMode, - }, + types::{full_header::FullHeader, mode::BridgeMode}, PANDAOPS_CLIENT_ID, PANDAOPS_CLIENT_SECRET, }; use ethportal_api::{ types::{ - execution::block_body::{ - BlockBody, BlockBodyLegacy, BlockBodyMerge, BlockBodyShanghai, MERGE_TIMESTAMP, - SHANGHAI_TIMESTAMP, + execution::{ + accumulator::EpochAccumulator, + block_body::{ + BlockBody, BlockBodyLegacy, BlockBodyMerge, BlockBodyShanghai, MERGE_TIMESTAMP, + SHANGHAI_TIMESTAMP, + }, + header::{AccumulatorProof, BlockHeaderProof, HeaderWithProof, SszNone}, }, jsonrpc::{params::Params, request::JsonRequest}, }, utils::bytes::hex_encode, - Header, Receipts, + BlockBodyKey, BlockHeaderKey, BlockReceiptsKey, Header, HistoryContentKey, HistoryContentValue, + Receipts, }; +use trin_validation::{accumulator::MasterAccumulator, constants::MERGE_BLOCK_NUMBER}; /// Limit the number of requests in a single batch to avoid exceeding the /// provider's batch size limit configuration of 100. const BATCH_LIMIT: usize = 100; /// Implements endpoints from the Execution API to access data from the execution layer. +/// Performs validation of the data returned from the provider. #[derive(Clone, Debug)] pub struct ExecutionApi { - client: Client, + pub client: Client, + pub master_acc: MasterAccumulator, } impl ExecutionApi { @@ -71,14 +78,80 @@ impl ExecutionApi { if provider != Provider::Test { check_provider(&client).await?; } - Ok(Self { client }) + let master_acc = MasterAccumulator::default(); + Ok(Self { client, master_acc }) } - /// Returns an unvalidated block body for the given FullHeader. - pub async fn get_trusted_block_body( + /// Return a validated FullHeader & content key / value pair for the given header. + pub async fn get_header( + &self, + height: u64, + epoch_acc: Option>, + ) -> anyhow::Result<(FullHeader, HistoryContentKey, HistoryContentValue)> { + // Geth requires block numbers to be formatted using the following padding. + let block_param = format!("0x{height:01X}"); + let params = Params::Array(vec![json!(block_param), json!(true)]); + let request = JsonRequest::new("eth_getBlockByNumber".to_string(), params, height as u32); + let response = self.send_request(request).await?; + let result = response + .get("result") + .ok_or_else(|| anyhow!("Unable to fetch header for block: {height:?}"))?; + let mut full_header: FullHeader = FullHeader::try_from(result.clone())?; + + // Add epoch accumulator to header if it's a pre-merge block. + if full_header.header.number < MERGE_BLOCK_NUMBER { + if epoch_acc.is_none() { + bail!("Epoch accumulator is required for pre-merge blocks"); + } + full_header.epoch_acc = epoch_acc; + } + + // Validate header. + if let Err(msg) = full_header.validate() { + bail!("Header validation failed: {msg}"); + }; + + // Construct content key / value pair. + let content_key = HistoryContentKey::BlockHeaderWithProof(BlockHeaderKey { + block_hash: full_header.header.hash().to_fixed_bytes(), + }); + let content_value = match &full_header.epoch_acc { + Some(epoch_acc) => { + // Construct HeaderWithProof + let header_with_proof = + construct_proof(full_header.header.clone(), epoch_acc).await?; + // Double check that the proof is valid + self.master_acc + .validate_header_with_proof(&header_with_proof)?; + HistoryContentValue::BlockHeaderWithProof(header_with_proof) + } + None => { + let header_with_proof = HeaderWithProof { + header: full_header.header.clone(), + proof: BlockHeaderProof::None(SszNone { value: None }), + }; + HistoryContentValue::BlockHeaderWithProof(header_with_proof) + } + }; + Ok((full_header, content_key, content_value)) + } + + /// Return a validated BlockBody content key / value for the given FullHeader. + pub async fn get_block_body( &self, full_header: &FullHeader, - ) -> anyhow::Result { + ) -> anyhow::Result<(HistoryContentKey, HistoryContentValue)> { + let block_body = self.get_trusted_block_body(full_header).await?; + block_body.validate_against_header(&full_header.header)?; + let content_key = HistoryContentKey::BlockBody(BlockBodyKey { + block_hash: full_header.header.hash().to_fixed_bytes(), + }); + let content_value = HistoryContentValue::BlockBody(block_body); + Ok((content_key, content_value)) + } + + /// Return an unvalidated block body for the given FullHeader. + async fn get_trusted_block_body(&self, full_header: &FullHeader) -> anyhow::Result { let txs = full_header.txs.clone(); if full_header.header.timestamp > SHANGHAI_TIMESTAMP { if !full_header.uncles.is_empty() { @@ -103,36 +176,7 @@ impl ExecutionApi { } } - /// Returns unvalidated receipts for the given transaction hashes. - pub async fn get_trusted_receipts(&self, tx_hashes: &[H256]) -> anyhow::Result { - let request: Vec = tx_hashes - .iter() - .enumerate() - .map(|(id, tx_hash)| { - let tx_hash = hex_encode(tx_hash); - let params = Params::Array(vec![json!(tx_hash)]); - let method = "eth_getTransactionReceipt".to_string(); - JsonRequest::new(method, params, id as u32) - }) - .collect(); - let response = self.batch_requests(request).await?; - Ok(serde_json::from_str(&response)?) - } - - pub async fn get_latest_block_number(&self) -> anyhow::Result { - let params = Params::Array(vec![json!("latest"), json!(false)]); - let method = "eth_getBlockByNumber".to_string(); - let request = JsonRequest::new(method, params, 1); - let response = self.batch_requests(vec![request]).await?; - let response: Vec = serde_json::from_str(&response)?; - let result = response[0] - .get("result") - .ok_or_else(|| anyhow!("Unable to fetch latest block"))?; - let header: Header = serde_json::from_value(result.clone())?; - Ok(header.number) - } - - /// Returns unvalidated uncles for the given uncle hashes. + /// Return unvalidated uncles for the given uncle hashes. async fn get_trusted_uncles(&self, hashes: &[H256]) -> anyhow::Result> { let batch_request = hashes .iter() @@ -158,21 +202,63 @@ impl ExecutionApi { Ok(headers) } - pub async fn get_header(&self, height: u64) -> anyhow::Result { - // Geth requires block numbers to be formatted using the following padding. - let block_param = format!("0x{height:01X}"); - let params = Params::Array(vec![json!(block_param), json!(true)]); - let batch_request = vec![JsonRequest::new( - "eth_getBlockByNumber".to_string(), - params, - height as u32, - )]; - let response = self.batch_requests(batch_request).await?; - let batch: FullHeaderBatch = serde_json::from_str(&response)?; - if batch.headers.len() != 1 { - bail!("Expected 1 header, got {:#?}", batch.headers.len()); + /// Return validated Receipts content key / value for the given FullHeader. + pub async fn get_receipts( + &self, + full_header: &FullHeader, + ) -> anyhow::Result<(HistoryContentKey, HistoryContentValue)> { + // Build receipts + let receipts = match full_header.txs.len() { + 0 => Receipts { + receipt_list: vec![], + }, + _ => { + self.get_trusted_receipts(&full_header.tx_hashes.hashes) + .await? + } + }; + + // Validate Receipts + let receipts_root = receipts.root()?; + if receipts_root != full_header.header.receipts_root { + bail!( + "Receipts root doesn't match header receipts root: {receipts_root:?} - {:?}", + full_header.header.receipts_root + ); } - Ok(batch.headers[0].clone()) + let content_key = HistoryContentKey::BlockReceipts(BlockReceiptsKey { + block_hash: full_header.header.hash().to_fixed_bytes(), + }); + let content_value = HistoryContentValue::Receipts(receipts); + Ok((content_key, content_value)) + } + + /// Return unvalidated receipts for the given transaction hashes. + async fn get_trusted_receipts(&self, tx_hashes: &[H256]) -> anyhow::Result { + let request: Vec = tx_hashes + .iter() + .enumerate() + .map(|(id, tx_hash)| { + let tx_hash = hex_encode(tx_hash); + let params = Params::Array(vec![json!(tx_hash)]); + let method = "eth_getTransactionReceipt".to_string(); + JsonRequest::new(method, params, id as u32) + }) + .collect(); + let response = self.batch_requests(request).await?; + Ok(serde_json::from_str(&response)?) + } + + pub async fn get_latest_block_number(&self) -> anyhow::Result { + let params = Params::Array(vec![json!("latest"), json!(false)]); + let method = "eth_getBlockByNumber".to_string(); + let request = JsonRequest::new(method, params, 1); + let response = self.send_request(request).await?; + let result = response + .get("result") + .ok_or_else(|| anyhow!("Unable to fetch latest block"))?; + let header: Header = serde_json::from_value(result.clone())?; + Ok(header.number) } /// Used the "surf" library here instead of "ureq" since "surf" is much more capable of handling @@ -213,6 +299,30 @@ impl ExecutionApi { serde_json::from_str::>(&result?) .map_err(|err| anyhow!("Unable to parse execution batch from provider: {err:?}")) } + + async fn send_request(&self, request: JsonRequest) -> anyhow::Result { + let result = self + .client + .post("") + .middleware(Retry::default()) + .body_json(&request) + .map_err(|e| anyhow!("Unable to construct json post request: {e:?}"))? + .recv_string() + .await + .map_err(|err| anyhow!("Unable to request execution payload from provider: {err:?}")); + serde_json::from_str::(&result?) + .map_err(|err| anyhow!("Unable to parse execution response from provider: {err:?}")) + } +} + +/// Create a proof for the given header / epoch acc +async fn construct_proof( + header: Header, + epoch_acc: &EpochAccumulator, +) -> anyhow::Result { + let proof = MasterAccumulator::construct_proof(&header, epoch_acc)?; + let proof = BlockHeaderProof::AccumulatorProof(AccumulatorProof { proof }); + Ok(HeaderWithProof { header, proof }) } #[derive(Debug)] @@ -238,7 +348,7 @@ impl Middleware for Retry { let body = req.take_body().into_bytes().await?; while retry_count < self.attempts { if retry_count > 0 { - info!("Retrying request"); + warn!("Retrying request"); } let mut new_req = req.clone(); new_req.set_body(Body::from_bytes(body.clone())); diff --git a/portal-bridge/src/bridge/history.rs b/portal-bridge/src/bridge/history.rs index e717b063b..54a52efb9 100644 --- a/portal-bridge/src/bridge/history.rs +++ b/portal-bridge/src/bridge/history.rs @@ -5,7 +5,7 @@ use std::{ sync::{Arc, Mutex}, }; -use anyhow::{anyhow, bail}; +use anyhow::anyhow; use ssz::Decode; use tokio::{ sync::{OwnedSemaphorePermit, Semaphore}, @@ -21,18 +21,10 @@ use crate::{ utils::{read_test_assets_from_file, TestAssets}, }; use ethportal_api::{ - jsonrpsee::http_client::HttpClient, - types::execution::{ - accumulator::EpochAccumulator, - header::{AccumulatorProof, BlockHeaderProof, Header, HeaderWithProof, SszNone}, - receipts::Receipts, - }, - utils::bytes::hex_encode, - BlockBodyKey, BlockHeaderKey, BlockReceiptsKey, EpochAccumulatorKey, HistoryContentKey, - HistoryContentValue, + jsonrpsee::http_client::HttpClient, types::execution::accumulator::EpochAccumulator, + utils::bytes::hex_encode, EpochAccumulatorKey, HistoryContentKey, HistoryContentValue, }; use trin_validation::{ - accumulator::MasterAccumulator, constants::{EPOCH_SIZE as EPOCH_SIZE_USIZE, MERGE_BLOCK_NUMBER}, oracle::HeaderOracle, }; @@ -40,7 +32,7 @@ use trin_validation::{ // todo: calculate / test optimal saturation delay const HEADER_SATURATION_DELAY: u64 = 10; // seconds const LATEST_BLOCK_POLL_RATE: u64 = 5; // seconds -const EPOCH_SIZE: u64 = EPOCH_SIZE_USIZE as u64; +pub const EPOCH_SIZE: u64 = EPOCH_SIZE_USIZE as u64; const GOSSIP_LIMIT: usize = 32; const SERVE_BLOCK_TIMEOUT: Duration = Duration::from_secs(120); @@ -218,14 +210,24 @@ impl HistoryBridge { execution_api: ExecutionApi, ) -> anyhow::Result<()> { info!("Serving block: {height}"); - let mut full_header = execution_api.get_header(height).await?; - if full_header.header.number <= MERGE_BLOCK_NUMBER { - full_header.epoch_acc = epoch_acc; - } + let (full_header, header_content_key, header_content_value) = + execution_api.get_header(height, epoch_acc).await?; let block_stats = Arc::new(Mutex::new(HistoryBlockStats::new( full_header.header.number, ))); - HistoryBridge::gossip_header(&full_header, &portal_clients, block_stats.clone()).await?; + + debug!("Built and validated HeaderWithProof for Block #{height:?}: now gossiping."); + if let Err(msg) = gossip_history_content( + &portal_clients, + header_content_key, + header_content_value, + block_stats.clone(), + ) + .await + { + warn!("Error gossiping HeaderWithProof #{height:?}: {msg:?}"); + }; + // Sleep for 10 seconds to allow headers to saturate network, // since they must be available for body / receipt validation. sleep(Duration::from_secs(HEADER_SATURATION_DELAY)).await; @@ -259,55 +261,6 @@ impl HistoryBridge { Ok(()) } - async fn gossip_header( - full_header: &FullHeader, - portal_clients: &Vec, - block_stats: Arc>, - ) -> anyhow::Result<()> { - debug!("Serving header: {}", full_header.header.number); - if full_header.header.number < MERGE_BLOCK_NUMBER && full_header.epoch_acc.is_none() { - bail!("Invalid header, expected to have epoch accumulator"); - } - let content_key = HistoryContentKey::BlockHeaderWithProof(BlockHeaderKey { - block_hash: full_header.header.hash().to_fixed_bytes(), - }); - // validate pre-merge - let content_value = match &full_header.epoch_acc { - Some(epoch_acc) => { - // Fetch HeaderRecord from EpochAccumulator for validation - let header_index = full_header.header.number % EPOCH_SIZE; - let header_record = &epoch_acc[header_index as usize]; - - // Validate Header - if header_record.block_hash != full_header.header.hash() { - bail!( - "Header hash doesn't match record in local accumulator: {:?} - {:?}", - full_header.header.hash(), - header_record.block_hash - ); - } - // Construct HeaderWithProof - let header_with_proof = - HistoryBridge::construct_proof(full_header.header.clone(), epoch_acc).await?; - HistoryContentValue::BlockHeaderWithProof(header_with_proof) - } - None => { - let header_with_proof = HeaderWithProof { - header: full_header.header.clone(), - proof: BlockHeaderProof::None(SszNone { value: None }), - }; - HistoryContentValue::BlockHeaderWithProof(header_with_proof) - } - }; - debug!( - "Gossip: Block #{:?} HeaderWithProof", - full_header.header.number - ); - let _ = - gossip_history_content(portal_clients, content_key, content_value, block_stats).await; - Ok(()) - } - /// Attempt to lookup an epoch accumulator from local portal-accumulators path provided via cli /// arg. Gossip the epoch accumulator if found. async fn get_epoch_acc(&self, epoch_index: u64) -> anyhow::Result> { @@ -331,6 +284,7 @@ impl HistoryBridge { let content_value = HistoryContentValue::EpochAccumulator(local_epoch_acc.clone()); // create unique stats for epoch accumulator, since it's rarely gossiped let block_stats = Arc::new(Mutex::new(HistoryBlockStats::new(epoch_index * EPOCH_SIZE))); + debug!("Built EpochAccumulator for Epoch #{epoch_index:?}: now gossiping."); let _ = gossip_history_content( &self.portal_clients, content_key, @@ -347,34 +301,12 @@ impl HistoryBridge { execution_api: &ExecutionApi, block_stats: Arc>, ) -> anyhow::Result<()> { - debug!("Serving receipt: {:?}", full_header.header.number); - let receipts = match full_header.txs.len() { - 0 => Receipts { - receipt_list: vec![], - }, - _ => { - execution_api - .get_trusted_receipts(&full_header.tx_hashes.hashes) - .await? - } - }; - - // Validate Receipts - let receipts_root = receipts.root()?; - if receipts_root != full_header.header.receipts_root { - bail!( - "Receipts root doesn't match header receipts root: {receipts_root:?} - {:?}", - full_header.header.receipts_root - ); - } - let content_key = HistoryContentKey::BlockReceipts(BlockReceiptsKey { - block_hash: full_header.header.hash().to_fixed_bytes(), - }); - let content_value = HistoryContentValue::Receipts(receipts); - debug!("Gossip: Block #{:?} Receipts", full_header.header.number,); - let _ = - gossip_history_content(portal_clients, content_key, content_value, block_stats).await; - Ok(()) + let (content_key, content_value) = execution_api.get_receipts(full_header).await?; + debug!( + "Built and validated Receipts for Block #{:?}: now gossiping.", + full_header.header.number + ); + gossip_history_content(portal_clients, content_key, content_value, block_stats).await } async fn construct_and_gossip_block_body( @@ -383,26 +315,11 @@ impl HistoryBridge { execution_api: &ExecutionApi, block_stats: Arc>, ) -> anyhow::Result<()> { - let block_body = execution_api.get_trusted_block_body(full_header).await?; - block_body.validate_against_header(&full_header.header)?; - - let content_key = HistoryContentKey::BlockBody(BlockBodyKey { - block_hash: full_header.header.hash().to_fixed_bytes(), - }); - let content_value = HistoryContentValue::BlockBody(block_body); - debug!("Gossip: Block #{:?} BlockBody", full_header.header.number); - let _ = - gossip_history_content(portal_clients, content_key, content_value, block_stats).await; - Ok(()) - } - - /// Create a proof for the given header / epoch acc - async fn construct_proof( - header: Header, - epoch_acc: &EpochAccumulator, - ) -> anyhow::Result { - let proof = MasterAccumulator::construct_proof(&header, epoch_acc)?; - let proof = BlockHeaderProof::AccumulatorProof(AccumulatorProof { proof }); - Ok(HeaderWithProof { header, proof }) + let (content_key, content_value) = execution_api.get_block_body(full_header).await?; + debug!( + "Built and validated BlockBody for Block #{:?}: now gossiping.", + full_header.header.number + ); + gossip_history_content(portal_clients, content_key, content_value, block_stats).await } } diff --git a/portal-bridge/src/lib.rs b/portal-bridge/src/lib.rs index 328e16e49..1158f501e 100644 --- a/portal-bridge/src/lib.rs +++ b/portal-bridge/src/lib.rs @@ -15,8 +15,8 @@ use lazy_static::lazy_static; use std::env; lazy_static! { - static ref PANDAOPS_CLIENT_ID: String = + pub static ref PANDAOPS_CLIENT_ID: String = env::var("PANDAOPS_CLIENT_ID").expect("PANDAOPS_CLIENT_ID env var not set."); - static ref PANDAOPS_CLIENT_SECRET: String = + pub static ref PANDAOPS_CLIENT_SECRET: String = env::var("PANDAOPS_CLIENT_SECRET").expect("PANDAOPS_CLIENT_SECRET env var not set."); } diff --git a/portal-bridge/src/types/full_header.rs b/portal-bridge/src/types/full_header.rs index 67928285f..6452181a3 100644 --- a/portal-bridge/src/types/full_header.rs +++ b/portal-bridge/src/types/full_header.rs @@ -1,9 +1,11 @@ use std::sync::Arc; +use anyhow::{anyhow, ensure}; use ethereum_types::H256; use serde::{Deserialize, Deserializer}; use serde_json::Value; +use crate::bridge::history::EPOCH_SIZE; use ethportal_api::types::{ consensus::withdrawal::Withdrawal, execution::{ @@ -12,6 +14,7 @@ use ethportal_api::types::{ transaction::Transaction, }, }; +use trin_validation::constants::MERGE_BLOCK_NUMBER; /// Helper type to deserialize a response from a batched Header request. #[derive(Debug, Clone, PartialEq, Eq)] @@ -77,6 +80,39 @@ impl TryFrom for FullHeader { } } +impl FullHeader { + pub fn validate(&self) -> anyhow::Result<()> { + // validation for pre-merge blocks + if self.header.number < MERGE_BLOCK_NUMBER { + let epoch_acc = self + .epoch_acc + .as_ref() + .ok_or_else(|| anyhow!("epoch_acc is missing for pre-merge block"))?; + + // Fetch HeaderRecord from EpochAccumulator for validation + let header_index = self.header.number % EPOCH_SIZE; + let header_record = &epoch_acc[header_index as usize]; + + // Validate Header + let actual_header_hash = self.header.hash(); + + ensure!( + header_record.block_hash == actual_header_hash, + "Header hash doesn't match record in local accumulator: {:?} - {:?}", + actual_header_hash, + header_record.block_hash + ); + } + ensure!( + self.txs.len() == self.tx_hashes.hashes.len(), + "txs.len() != tx_hashes.hashes.len(): {} != {}", + self.txs.len(), + self.tx_hashes.hashes.len() + ); + Ok(()) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/bin/sample_range.rs b/src/bin/sample_range.rs index fccac9490..8cef11111 100644 --- a/src/bin/sample_range.rs +++ b/src/bin/sample_range.rs @@ -27,6 +27,8 @@ use trin_validation::constants::MERGE_BLOCK_NUMBER; // cargo run --bin sample_range -- --sample-size 50 --range since:100000 // to sample 5 blocks from the latest 500 blocks: // cargo run --bin sample_range -- --sample-size 5 --range latest:500 +// with a custom node ip: +// cargo run --bin sample_range -- --sample-size 1 --range range:1-2 --node-ip http://127.0.0.1:50933 #[derive(Default)] struct Metrics { @@ -39,9 +41,9 @@ impl Metrics { fn display_stats(&self) { info!( "Headers {:?}% // Bodies {:?}% // Receipts {:?}%", - (self.header.success_count * 100) / self.header.total_count(), - (self.block_body.success_count * 100) / self.block_body.total_count(), - (self.receipts.success_count * 100) / self.receipts.total_count() + self.header.success_rate(), + self.block_body.success_rate(), + self.receipts.success_rate(), ); debug!( "Headers: {:?}/{:?} // Bodies: {:?}/{:?} // Receipts: {:?}/{:?}", @@ -69,6 +71,14 @@ impl Details { fn total_count(&self) -> u32 { self.success_count + self.failure_count } + + fn success_rate(&self) -> u32 { + if self.total_count() == 0 { + 0 + } else { + (self.success_count * 100) / self.total_count() + } + } } #[tokio::main] @@ -87,6 +97,7 @@ pub async fn main() -> Result<()> { SampleRange::FourFours => Uniform::new_inclusive(0, MERGE_BLOCK_NUMBER), SampleRange::Since(since) => Uniform::new_inclusive(since, latest_block), SampleRange::Latest(latest) => Uniform::from(latest_block - latest..latest_block), + SampleRange::Range(start, end) => Uniform::from(start..end), }; info!( "Sampling {} Blocks from Range: {:?}", @@ -179,7 +190,7 @@ async fn audit_block( pub struct SampleConfig { #[arg( long, - help = "Range to sample blocks from (shanghai, fourfours, since:123, latest:123)" + help = "Range to sample blocks from (shanghai, fourfours, since:123, latest:123, range:123-456)" )] pub range: SampleRange, @@ -196,6 +207,7 @@ pub enum SampleRange { Shanghai, Since(u64), Latest(u64), + Range(u64, u64), } type ParseError = &'static str; @@ -211,12 +223,30 @@ impl FromStr for SampleRange { let index = val.find(':').ok_or("Invalid sample range, missing `:`")?; let (mode, val) = val.split_at(index); let val = val.trim_start_matches(':'); - let block = val - .parse::() - .map_err(|_| "Invalid sample range: unable to parse block number")?; match mode { - "since" => Ok(Self::Since(block)), - "latest" => Ok(Self::Latest(block)), + "since" => { + let block = val + .parse::() + .map_err(|_| "Invalid sample range: unable to parse block number")?; + Ok(Self::Since(block)) + } + "latest" => { + let block = val + .parse::() + .map_err(|_| "Invalid sample range: unable to parse block number")?; + Ok(Self::Latest(block)) + } + "range" => { + let index = val.find('-').ok_or("Invalid sample range, missing `-`")?; + let (start, end) = val.split_at(index); + let start = start.parse::().map_err(|_| { + "Invalid sample range: unable to parse start block number" + })?; + let end = end.trim_start_matches('-').parse::().map_err(|_| { + "Invalid sample range: unable to parse end block number" + })?; + Ok(Self::Range(start, end)) + } _ => Err("Invalid sample range: invalid mode"), } } diff --git a/src/bin/test_providers.rs b/src/bin/test_providers.rs new file mode 100644 index 000000000..10ed8a92f --- /dev/null +++ b/src/bin/test_providers.rs @@ -0,0 +1,444 @@ +use std::{fmt, fs, ops::Range, sync::Arc}; + +use anyhow::{anyhow, Result}; +use clap::Parser; +use rand::{ + distributions::{Distribution, Uniform}, + thread_rng, +}; +use ssz::Decode; +use surf::{Client, Config}; +use tracing::{debug, info, warn}; +use url::Url; + +use ethportal_api::{types::execution::accumulator::EpochAccumulator, utils::bytes::hex_encode}; +use portal_bridge::{ + api::execution::ExecutionApi, bridge::history::EPOCH_SIZE, PANDAOPS_CLIENT_ID, + PANDAOPS_CLIENT_SECRET, +}; +use trin_utils::log::init_tracing_logger; +use trin_validation::{ + accumulator::MasterAccumulator, + constants::{ + BERLIN_BLOCK_NUMBER, BYZANTIUM_BLOCK_NUMBER, CONSTANTINOPLE_BLOCK_NUMBER, + HOMESTEAD_BLOCK_NUMBER, ISTANBUL_BLOCK_NUMBER, LONDON_BLOCK_NUMBER, MERGE_BLOCK_NUMBER, + SHANGHAI_BLOCK_NUMBER, + }, +}; + +// tldr: +// Randomly samples X blocks from every hard fork range. +// Validates that each provider is able to return valid +// headers, receipts, and block bodies for each randomly sampled block. +// +// Tested Providers: +// - Infura +// - PandaOps-Erigon +// - PandaOps-Geth +// - PandaOps-Archive +// +// cargo run --bin test_providers -- --sample-size 5 +// + +#[tokio::main] +pub async fn main() -> Result<()> { + init_tracing_logger(); + let config = ProviderConfig::parse(); + let all_providers: Vec = Providers::into_vec(); + let client = all_providers.first().unwrap().get_client(); + let master_acc = MasterAccumulator::default(); + let api = ExecutionApi { + client, + master_acc: master_acc.clone(), + }; + let latest_block = api.get_latest_block_number().await?; + let mut all_ranges = Ranges::into_vec(config.sample_size, latest_block); + let mut all_providers: Vec = Providers::into_vec(); + for provider in all_providers.iter_mut() { + info!("Testing Provider: {provider}"); + let mut provider_failures = 0; + let client = provider.get_client(); + let api = ExecutionApi { + client, + master_acc: master_acc.clone(), + }; + for gossip_range in all_ranges.iter_mut() { + debug!("Testing range: {gossip_range:?}"); + let mut range_failures = 0; + for block in gossip_range.blocks() { + debug!("Testing block: {block}"); + let epoch_acc = match lookup_epoch_acc(*block) { + Ok(epoch_acc) => epoch_acc, + Err(msg) => { + provider_failures += 3; + range_failures += 3; + warn!( + "--- failed to build valid header, receipts, & block body for block: {block}: Invalid epoch acc: {msg}" + ); + continue; + } + }; + let (full_header, _, _) = match api.get_header(*block, epoch_acc).await { + Ok(header) => header, + Err(_) => { + provider_failures += 3; + range_failures += 3; + warn!("--- failed to build valid header, receipts, & block body for block: {block}"); + continue; + } + }; + if let Err(msg) = api.get_receipts(&full_header).await { + provider_failures += 1; + range_failures += 1; + warn!("--- failed to build valid receipts for block: {block}: Error: {msg}"); + }; + if let Err(msg) = api.get_block_body(&full_header).await { + provider_failures += 1; + range_failures += 1; + warn!("--- failed to build valid block body for block: {block}: Error: {msg}"); + }; + } + let total = config.sample_size * 3; + gossip_range.update_success_rate(range_failures, total as u64); + debug!( + "Provider: {provider:?} // Range: {gossip_range:?} // Failures: {range_failures}/{total}" + ); + } + let total = + config.sample_size * Ranges::into_vec(config.sample_size, latest_block).len() * 3; + provider.update_success_rate(provider_failures, total as u64); + debug!("Provider Summary: {provider:?} // Failures: {provider_failures}/{total}"); + } + info!("Range Summary:"); + for range in all_ranges.iter() { + range.display_summary(); + } + info!("Provider Summary:"); + for provider in all_providers.iter() { + provider.display_summary(); + } + info!("Finished testing providers"); + Ok(()) +} + +fn lookup_epoch_acc(block: u64) -> Result>> { + if block >= MERGE_BLOCK_NUMBER { + return Ok(None); + } + let epoch_index = block / EPOCH_SIZE; + let master_acc = MasterAccumulator::default(); + let epoch_hash = master_acc.historical_epochs[epoch_index as usize]; + let epoch_hash_pretty = hex_encode(epoch_hash); + let epoch_hash_pretty = epoch_hash_pretty.trim_start_matches("0x"); + let epoch_acc_path = + format!("./portal-accumulators/bridge_content/0x03{epoch_hash_pretty}.portalcontent"); + let local_epoch_acc = match fs::read(&epoch_acc_path) { + Ok(val) => EpochAccumulator::from_ssz_bytes(&val).map_err(|err| anyhow!("{err:?}"))?, + Err(_) => { + return Err(anyhow!( + "Unable to find local epoch acc at path: {epoch_acc_path:?}" + )) + } + }; + Ok(Some(Arc::new(local_epoch_acc))) +} + +// CLI Parameter Handling +#[derive(Parser, Debug, PartialEq)] +#[command( + name = "Provider Config", + about = "Script to test provider content building stuffs" +)] +pub struct ProviderConfig { + #[arg( + long, + help = "Number of samples to take for each range", + default_value = "5" + )] + pub sample_size: usize, +} + +#[derive(Debug, PartialEq)] +enum Ranges { + // vec of blocks is to store randomly sampled blocks / range + // so that the same blocks are tested across the providers + Frontier((Vec, SuccessRate)), + Homestead((Vec, SuccessRate)), + Byzantium((Vec, SuccessRate)), + Constantinople((Vec, SuccessRate)), + Istanbul((Vec, SuccessRate)), + Berlin((Vec, SuccessRate)), + London((Vec, SuccessRate)), + Merge((Vec, SuccessRate)), + Shanghai((Vec, SuccessRate)), +} + +impl fmt::Display for Ranges { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Ranges::Frontier(_) => write!(f, "Frontier"), + Ranges::Homestead(_) => write!(f, "Homestead"), + Ranges::Byzantium(_) => write!(f, "Byzantium"), + Ranges::Constantinople(_) => write!(f, "Constantinople"), + Ranges::Istanbul(_) => write!(f, "Istanbul"), + Ranges::Berlin(_) => write!(f, "Berlin"), + Ranges::London(_) => write!(f, "London"), + Ranges::Merge(_) => write!(f, "Merge"), + Ranges::Shanghai(_) => write!(f, "Shanghai"), + } + } +} + +impl Ranges { + fn display_summary(&self) { + let success_rate = match self { + Ranges::Frontier((_, success_rate)) + | Ranges::Homestead((_, success_rate)) + | Ranges::Byzantium((_, success_rate)) + | Ranges::Constantinople((_, success_rate)) + | Ranges::Istanbul((_, success_rate)) + | Ranges::Berlin((_, success_rate)) + | Ranges::London((_, success_rate)) + | Ranges::Merge((_, success_rate)) + | Ranges::Shanghai((_, success_rate)) => success_rate, + }; + info!( + "Range: {} // Failure Rate: {}/{}", + self, success_rate.failures, success_rate.total + ); + } + + fn blocks(&self) -> &Vec { + match self { + Ranges::Frontier((blocks, _)) + | Ranges::Homestead((blocks, _)) + | Ranges::Byzantium((blocks, _)) + | Ranges::Constantinople((blocks, _)) + | Ranges::Istanbul((blocks, _)) + | Ranges::Berlin((blocks, _)) + | Ranges::London((blocks, _)) + | Ranges::Merge((blocks, _)) + | Ranges::Shanghai((blocks, _)) => blocks, + } + } + + fn update_success_rate(&mut self, failures: u64, total: u64) { + match self { + Ranges::Frontier((_, success_rate)) + | Ranges::Homestead((_, success_rate)) + | Ranges::Byzantium((_, success_rate)) + | Ranges::Constantinople((_, success_rate)) + | Ranges::Istanbul((_, success_rate)) + | Ranges::Berlin((_, success_rate)) + | Ranges::London((_, success_rate)) + | Ranges::Merge((_, success_rate)) + | Ranges::Shanghai((_, success_rate)) => { + success_rate.failures += failures; + success_rate.total += total; + } + } + } + + pub fn into_vec(sample_size: usize, latest_block: u64) -> Vec { + let mut rng = thread_rng(); + vec![ + Ranges::Frontier(( + Uniform::from(Range { + start: 0, + end: HOMESTEAD_BLOCK_NUMBER, + }) + .sample_iter(&mut rng) + .take(sample_size) + .collect(), + SuccessRate::default(), + )), + Ranges::Homestead(( + Uniform::from(Range { + start: HOMESTEAD_BLOCK_NUMBER, + end: BYZANTIUM_BLOCK_NUMBER, + }) + .sample_iter(&mut rng) + .take(sample_size) + .collect(), + SuccessRate::default(), + )), + Ranges::Byzantium(( + Uniform::from(Range { + start: BYZANTIUM_BLOCK_NUMBER, + end: CONSTANTINOPLE_BLOCK_NUMBER, + }) + .sample_iter(&mut rng) + .take(sample_size) + .collect(), + SuccessRate::default(), + )), + Ranges::Constantinople(( + Uniform::from(Range { + start: CONSTANTINOPLE_BLOCK_NUMBER, + end: ISTANBUL_BLOCK_NUMBER, + }) + .sample_iter(&mut rng) + .take(sample_size) + .collect(), + SuccessRate::default(), + )), + Ranges::Istanbul(( + Uniform::from(Range { + start: ISTANBUL_BLOCK_NUMBER, + end: BERLIN_BLOCK_NUMBER, + }) + .sample_iter(&mut rng) + .take(sample_size) + .collect(), + SuccessRate::default(), + )), + Ranges::Berlin(( + Uniform::from(Range { + start: BERLIN_BLOCK_NUMBER, + end: LONDON_BLOCK_NUMBER, + }) + .sample_iter(&mut rng) + .take(sample_size) + .collect(), + SuccessRate::default(), + )), + Ranges::London(( + Uniform::from(Range { + start: LONDON_BLOCK_NUMBER, + end: MERGE_BLOCK_NUMBER, + }) + .sample_iter(&mut rng) + .take(sample_size) + .collect(), + SuccessRate::default(), + )), + Ranges::Merge(( + Uniform::from(Range { + start: MERGE_BLOCK_NUMBER, + end: SHANGHAI_BLOCK_NUMBER, + }) + .sample_iter(&mut rng) + .take(sample_size) + .collect(), + SuccessRate::default(), + )), + Ranges::Shanghai(( + Uniform::from(Range { + start: SHANGHAI_BLOCK_NUMBER, + end: latest_block, + }) + .sample_iter(&mut rng) + .take(sample_size) + .collect(), + SuccessRate::default(), + )), + ] + } +} + +#[derive(Debug, PartialEq, Default)] +struct SuccessRate { + failures: u64, + total: u64, +} + +#[derive(Debug, PartialEq)] +enum Providers { + PandaGeth(SuccessRate), + PandaErigon(SuccessRate), + PandaArchive(SuccessRate), + Infura(SuccessRate), +} + +impl fmt::Display for Providers { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Providers::PandaGeth(_) => write!(f, "PandaGeth"), + Providers::PandaErigon(_) => write!(f, "PandaErigon"), + Providers::PandaArchive(_) => write!(f, "PandaArchive"), + Providers::Infura(_) => write!(f, "Infura"), + } + } +} + +impl Providers { + fn display_summary(&self) { + let success_rate = match self { + Providers::PandaGeth(success_rate) + | Providers::PandaErigon(success_rate) + | Providers::PandaArchive(success_rate) + | Providers::Infura(success_rate) => success_rate, + }; + info!( + "Provider: {} // Failure Rate: {:?} / {:?}", + self, success_rate.failures, success_rate.total + ); + } + + fn update_success_rate(&mut self, failures: u64, total: u64) { + match self { + Providers::PandaGeth(success_rate) + | Providers::PandaErigon(success_rate) + | Providers::PandaArchive(success_rate) + | Providers::Infura(success_rate) => { + success_rate.failures += failures; + success_rate.total += total; + } + } + } + + fn into_vec() -> Vec { + vec![ + Providers::PandaGeth(SuccessRate::default()), + Providers::PandaErigon(SuccessRate::default()), + Providers::PandaArchive(SuccessRate::default()), + Providers::Infura(SuccessRate::default()), + ] + } + + fn get_client(&self) -> Client { + match self { + Providers::Infura(_) => { + let infura_key = std::env::var("TRIN_INFURA_PROJECT_ID").unwrap(); + let base_infura_url = + Url::parse(&format!("https://mainnet.infura.io/v3/{}", infura_key)).unwrap(); + Config::new() + .add_header("Content-Type", "application/json") + .unwrap() + .set_base_url(base_infura_url) + .try_into() + .unwrap() + } + _ => { + let base_el_endpoint = match self { + Providers::PandaGeth(_) => { + Url::parse("https://geth-lighthouse.mainnet.eu1.ethpandaops.io/") + .expect("to be able to parse static base el endpoint url") + } + Providers::PandaErigon(_) => { + Url::parse("https://erigon-lighthouse.mainnet.eu1.ethpandaops.io/") + .expect("to be able to parse static base el endpoint url") + } + Providers::PandaArchive(_) => { + Url::parse("https://archive.mainnet.ethpandaops.io/") + .expect("to be able to parse static base el endpoint url") + } + _ => panic!("not implemented"), + }; + Config::new() + .add_header("Content-Type", "application/json") + .unwrap() + .add_header("CF-Access-Client-Id", PANDAOPS_CLIENT_ID.to_string()) + .unwrap() + .add_header( + "CF-Access-Client-Secret", + PANDAOPS_CLIENT_SECRET.to_string(), + ) + .unwrap() + .set_base_url(base_el_endpoint) + .try_into() + .unwrap() + } + } + } +} diff --git a/trin-validation/src/constants.rs b/trin-validation/src/constants.rs index 38dc4c6fd..bdccfa50a 100644 --- a/trin-validation/src/constants.rs +++ b/trin-validation/src/constants.rs @@ -1,4 +1,14 @@ -pub const MERGE_BLOCK_NUMBER: u64 = 15_537_393u64; +// Execution Layer hard forks https://ethereum.org/en/history/ +pub const SHANGHAI_BLOCK_NUMBER: u64 = 17_034_870; +pub const MERGE_BLOCK_NUMBER: u64 = 15_537_394; +pub const LONDON_BLOCK_NUMBER: u64 = 12_965_000; +pub const BERLIN_BLOCK_NUMBER: u64 = 12_244_000; +pub const ISTANBUL_BLOCK_NUMBER: u64 = 9_069_000; +pub const CONSTANTINOPLE_BLOCK_NUMBER: u64 = 7_280_000; +pub const BYZANTIUM_BLOCK_NUMBER: u64 = 4_370_000; +pub const HOMESTEAD_BLOCK_NUMBER: u64 = 1_150_000; + +// todo docstring pub const DEFAULT_MASTER_ACC_HASH: &str = "0x8eac399e24480dce3cfe06f4bdecba51c6e5d0c46200e3e8611a0b44a3a69ff9";