Skip to content

Commit

Permalink
feat: add state peertests (ethereum#1210)
Browse files Browse the repository at this point in the history
  • Loading branch information
morph-dev authored Mar 13, 2024
1 parent d0281da commit 34ce5cc
Show file tree
Hide file tree
Showing 6 changed files with 279 additions and 34 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ethportal-peertest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ jsonrpsee = {version="0.20.0", features = ["async-client", "client", "macros", "
rand = "0.8.4"
reth-ipc = { tag = "v0.1.0-alpha.10", git = "https://github.com/paradigmxyz/reth.git"}
rpc = { path = "../rpc" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.89"
serde_yaml = "0.9"
tempfile = "3.3.0"
Expand Down
1 change: 1 addition & 0 deletions ethportal-peertest/src/scenarios/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ pub mod find;
pub mod gossip;
pub mod offer_accept;
pub mod paginate;
pub mod state;
pub mod utp;
pub mod validation;
97 changes: 97 additions & 0 deletions ethportal-peertest/src/scenarios/state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
use crate::{
utils::{
fixtures_state_account_trie_node, fixtures_state_contract_bytecode,
fixtures_state_contract_storage_trie_node, fixtures_state_recursive_gossip,
wait_for_state_content, StateFixture,
},
Peertest, PeertestNode,
};
use ethportal_api::{
jsonrpsee::async_client::Client, types::content_value::state::TrieNode, StateContentValue,
StateNetworkApiClient,
};
use tracing::info;

pub async fn test_state_offer_account_trie_node(peertest: &Peertest, target: &Client) {
for fixture in fixtures_state_account_trie_node() {
info!(
"Testing offering AccountTrieNode for key: {:?}",
fixture.content_data.key
);
test_state_offer(&fixture, target, &peertest.bootnode).await;
}
}

pub async fn test_state_gossip_contract_storage_trie_node(peertest: &Peertest, target: &Client) {
for fixture in fixtures_state_contract_storage_trie_node() {
info!(
"Testing offering ContractStorageTrieNode for key: {:?}",
fixture.content_data.key
);
test_state_offer(&fixture, target, &peertest.bootnode).await;
}
}

pub async fn test_state_gossip_contract_bytecode(peertest: &Peertest, target: &Client) {
for fixture in fixtures_state_contract_bytecode() {
info!(
"Testing offering ContractBytecode for key: {:?}",
fixture.content_data.key
);
test_state_offer(&fixture, target, &peertest.bootnode).await;
}
}

async fn test_state_offer(fixture: &StateFixture, target: &Client, peer: &PeertestNode) {
target
.offer(
peer.enr.clone(),
fixture.content_data.key.clone(),
Some(fixture.content_data.offer_value.clone()),
)
.await
.unwrap();

let lookup_content_value =
wait_for_state_content(&peer.ipc_client, fixture.content_data.key.clone()).await;
assert_eq!(lookup_content_value, fixture.content_data.lookup_value);
}

pub async fn test_state_recursive_gossip(peertest: &Peertest, target: &Client) {
let _ = target.ping(peertest.bootnode.enr.clone()).await.unwrap();

for fixture in fixtures_state_recursive_gossip().unwrap() {
let (first_key, first_value) = &fixture.key_value_pairs.first().unwrap();
info!(
"Testing recursive gossip starting with key: {:?}",
first_key
);

target
.gossip(first_key.clone(), first_value.clone())
.await
.unwrap();

// Verify that every key/value is fully propagated
for (key, value) in fixture.key_value_pairs {
let expected_lookup_trie_node = match value {
StateContentValue::AccountTrieNodeWithProof(value) => {
value.proof.last().unwrap().clone()
}
StateContentValue::ContractStorageTrieNodeWithProof(value) => {
value.storage_proof.last().unwrap().clone()
}
_ => panic!("Unexpected state content value: {value:?}"),
};
let expected_lookup_value = StateContentValue::TrieNode(TrieNode {
node: expected_lookup_trie_node,
});

assert_eq!(
wait_for_state_content(target, key.clone()).await,
expected_lookup_value,
"Expecting lookup for {key:?} to return {expected_lookup_value:?}"
);
}
}
}
176 changes: 142 additions & 34 deletions ethportal-peertest/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,60 +1,76 @@
use std::fs;

use futures::{Future, TryFutureExt};
use tracing::error;

use anyhow::Result;
use serde_yaml::Value;
use std::fs;
use ureq::serde::Deserialize;

use ethportal_api::{
BeaconContentKey, BeaconContentValue, BeaconNetworkApiClient, HistoryContentKey,
HistoryContentValue, HistoryNetworkApiClient,
HistoryContentValue, HistoryNetworkApiClient, StateContentKey, StateContentValue,
StateNetworkApiClient,
};

/// Wait for the history content to be transferred
pub async fn wait_for_history_content<P: HistoryNetworkApiClient + std::marker::Sync>(
ipc_client: &P,
content_key: HistoryContentKey,
) -> HistoryContentValue {
let mut received_content_value = ipc_client.local_content(content_key.clone()).await;

let mut counter = 0;

pub async fn wait_for_successful_result<Fut, O>(f: impl Fn() -> Fut) -> O
where
Fut: Future<Output = Result<O>>,
{
// If content is absent an error will be returned.
while counter < 5 {
let message = match received_content_value {
for counter in 0..60 {
let result = f().await;
match result {
Ok(val) => return val,
Err(e) => format!("received an error {e}"),
Err(e) => {
if counter > 0 {
error!("Retry attempt #{counter} after 0.5s, because we received an error {e}")
}
}
};
error!("Retrying after 0.5s, because {message}");
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
received_content_value = ipc_client.local_content(content_key.clone()).await;
counter += 1;
}

received_content_value.unwrap()
panic!("Retried too many times");
}

/// Wait for the history content to be transferred
pub async fn wait_for_history_content<P: HistoryNetworkApiClient + std::marker::Sync>(
ipc_client: &P,
content_key: HistoryContentKey,
) -> HistoryContentValue {
wait_for_successful_result(|| {
ipc_client
.local_content(content_key.clone())
.map_err(anyhow::Error::from)
})
.await
}

/// Wait for the beacon content to be transferred
pub async fn wait_for_beacon_content<P: BeaconNetworkApiClient + std::marker::Sync>(
ipc_client: &P,
content_key: BeaconContentKey,
) -> BeaconContentValue {
let mut received_content_value = ipc_client.local_content(content_key.clone()).await;

let mut counter = 0;

// If content is absent an error will be returned.
while counter < 60 {
let message = match received_content_value {
Ok(val) => return val,
Err(e) => format!("received an error {e}"),
};
counter += 1;
error!("Retry attempt #{counter} in 0.5s to find beacon content, because {message}");
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
received_content_value = ipc_client.local_content(content_key.clone()).await;
}
wait_for_successful_result(|| {
ipc_client
.local_content(content_key.clone())
.map_err(anyhow::Error::from)
})
.await
}

received_content_value.unwrap()
/// Wait for the state content to be transferred
pub async fn wait_for_state_content<P: StateNetworkApiClient + std::marker::Sync>(
ipc_client: &P,
content_key: StateContentKey,
) -> StateContentValue {
wait_for_successful_result(|| {
ipc_client
.local_content(content_key.clone())
.map_err(anyhow::Error::from)
})
.await
}

fn read_history_content_key_value(
Expand Down Expand Up @@ -101,3 +117,95 @@ pub fn fixture_block_body() -> (HistoryContentKey, HistoryContentValue) {
pub fn fixture_receipts() -> (HistoryContentKey, HistoryContentValue) {
read_fixture("portal-spec-tests/tests/mainnet/history/receipts/14764013.yaml")
}

#[derive(Debug, Clone, Deserialize)]
pub struct StateContentData {
#[serde(rename = "content_key")]
pub key: StateContentKey,
#[serde(rename = "content_value_offer")]
pub offer_value: StateContentValue,
#[serde(rename = "content_value_retrieval")]
pub lookup_value: StateContentValue,
}

#[derive(Debug, Clone)]
pub struct StateFixture {
pub content_data: StateContentData,
pub recursive_gossip: Option<StateContentData>,
}

fn read_state_fixture_from_file(file_name: &str) -> Result<Vec<StateFixture>> {
let yaml_content = fs::read_to_string(file_name)?;
let value: Value = serde_yaml::from_str(&yaml_content)?;

let mut result = vec![];

for fixture in value.as_sequence().unwrap() {
result.push(StateFixture {
content_data: StateContentData::deserialize(fixture)?,
recursive_gossip: fixture.get("recursive_gossip").and_then(|value| {
if value.is_null() {
None
} else {
Some(StateContentData::deserialize(value).unwrap())
}
}),
});
}
Ok(result)
}

fn read_state_fixture(file_name: &str) -> Vec<StateFixture> {
read_state_fixture_from_file(file_name)
.unwrap_or_else(|err| panic!("Error reading fixture: {err}"))
}

pub fn fixtures_state_account_trie_node() -> Vec<StateFixture> {
read_state_fixture("portal-spec-tests/tests/mainnet/state/validation/account_trie_node.yaml")
}

pub fn fixtures_state_contract_storage_trie_node() -> Vec<StateFixture> {
read_state_fixture(
"portal-spec-tests/tests/mainnet/state/validation/contract_storage_trie_node.yaml",
)
}

pub fn fixtures_state_contract_bytecode() -> Vec<StateFixture> {
read_state_fixture("portal-spec-tests/tests/mainnet/state/validation/contract_bytecode.yaml")
}

pub struct StateRecursiveGossipFixture {
pub key_value_pairs: Vec<(StateContentKey, StateContentValue)>,
}

pub fn fixtures_state_recursive_gossip() -> Result<Vec<StateRecursiveGossipFixture>> {
let yaml_content = fs::read_to_string(
"portal-spec-tests/tests/mainnet/state/validation/recursive_gossip.yaml",
)?;
let value: Value = serde_yaml::from_str(&yaml_content)?;

let mut result = vec![];

for fixture in value.as_sequence().unwrap() {
result.push(StateRecursiveGossipFixture {
key_value_pairs: fixture
.as_sequence()
.unwrap()
.iter()
.map(|key_value_container| {
let key = StateContentKey::deserialize(
key_value_container.get("content_key").unwrap(),
)
.unwrap();
let value = StateContentValue::deserialize(
key_value_container.get("content_value").unwrap(),
)
.unwrap();
(key, value)
})
.collect(),
})
}

Ok(result)
}
37 changes: 37 additions & 0 deletions tests/self_peertest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,43 @@ async fn peertest_trace_recursive_utp() {
handle.stop().unwrap();
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn peertest_state_offer_account_trie_node() {
let (peertest, target, handle) = setup_peertest().await;
peertest::scenarios::state::test_state_offer_account_trie_node(&peertest, &target).await;
peertest.exit_all_nodes();
handle.stop().unwrap();
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn peertest_state_offer_contract_storage_trie_node() {
let (peertest, target, handle) = setup_peertest().await;
peertest::scenarios::state::test_state_gossip_contract_storage_trie_node(&peertest, &target)
.await;
peertest.exit_all_nodes();
handle.stop().unwrap();
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn peertest_state_offer_contract_bytecode() {
let (peertest, target, handle) = setup_peertest().await;
peertest::scenarios::state::test_state_gossip_contract_bytecode(&peertest, &target).await;
peertest.exit_all_nodes();
handle.stop().unwrap();
}

#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn peertest_state_recursive_gossip() {
let (peertest, target, handle) = setup_peertest().await;
peertest::scenarios::state::test_state_recursive_gossip(&peertest, &target).await;
peertest.exit_all_nodes();
handle.stop().unwrap();
}

async fn setup_peertest() -> (peertest::Peertest, Client, RpcServerHandle) {
utils::init_tracing();
// Run a client, as a buddy peer for ping tests, etc.
Expand Down

0 comments on commit 34ce5cc

Please sign in to comment.