diff --git a/src/config.rs b/src/config.rs index 3ad5d0a..d0881de 100644 --- a/src/config.rs +++ b/src/config.rs @@ -231,6 +231,7 @@ mod test { admin_api_secret: None, rate_limit_rps: 1, boot_nodes: BootNodes::Chain, + sync_lookback_hours: 0, }; let names = config.premint_names(); @@ -259,6 +260,7 @@ mod test { admin_api_secret: None, rate_limit_rps: 1, boot_nodes: BootNodes::None, + sync_lookback_hours: 0, }; let names = config.premint_names(); diff --git a/src/controller.rs b/src/controller.rs index 16cb434..9e9c3e7 100644 --- a/src/controller.rs +++ b/src/controller.rs @@ -81,8 +81,7 @@ pub struct Controller { external_commands: mpsc::Receiver, store: PremintStorage, rules: RulesEngine, - trusted_peers: Vec, - inclusion_mode: ChainInclusionMode, + config: Config, } @@ -101,8 +100,6 @@ impl Controller { external_commands, store, rules, - trusted_peers: config.trusted_peers(), - inclusion_mode: config.chain_inclusion_mode, config, } } @@ -229,7 +226,7 @@ impl Controller { tracing::debug!("Marked as seen onchain {:?}", claim.clone()); } - if self.inclusion_mode == ChainInclusionMode::Check { + if self.config.chain_inclusion_mode == ChainInclusionMode::Check { if let Err(err) = self .swarm_command_sender .send(SwarmCommand::SendOnchainMintFound(claim)) @@ -263,7 +260,7 @@ impl Controller { } async fn handle_event_onchain_claim(&self, peer_claim: PeerInclusionClaim) -> eyre::Result<()> { - match self.inclusion_mode { + match self.config.chain_inclusion_mode { ChainInclusionMode::Check | ChainInclusionMode::Verify => { let claim = peer_claim.claim; let premint = self @@ -286,7 +283,11 @@ impl Controller { } } ChainInclusionMode::Trust => { - if self.trusted_peers.contains(&peer_claim.from_peer_id) { + if self + .config + .trusted_peers() + .contains(&peer_claim.from_peer_id) + { self.store .mark_seen_on_chain(peer_claim.claim.clone()) .await?; @@ -304,26 +305,6 @@ impl Controller { } } } - - /// temporary solution for full state from a known other node via their http api. - /// We should migrate to syncing based on peer_id & libp2p request_response - async fn api_sync(&self, api_url: String) -> eyre::Result<()> { - let seconds = self.config.sync_lookback_hours * 60 * 60; - let from_time = SystemTime::now().sub(Duration::from_secs(seconds)); - - let from_time = chrono::Utc::now().sub(chrono::Duration::hours( - self.config.sync_lookback_hours as i64, - )); - - let url = reqwest::Url::parse_with_params( - api_url.as_str(), - &[("from", serde_json::to_string(&from_time)?)], - )?; - - // reqwest::get() - - Ok(()) - } } #[derive(Clone)] diff --git a/src/p2p.rs b/src/p2p.rs index 192ad6a..a93b238 100644 --- a/src/p2p.rs +++ b/src/p2p.rs @@ -15,17 +15,16 @@ use libp2p::kad::store::MemoryStore; use libp2p::kad::GetProvidersOk::FoundProviders; use libp2p::kad::{Addresses, QueryResult, RecordKey}; use libp2p::multiaddr::Protocol; -use libp2p::request_response::{Message, ProtocolSupport}; +use libp2p::request_response::{InboundRequestId, Message, ProtocolSupport, ResponseChannel}; use libp2p::swarm::{ConnectionId, NetworkBehaviour, NetworkInfo, SwarmEvent}; use libp2p::{ gossipsub, kad, noise, request_response, tcp, yamux, Multiaddr, PeerId, StreamProtocol, }; +use serde::{Deserialize, Serialize}; use sha256::digest; use std::hash::Hasher; use std::time::Duration; use tokio::select; -use tower::load::pending_requests; -use tower_http::request_id; #[derive(NetworkBehaviour)] pub struct MintpoolBehaviour { @@ -33,7 +32,7 @@ pub struct MintpoolBehaviour { kad: kad::Behaviour, identify: libp2p::identify::Behaviour, ping: libp2p::ping::Behaviour, - request_response: request_response::cbor::Behaviour>, + request_response: request_response::cbor::Behaviour, } pub struct SwarmController { @@ -301,7 +300,12 @@ impl SwarmController { } SwarmEvent::Behaviour(MintpoolBehaviourEvent::RequestResponse(event)) => { - self.handle_request_response_event(event).await + match self.handle_request_response_event(event).await { + Ok(_) => {} + Err(err) => { + tracing::error!("Error handling request response event: {:?}", err); + } + } } SwarmEvent::Dialing { peer_id, .. } => { @@ -570,7 +574,7 @@ impl SwarmController { async fn handle_request_response_event( &mut self, - event: request_response::Event>, + event: request_response::Event, ) -> eyre::Result<()> { match event { request_response::Event::Message { peer, message } => match message { @@ -579,23 +583,12 @@ impl SwarmController { request, channel, } => { - tracing::info!( - request_id = request_id.to_string(), - "processing request for sync" - ); - let (snd, recv) = tokio::sync::oneshot::channel(); - self.event_sender - .send(P2PEvent::SyncRequest { - query: request, - channel: snd, - }) - .await?; - let result = recv.await??; + let resp = self.make_sync_response(request_id, request).await; self.swarm .behaviour_mut() .request_response - .send_response(channel, result) - .map_err(|e| eyre::eyre!("Error sending response: {:?}", e))? + .send_response(channel, resp) + .map_err(|e| eyre::eyre!("Error sending response: {:?}", e))?; } Message::Response { request_id, @@ -605,15 +598,69 @@ impl SwarmController { request_id = request_id.to_string(), "received response for sync" ); - self.event_sender - .send(P2PEvent::SyncResponse { premints: response }) - .await?; + match response { + SyncResponse::Premints(premints) => { + self.event_sender + .send(P2PEvent::SyncResponse { premints }) + .await?; + } + SyncResponse::Error(err) => { + tracing::error!( + request_id = request_id.to_string(), + error = err, + "error received to our sync request" + ); + } + } } }, other => tracing::info!("mintpool sync request/response event: {:?}", other), } Ok(()) } + + // Makes a Response for a request to sync from another node + async fn make_sync_response( + &mut self, + request_id: InboundRequestId, + request: QueryOptions, + ) -> SyncResponse { + tracing::info!( + request_id = request_id.to_string(), + "processing request for sync" + ); + match self.make_sync_response_query(request).await { + Ok(premints) => SyncResponse::Premints(premints), + Err(err) => { + tracing::error!( + request_id = request_id.to_string(), + error = err.to_string(), + "error processing sync request" + ); + SyncResponse::Error(err.to_string()) + } + } + } + + // inner function to make propagating errors that occur during query easier to work with + async fn make_sync_response_query( + &mut self, + request: QueryOptions, + ) -> eyre::Result> { + let (snd, recv) = tokio::sync::oneshot::channel(); + self.event_sender + .send(P2PEvent::SyncRequest { + query: request, + channel: snd, + }) + .await + .map_err(|_| eyre::eyre!("Controller error"))?; + let result = recv + .await + .map_err(|_| eyre::eyre!("Channel error"))? + .map_err(|_| eyre::eyre!("Query error"))?; + Ok(result) + } } fn gossipsub_message_id(message: &gossipsub::Message) -> gossipsub::MessageId { @@ -643,6 +690,12 @@ pub struct NetworkState { pub all_external_addresses: Vec, } +#[derive(Debug, Serialize, Deserialize)] +pub enum SyncResponse { + Premints(Vec), + Error(String), +} + fn announce_topic() -> gossipsub::IdentTopic { gossipsub::IdentTopic::new("mintpool::announce") } diff --git a/tests/api_test.rs b/tests/api_test.rs index afdf137..59742dd 100644 --- a/tests/api_test.rs +++ b/tests/api_test.rs @@ -29,7 +29,7 @@ mod api_test { async fn make_test_router(config: &Config) -> Router { let mut rules = RulesEngine::new(config); rules.add_default_rules(); - let ctl = start_p2p_services(config, rules).await.unwrap(); + let ctl = start_p2p_services(config.clone(), rules).await.unwrap(); let router = api::router_with_defaults(config); let state = AppState::from(config, ctl.clone()).await; diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 03a06a0..f9a9f36 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -44,7 +44,7 @@ pub mod mintpool_build { let config = make_config(start_port + i, peer_limit); let ctl = mintpool::run::start_p2p_services( - &config, + config.clone(), RulesEngine::new_with_default_rules(&config), ) .await diff --git a/tests/e2e_test.rs b/tests/e2e_test.rs index 607d8a6..bcdb328 100644 --- a/tests/e2e_test.rs +++ b/tests/e2e_test.rs @@ -57,7 +57,7 @@ async fn test_zora_premint_v2_e2e() { // set this so CHAINS will use the anvil rpc rather than the one in chains.json env::set_var("CHAIN_7777777_RPC_WSS", anvil.ws_endpoint()); - let ctl = run::start_p2p_services(&config, RulesEngine::new_with_default_rules(&config)) + let ctl = run::start_p2p_services(config.clone(), RulesEngine::new_with_default_rules(&config)) .await .unwrap(); run::start_watch_chain::(&config, ctl.clone()).await; @@ -208,14 +208,20 @@ async fn test_verify_e2e() { // Start 3 nodes, one in check, one in verify, one in trust (trusts node 1) // ============================================================================================ - let ctl1 = run::start_p2p_services(&config1, RulesEngine::new_with_default_rules(&config1)) - .await - .unwrap(); + let ctl1 = run::start_p2p_services( + config1.clone(), + RulesEngine::new_with_default_rules(&config1), + ) + .await + .unwrap(); run::start_watch_chain::(&config1, ctl1.clone()).await; - let ctl2 = run::start_p2p_services(&config2, RulesEngine::new_with_default_rules(&config2)) - .await - .unwrap(); + let ctl2 = run::start_p2p_services( + config2.clone(), + RulesEngine::new_with_default_rules(&config2), + ) + .await + .unwrap(); let node_info = ctl1.get_node_info().await.unwrap(); @@ -225,9 +231,12 @@ async fn test_verify_e2e() { config3.chain_inclusion_mode = ChainInclusionMode::Trust; config3.trusted_peers = Some(node_info.peer_id.to_string()); - let ctl3 = run::start_p2p_services(&config3, RulesEngine::new_with_default_rules(&config3)) - .await - .unwrap(); + let ctl3 = run::start_p2p_services( + config3.clone(), + RulesEngine::new_with_default_rules(&config3), + ) + .await + .unwrap(); connect_all_to_first(vec![ctl1.clone(), ctl2.clone(), ctl3.clone()]).await;