Skip to content

Commit

Permalink
feat: add test_providers script to test ability of various providers …
Browse files Browse the repository at this point in the history
…to build valid history content
  • Loading branch information
njgheorghita committed Jan 26, 2024
1 parent 1d2580b commit 37b8cc2
Show file tree
Hide file tree
Showing 10 changed files with 744 additions and 188 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ jsonrpsee = "0.20.0"
lazy_static = "1.4.0"
parking_lot = "0.11.2"
portalnet = { path = "portalnet" }
portal-bridge = { path = "portal-bridge" }
prometheus_exporter = "0.8.4"
rand = "0.8.4"
reth-ipc = { tag = "v0.1.0-alpha.10", git = "https://github.com/paradigmxyz/reth.git"}
rlp = "0.5.0"
rpc = { path = "rpc"}
serde_json = {version = "1.0.89", features = ["preserve_order"]}
sha3 = "0.9.1"
surf = "2.3.2"
tempfile = "3.3.0"
tokio = { version = "1.14.0", features = ["full"] }
tracing = "0.1.36"
Expand Down
9 changes: 7 additions & 2 deletions ethportal-api/src/types/execution/receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,11 +348,13 @@ impl TryFrom<Value> for TransactionId {
type Error = DecoderError;

fn try_from(val: Value) -> Result<Self, Self::Error> {
let id = val.as_str().ok_or(DecoderError::Custom("Invalid tx id."))?;
let id = val.as_str().ok_or(DecoderError::Custom(
"Invalid tx id: unable to decode as string.",
))?;
let id = id.trim_start_matches("0x");
let id = id
.parse::<u8>()
.map_err(|_| DecoderError::Custom("Invalid tx id."))?;
.map_err(|_| DecoderError::Custom("Invalid tx id: unable to parse u8"))?;
Self::try_from(id)
}
}
Expand Down Expand Up @@ -431,6 +433,9 @@ impl<'de> Deserialize<'de> for Receipt {
D: Deserializer<'de>,
{
let obj: Value = Deserialize::deserialize(deserializer)?;
if obj.is_null() {
return Err(anyhow!("Null receipt found.")).map_err(serde::de::Error::custom);
}
let tx_id =
TransactionId::try_from(obj["type"].clone()).map_err(serde::de::Error::custom)?;
match tx_id {
Expand Down
228 changes: 169 additions & 59 deletions portal-bridge/src/api/execution.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use anyhow::{anyhow, bail};
use ethereum_types::H256;
use futures::future::join_all;
Expand All @@ -7,38 +9,43 @@ use surf::{
Body, Client, Config, Request, Response,
};
use tokio::time::{sleep, Duration};
use tracing::{info, warn};
use tracing::warn;
use url::Url;

use crate::{
cli::Provider,
constants::{BASE_EL_ARCHIVE_ENDPOINT, BASE_EL_ENDPOINT},
types::{
full_header::{FullHeader, FullHeaderBatch},
mode::BridgeMode,
},
types::{full_header::FullHeader, mode::BridgeMode},
PANDAOPS_CLIENT_ID, PANDAOPS_CLIENT_SECRET,
};
use ethportal_api::{
types::{
execution::block_body::{
BlockBody, BlockBodyLegacy, BlockBodyMerge, BlockBodyShanghai, MERGE_TIMESTAMP,
SHANGHAI_TIMESTAMP,
execution::{
accumulator::EpochAccumulator,
block_body::{
BlockBody, BlockBodyLegacy, BlockBodyMerge, BlockBodyShanghai, MERGE_TIMESTAMP,
SHANGHAI_TIMESTAMP,
},
header::{AccumulatorProof, BlockHeaderProof, HeaderWithProof, SszNone},
},
jsonrpc::{params::Params, request::JsonRequest},
},
utils::bytes::hex_encode,
Header, Receipts,
BlockBodyKey, BlockHeaderKey, BlockReceiptsKey, Header, HistoryContentKey, HistoryContentValue,
Receipts,
};
use trin_validation::{accumulator::MasterAccumulator, constants::MERGE_BLOCK_NUMBER};

/// Limit the number of requests in a single batch to avoid exceeding the
/// provider's batch size limit configuration of 100.
const BATCH_LIMIT: usize = 100;

/// Implements endpoints from the Execution API to access data from the execution layer.
/// Performs validation of the data returned from the provider.
#[derive(Clone, Debug)]
pub struct ExecutionApi {
client: Client,
pub client: Client,
pub master_acc: MasterAccumulator,
}

impl ExecutionApi {
Expand Down Expand Up @@ -71,14 +78,80 @@ impl ExecutionApi {
if provider != Provider::Test {
check_provider(&client).await?;
}
Ok(Self { client })
let master_acc = MasterAccumulator::default();
Ok(Self { client, master_acc })
}

/// Returns an unvalidated block body for the given FullHeader.
pub async fn get_trusted_block_body(
/// Return a validated FullHeader & content key / value pair for the given header.
pub async fn get_header(
&self,
height: u64,
epoch_acc: Option<Arc<EpochAccumulator>>,
) -> anyhow::Result<(FullHeader, HistoryContentKey, HistoryContentValue)> {
// Geth requires block numbers to be formatted using the following padding.
let block_param = format!("0x{height:01X}");
let params = Params::Array(vec![json!(block_param), json!(true)]);
let request = JsonRequest::new("eth_getBlockByNumber".to_string(), params, height as u32);
let response = self.send_request(request).await?;
let result = response
.get("result")
.ok_or_else(|| anyhow!("Unable to fetch header for block: {height:?}"))?;
let mut full_header: FullHeader = FullHeader::try_from(result.clone())?;

// Add epoch accumulator to header if it's a pre-merge block.
if full_header.header.number < MERGE_BLOCK_NUMBER {
if epoch_acc.is_none() {
bail!("Epoch accumulator is required for pre-merge blocks");
}
full_header.epoch_acc = epoch_acc;
}

// Validate header.
if let Err(msg) = full_header.validate() {
bail!("Header validation failed: {msg}");
};

// Construct content key / value pair.
let content_key = HistoryContentKey::BlockHeaderWithProof(BlockHeaderKey {
block_hash: full_header.header.hash().to_fixed_bytes(),
});
let content_value = match &full_header.epoch_acc {
Some(epoch_acc) => {
// Construct HeaderWithProof
let header_with_proof =
construct_proof(full_header.header.clone(), epoch_acc).await?;
// Double check that the proof is valid
self.master_acc
.validate_header_with_proof(&header_with_proof)?;
HistoryContentValue::BlockHeaderWithProof(header_with_proof)
}
None => {
let header_with_proof = HeaderWithProof {
header: full_header.header.clone(),
proof: BlockHeaderProof::None(SszNone { value: None }),
};
HistoryContentValue::BlockHeaderWithProof(header_with_proof)
}
};
Ok((full_header, content_key, content_value))
}

/// Return a validated BlockBody content key / value for the given FullHeader.
pub async fn get_block_body(
&self,
full_header: &FullHeader,
) -> anyhow::Result<BlockBody> {
) -> anyhow::Result<(HistoryContentKey, HistoryContentValue)> {
let block_body = self.get_trusted_block_body(full_header).await?;
block_body.validate_against_header(&full_header.header)?;
let content_key = HistoryContentKey::BlockBody(BlockBodyKey {
block_hash: full_header.header.hash().to_fixed_bytes(),
});
let content_value = HistoryContentValue::BlockBody(block_body);
Ok((content_key, content_value))
}

/// Return an unvalidated block body for the given FullHeader.
async fn get_trusted_block_body(&self, full_header: &FullHeader) -> anyhow::Result<BlockBody> {
let txs = full_header.txs.clone();
if full_header.header.timestamp > SHANGHAI_TIMESTAMP {
if !full_header.uncles.is_empty() {
Expand All @@ -103,36 +176,7 @@ impl ExecutionApi {
}
}

/// Returns unvalidated receipts for the given transaction hashes.
pub async fn get_trusted_receipts(&self, tx_hashes: &[H256]) -> anyhow::Result<Receipts> {
let request: Vec<JsonRequest> = tx_hashes
.iter()
.enumerate()
.map(|(id, tx_hash)| {
let tx_hash = hex_encode(tx_hash);
let params = Params::Array(vec![json!(tx_hash)]);
let method = "eth_getTransactionReceipt".to_string();
JsonRequest::new(method, params, id as u32)
})
.collect();
let response = self.batch_requests(request).await?;
Ok(serde_json::from_str(&response)?)
}

pub async fn get_latest_block_number(&self) -> anyhow::Result<u64> {
let params = Params::Array(vec![json!("latest"), json!(false)]);
let method = "eth_getBlockByNumber".to_string();
let request = JsonRequest::new(method, params, 1);
let response = self.batch_requests(vec![request]).await?;
let response: Vec<Value> = serde_json::from_str(&response)?;
let result = response[0]
.get("result")
.ok_or_else(|| anyhow!("Unable to fetch latest block"))?;
let header: Header = serde_json::from_value(result.clone())?;
Ok(header.number)
}

/// Returns unvalidated uncles for the given uncle hashes.
/// Return unvalidated uncles for the given uncle hashes.
async fn get_trusted_uncles(&self, hashes: &[H256]) -> anyhow::Result<Vec<Header>> {
let batch_request = hashes
.iter()
Expand All @@ -158,21 +202,63 @@ impl ExecutionApi {
Ok(headers)
}

pub async fn get_header(&self, height: u64) -> anyhow::Result<FullHeader> {
// Geth requires block numbers to be formatted using the following padding.
let block_param = format!("0x{height:01X}");
let params = Params::Array(vec![json!(block_param), json!(true)]);
let batch_request = vec![JsonRequest::new(
"eth_getBlockByNumber".to_string(),
params,
height as u32,
)];
let response = self.batch_requests(batch_request).await?;
let batch: FullHeaderBatch = serde_json::from_str(&response)?;
if batch.headers.len() != 1 {
bail!("Expected 1 header, got {:#?}", batch.headers.len());
/// Return validated Receipts content key / value for the given FullHeader.
pub async fn get_receipts(
&self,
full_header: &FullHeader,
) -> anyhow::Result<(HistoryContentKey, HistoryContentValue)> {
// Build receipts
let receipts = match full_header.txs.len() {
0 => Receipts {
receipt_list: vec![],
},
_ => {
self.get_trusted_receipts(&full_header.tx_hashes.hashes)
.await?
}
};

// Validate Receipts
let receipts_root = receipts.root()?;
if receipts_root != full_header.header.receipts_root {
bail!(
"Receipts root doesn't match header receipts root: {receipts_root:?} - {:?}",
full_header.header.receipts_root
);
}
Ok(batch.headers[0].clone())
let content_key = HistoryContentKey::BlockReceipts(BlockReceiptsKey {
block_hash: full_header.header.hash().to_fixed_bytes(),
});
let content_value = HistoryContentValue::Receipts(receipts);
Ok((content_key, content_value))
}

/// Return unvalidated receipts for the given transaction hashes.
async fn get_trusted_receipts(&self, tx_hashes: &[H256]) -> anyhow::Result<Receipts> {
let request: Vec<JsonRequest> = tx_hashes
.iter()
.enumerate()
.map(|(id, tx_hash)| {
let tx_hash = hex_encode(tx_hash);
let params = Params::Array(vec![json!(tx_hash)]);
let method = "eth_getTransactionReceipt".to_string();
JsonRequest::new(method, params, id as u32)
})
.collect();
let response = self.batch_requests(request).await?;
Ok(serde_json::from_str(&response)?)
}

pub async fn get_latest_block_number(&self) -> anyhow::Result<u64> {
let params = Params::Array(vec![json!("latest"), json!(false)]);
let method = "eth_getBlockByNumber".to_string();
let request = JsonRequest::new(method, params, 1);
let response = self.send_request(request).await?;
let result = response
.get("result")
.ok_or_else(|| anyhow!("Unable to fetch latest block"))?;
let header: Header = serde_json::from_value(result.clone())?;
Ok(header.number)
}

/// Used the "surf" library here instead of "ureq" since "surf" is much more capable of handling
Expand Down Expand Up @@ -213,6 +299,30 @@ impl ExecutionApi {
serde_json::from_str::<Vec<Value>>(&result?)
.map_err(|err| anyhow!("Unable to parse execution batch from provider: {err:?}"))
}

async fn send_request(&self, request: JsonRequest) -> anyhow::Result<Value> {
let result = self
.client
.post("")
.middleware(Retry::default())
.body_json(&request)
.map_err(|e| anyhow!("Unable to construct json post request: {e:?}"))?
.recv_string()
.await
.map_err(|err| anyhow!("Unable to request execution payload from provider: {err:?}"));
serde_json::from_str::<Value>(&result?)
.map_err(|err| anyhow!("Unable to parse execution response from provider: {err:?}"))
}
}

/// Create a proof for the given header / epoch acc
async fn construct_proof(
header: Header,
epoch_acc: &EpochAccumulator,
) -> anyhow::Result<HeaderWithProof> {
let proof = MasterAccumulator::construct_proof(&header, epoch_acc)?;
let proof = BlockHeaderProof::AccumulatorProof(AccumulatorProof { proof });
Ok(HeaderWithProof { header, proof })
}

#[derive(Debug)]
Expand All @@ -238,7 +348,7 @@ impl Middleware for Retry {
let body = req.take_body().into_bytes().await?;
while retry_count < self.attempts {
if retry_count > 0 {
info!("Retrying request");
warn!("Retrying request");
}
let mut new_req = req.clone();
new_req.set_body(Body::from_bytes(body.clone()));
Expand Down
Loading

0 comments on commit 37b8cc2

Please sign in to comment.