From 444fae102d99f22496a06d58cf06d2f3fd541424 Mon Sep 17 00:00:00 2001 From: hopeyen Date: Wed, 12 Jul 2023 11:14:15 -0500 Subject: [PATCH] fix: add test retries and gc msg valid check --- Cargo.lock | 1 + one-shot/Cargo.toml | 3 +- poi-radio/Cargo.toml | 3 +- poi-radio/src/config.rs | 3 +- poi-radio/src/messages/mod.rs | 59 ----------------------------------- poi-radio/src/operator/mod.rs | 54 ++++++++++++++++++++++++++++++-- poi-radio/src/state.rs | 35 +++++++++++---------- test-runner/Cargo.toml | 3 +- test-runner/src/topics.rs | 28 +++++++++++------ test-sender/Cargo.toml | 3 +- test-sender/src/main.rs | 13 +++++--- test-utils/Cargo.toml | 3 +- test-utils/src/mock_server.rs | 4 +-- 13 files changed, 107 insertions(+), 105 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e05914c..41106e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2294,6 +2294,7 @@ dependencies = [ [[package]] name = "graphcast-sdk" version = "0.3.4" +source = "git+https://github.com/graphops/graphcast-sdk?rev=99d942e#99d942edebb70e2c148c2d0a2008344060107903" dependencies = [ "anyhow", "async-graphql", diff --git a/one-shot/Cargo.toml b/one-shot/Cargo.toml index 8d1ed7a..a17332c 100644 --- a/one-shot/Cargo.toml +++ b/one-shot/Cargo.toml @@ -10,8 +10,7 @@ keywords = ["graphprotocol", "data-integrity", "Indexer", "waku", "p2p"] categories = ["network-programming", "web-programming::http-client"] [dependencies] -# graphcast-sdk = { package = "graphcast-sdk", git = "https://github.com/graphops/graphcast-sdk", rev = "f39cb28" } -graphcast-sdk = { package = "graphcast-sdk", path = "../../graphcast-rs" } +graphcast-sdk = { package = "graphcast-sdk", git = "https://github.com/graphops/graphcast-sdk", rev = "99d942e" } poi-radio = { path = "../poi-radio" } prost = "0.11" once_cell = "1.17" diff --git a/poi-radio/Cargo.toml b/poi-radio/Cargo.toml index ba6957f..6f69c94 100644 --- a/poi-radio/Cargo.toml +++ b/poi-radio/Cargo.toml @@ -10,8 +10,7 @@ keywords = ["graphprotocol", "data-integrity", "Indexer", "waku", "p2p"] categories = ["network-programming", "web-programming::http-client"] [dependencies] -# graphcast-sdk = { package = "graphcast-sdk", git = "https://github.com/graphops/graphcast-sdk", rev = "f39cb28" } -graphcast-sdk = { package = "graphcast-sdk", path = "../../graphcast-rs" } +graphcast-sdk = { package = "graphcast-sdk", git = "https://github.com/graphops/graphcast-sdk", rev = "99d942e" } prost = "0.11" once_cell = "1.17" chrono = "0.4" diff --git a/poi-radio/src/config.rs b/poi-radio/src/config.rs index aea976b..0a79c63 100644 --- a/poi-radio/src/config.rs +++ b/poi-radio/src/config.rs @@ -18,7 +18,7 @@ use serde::{Deserialize, Serialize}; use std::collections::HashSet; use tracing::{debug, info, trace}; -use crate::state::PersistedState; +use crate::state::{panic_hook, PersistedState}; use crate::{active_allocation_hashes, syncing_deployment_hashes}; #[derive(clap::ValueEnum, Clone, Debug, Serialize, Deserialize, Default)] @@ -400,6 +400,7 @@ impl Config { "Loaded Persisted state cache" ); + panic_hook(path); state } else { debug!("Created new state"); diff --git a/poi-radio/src/messages/mod.rs b/poi-radio/src/messages/mod.rs index a04439a..b3d3ad2 100644 --- a/poi-radio/src/messages/mod.rs +++ b/poi-radio/src/messages/mod.rs @@ -1,61 +1,2 @@ -use graphcast_sdk::graphcast_agent::{ - message_typing::GraphcastMessage, waku_handling::WakuHandlingError, -}; -use poi::PublicPoiMessage; -use std::{ - any::Any, - sync::{mpsc, Mutex as SyncMutex}, -}; -use tracing::{error, trace}; -use upgrade::VersionUpgradeMessage; - pub mod poi; pub mod upgrade; - -#[derive(Debug, Clone, serde_derive::Deserialize, serde_derive::Serialize)] -pub enum MessageType { - PublicPoi(GraphcastMessage), - VersionUpgrade(GraphcastMessage), -} - -pub fn typed_handler(sender: SyncMutex>, msg: &dyn Any) { - if let Some(Ok(ppoi_message)) = - msg.downcast_ref::, WakuHandlingError>>() - { - trace!( - ppoi_message = tracing::field::debug(&ppoi_message), - "Received Graphcast validated message" - ); - - // let id = ppoi_message.identifier.clone(); - // VALIDATED_MESSAGES.with_label_values(&[&id]).inc(); - - match sender - .lock() - .unwrap() - .send(MessageType::PublicPoi(ppoi_message.clone())) - { - Ok(_) => trace!("Sent received message to radio operator"), - Err(e) => error!("Could not send message to channel: {:#?}", e), - } - } else if let Some(Ok(upgrade_message)) = - msg.downcast_ref::, WakuHandlingError>>() - { - trace!( - upgrade_message = tracing::field::debug(&upgrade_message), - "Received Graphcast validated message" - ); - - // let id = upgrade_message.identifier.clone(); - // VALIDATED_MESSAGES.with_label_values(&[&id]).inc(); - - match sender - .lock() - .unwrap() - .send(MessageType::VersionUpgrade(upgrade_message.clone())) - { - Ok(_) => trace!("Sent upgrade message to radio operator"), - Err(e) => error!("Could not send message to channel: {:#?}", e), - } - } -} diff --git a/poi-radio/src/operator/mod.rs b/poi-radio/src/operator/mod.rs index e6da11f..a6119bf 100644 --- a/poi-radio/src/operator/mod.rs +++ b/poi-radio/src/operator/mod.rs @@ -9,7 +9,11 @@ use tracing::{debug, error, info, trace, warn}; use graphcast_sdk::{ build_wallet, - graphcast_agent::{message_typing::GraphcastMessage, GraphcastAgent}, + graphcast_agent::{ + message_typing::{check_message_validity, GraphcastMessage}, + waku_handling::WakuHandlingError, + GraphcastAgent, + }, graphql::client_graph_node::{subgraph_network_blocks, update_network_chainheads}, }; @@ -117,6 +121,7 @@ impl RadioOperator { let state_ref = persisted_state.clone(); let upgrade_notifier = notifier.clone(); let graph_node = config.graph_node_endpoint().clone(); + // try message format in order of PublicPOIMessage, VersionUpgradeMessage tokio::spawn(async move { for msg in receiver { @@ -124,11 +129,35 @@ impl RadioOperator { let agent = GRAPHCAST_AGENT .get() .expect("Could not retrieve Graphcast agent"); + let id_validation = agent.id_validation.clone(); + let callbook = agent.callbook.clone(); + let nonces = agent.nonces.clone(); + let local_sender = agent.graphcast_identity.graphcast_id.clone(); if let Ok(msg) = agent.decoder::(msg.payload()).await { trace!( message = tracing::field::debug(&msg), - "Parsed and validated as Public PoI message", + "Parseable as Public PoI message, now validate", ); + let msg = match check_message_validity( + msg, + &nonces, + callbook.clone(), + local_sender.clone(), + &id_validation, + ) + .await + .map_err(|e| WakuHandlingError::InvalidMessage(e.to_string())) + { + Ok(msg) => msg, + Err(e) => { + debug!( + err = tracing::field::debug(e), + "Failed to validate by Graphcast" + ); + continue; + } + }; + let identifier = msg.identifier.clone(); let is_valid = msg.payload.validity_check(&msg, &graph_node).await; @@ -150,8 +179,27 @@ impl RadioOperator { { trace!( message = tracing::field::debug(&msg), - "Parsed and validated as Version Upgrade message", + "Parseable as Version Upgrade message, now validate", ); + let msg = match check_message_validity( + msg, + &nonces, + callbook.clone(), + local_sender.clone(), + &id_validation, + ) + .await + .map_err(|e| WakuHandlingError::InvalidMessage(e.to_string())) + { + Ok(msg) => msg, + Err(e) => { + debug!( + err = tracing::field::debug(e), + "Failed to validate by Graphcast" + ); + continue; + } + }; let is_valid = msg.payload.validity_check(&msg, &graph_node).await; if let Ok(payload) = is_valid { diff --git a/poi-radio/src/state.rs b/poi-radio/src/state.rs index 566cf25..0a8b953 100644 --- a/poi-radio/src/state.rs +++ b/poi-radio/src/state.rs @@ -1,14 +1,16 @@ use serde::{Deserialize, Serialize}; +use std::panic::PanicInfo; use std::path::Path; -use std::fs; +use std::str::FromStr; use std::sync::{Arc, Mutex as SyncMutex}; use std::{ collections::HashMap, fs::{remove_file, File}, io::{BufReader, Write}, }; +use std::{fs, panic}; use tracing::{info, trace, warn}; use graphcast_sdk::graphcast_agent::message_typing::GraphcastMessage; @@ -17,6 +19,7 @@ use crate::operator::attestation::{ clear_local_attestation, ComparisonResult, ComparisonResultType, }; use crate::operator::notifier::Notifier; +use crate::RADIO_OPERATOR; use crate::{messages::poi::PublicPoiMessage, operator::attestation::Attestation}; @@ -283,21 +286,21 @@ impl PersistedState { } // TODO: panic hook for updating the cache file before exiting the program -// /// Set up panic hook to store persisted state -// pub fn panic_hook<'a>(file_path: &str) { -// let path = String::from_str(file_path).expect("Invalid file path provided"); -// panic::set_hook(Box::new(move |panic_info| panic_cache(panic_info, &path))); -// } - -// pub fn panic_cache(panic_info: &PanicInfo<'_>, file_path: &str) { -// RADIO_OPERATOR -// .get() -// .unwrap() -// .state() -// .update_cache(file_path); -// // Log panic information and program state -// eprintln!("Panic occurred! Panic info: {:?}", panic_info); -// } +/// Set up panic hook to store persisted state +pub fn panic_hook(file_path: &str) { + let path = String::from_str(file_path).expect("Invalid file path provided"); + panic::set_hook(Box::new(move |panic_info| panic_cache(panic_info, &path))); +} + +pub fn panic_cache(panic_info: &PanicInfo<'_>, file_path: &str) { + RADIO_OPERATOR + .get() + .unwrap() + .state() + .update_cache(file_path); + // Log panic information and program state + eprintln!("Panic occurred! Panic info: {:?}", panic_info); +} #[cfg(test)] mod tests { diff --git a/test-runner/Cargo.toml b/test-runner/Cargo.toml index e8e53f8..4cd1ff3 100644 --- a/test-runner/Cargo.toml +++ b/test-runner/Cargo.toml @@ -23,8 +23,7 @@ categories = [ [dependencies] waku = { version = "0.1.1", package = "waku-bindings" } test-utils = { path = "../test-utils" } -# graphcast-sdk = { package = "graphcast-sdk", git = "https://github.com/graphops/graphcast-sdk", rev = "f39cb28" } -graphcast-sdk = { package = "graphcast-sdk", path = "../../graphcast-rs" } +graphcast-sdk = { package = "graphcast-sdk", git = "https://github.com/graphops/graphcast-sdk", rev = "99d942e" } poi-radio = { path = "../poi-radio" } tokio = { version = "1.1.1", features = ["full", "rt"] } tracing = "0.1" diff --git a/test-runner/src/topics.rs b/test-runner/src/topics.rs index e4e901c..357cc1e 100644 --- a/test-runner/src/topics.rs +++ b/test-runner/src/topics.rs @@ -108,16 +108,16 @@ pub async fn topics_test() { tokio::time::sleep(Duration::from_secs(50)).await; - let persisted_state = PersistedState::load_cache(&store_path); - debug!("persisted state {:?}", persisted_state); - - let remote_messages = persisted_state.remote_messages(); - let test_hash = "QmonlyintestsenderXyZABCdeFgHIjklMNOpqrstuvWXYZabcdEFG"; - let has_test_hash = remote_messages - .iter() - .any(|msg| msg.identifier == test_hash); - + let mut has_test_hash = test_result(&store_path, test_hash); + + let max_test_attempts = 3; + let mut num_test_attempts = 0; + while num_test_attempts < max_test_attempts && !has_test_hash { + tokio::time::sleep(Duration::from_secs(config.topic_update_interval + 1)).await; + has_test_hash = test_result(&store_path, test_hash); + num_test_attempts += 1; + } assert!( has_test_hash, "Expected remote message not found with identifier {}", @@ -126,3 +126,13 @@ pub async fn topics_test() { teardown(process_manager, &store_path); } + +fn test_result(store_path: &str, test_hash: &str) -> bool { + let persisted_state = PersistedState::load_cache(store_path); + debug!("persisted state {:?}", persisted_state); + + let remote_messages = persisted_state.remote_messages(); + remote_messages + .iter() + .any(|msg| msg.identifier == test_hash) +} diff --git a/test-sender/Cargo.toml b/test-sender/Cargo.toml index 76594aa..b012fa5 100644 --- a/test-sender/Cargo.toml +++ b/test-sender/Cargo.toml @@ -22,8 +22,7 @@ categories = [ [dependencies] waku = { version = "0.1.1", package = "waku-bindings" } -# graphcast-sdk = { package = "graphcast-sdk", git = "https://github.com/graphops/graphcast-sdk", rev = "f39cb28" } -graphcast-sdk = { package = "graphcast-sdk", path = "../../graphcast-rs" } +graphcast-sdk = { package = "graphcast-sdk", git = "https://github.com/graphops/graphcast-sdk", rev = "99d942e" } test-utils = { path = "../test-utils" } poi-radio = { path = "../poi-radio" } tokio = { version = "1.1.1", features = ["full", "rt"] } diff --git a/test-sender/src/main.rs b/test-sender/src/main.rs index 19be9f7..2e0bf96 100644 --- a/test-sender/src/main.rs +++ b/test-sender/src/main.rs @@ -71,9 +71,6 @@ async fn start_sender(config: TestSenderConfig) { let pubsub_topic = WakuPubSubTopic::from_str(pubsub_topic_str).unwrap(); loop { for topic in config.topics.clone() { - let timestamp = Utc::now().timestamp(); - let timestamp = (timestamp + 9) / 10 * 10; - let nodes = gather_nodes(vec![], &pubsub_topic); // Connect to peers on the filter protocol connect_multiaddresses(nodes, &node_handle, ProtocolId::Filter); @@ -81,7 +78,13 @@ async fn start_sender(config: TestSenderConfig) { let content_topic = format!("/{}/0/{}/proto", config.radio_name, topic); let content_topic = WakuContentTopic::from_str(&content_topic).unwrap(); - // let nonce = config.nonce.clone().unwrap().parse::().unwrap(); + let timestamp = + if let Some(n) = config.nonce.clone().and_then(|x| x.parse::().ok()) { + n + } else { + Utc::now().timestamp() + }; + let block_number = (timestamp + 9) / 10 * 10; let radio_payload_clone = config.radio_payload.clone(); match radio_payload_clone.as_deref() { @@ -91,7 +94,7 @@ async fn start_sender(config: TestSenderConfig) { config.poi.clone().unwrap(), timestamp, NetworkName::Goerli, - timestamp.try_into().unwrap(), + block_number.try_into().unwrap(), config.block_hash.clone().unwrap(), "0x7e6528e4ce3055e829a32b5dc4450072bac28bc6".to_string(), ); diff --git a/test-utils/Cargo.toml b/test-utils/Cargo.toml index 14b0cb3..fe3fe8e 100644 --- a/test-utils/Cargo.toml +++ b/test-utils/Cargo.toml @@ -22,8 +22,7 @@ categories = [ [dependencies] waku = { version = "0.1.1", package = "waku-bindings" } -# graphcast-sdk = { package = "graphcast-sdk", git = "https://github.com/graphops/graphcast-sdk", rev = "f39cb28" } -graphcast-sdk = { package = "graphcast-sdk", path = "../../graphcast-rs" } +graphcast-sdk = { package = "graphcast-sdk", git = "https://github.com/graphops/graphcast-sdk", rev = "99d942e" } poi-radio = { path = "../poi-radio" } tokio = { version = "1.1.1", features = ["full", "rt"] } tracing = "0.1" diff --git a/test-utils/src/mock_server.rs b/test-utils/src/mock_server.rs index a766c34..67e6f29 100644 --- a/test-utils/src/mock_server.rs +++ b/test-utils/src/mock_server.rs @@ -59,7 +59,7 @@ pub async fn start_mock_server( async fn handler_graphql(subgraphs: Arc>>) -> Result { let timestamp = Utc::now().timestamp(); - let timestamp = (timestamp + 9) / 10 * 10; + let block_number = (timestamp + 9) / 10 * 10; let subgraphs = subgraphs.lock().await; // Prepare indexingStatuses part of the response dynamically from the subgraphs vector @@ -67,7 +67,7 @@ async fn handler_graphql(subgraphs: Arc>>) -> Result