From beb715b56ba3ed34d2f2a8117f3486ff0e0b6fc4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Bierlein?= Date: Sat, 4 May 2024 14:40:23 +0200 Subject: [PATCH 1/4] Lower some log levels --- src/p2p.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/p2p.rs b/src/p2p.rs index 29cfd71..337e506 100644 --- a/src/p2p.rs +++ b/src/p2p.rs @@ -433,6 +433,7 @@ impl SwarmController { tracing::debug!("Discovered relay peer: {:?}", info); for addr in info.listen_addrs { + tracing::debug!("Adding relay address: {:?}", addr); relay_manager.add_address(peer_id, addr); } } @@ -453,11 +454,11 @@ impl SwarmController { } _ => {} } - tracing::debug!("Ping event: {:?}", event); + tracing::trace!("Ping event: {:?}", event); } SwarmEvent::Behaviour(MintpoolBehaviourEvent::Relay(event)) => { - tracing::info!("Relay event: {:?}", event); + tracing::debug!("Relay event: {:?}", event); } SwarmEvent::Behaviour(MintpoolBehaviourEvent::Autonat(event)) => { @@ -470,7 +471,7 @@ impl SwarmController { } SwarmEvent::Behaviour(MintpoolBehaviourEvent::Dcutr(event)) => { - tracing::info!("Dcutr event: {:?}", event); + tracing::debug!("Dcutr event: {:?}", event); } other => { From d2dd613ac95c7aea9eaa84d61e9b3d6538cef583 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Bierlein?= Date: Sun, 5 May 2024 13:58:17 +0200 Subject: [PATCH 2/4] Add MultiTicker --- src/multi_ticker.rs | 103 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 103 insertions(+) create mode 100644 src/multi_ticker.rs diff --git a/src/multi_ticker.rs b/src/multi_ticker.rs new file mode 100644 index 0000000..7c19846 --- /dev/null +++ b/src/multi_ticker.rs @@ -0,0 +1,103 @@ +use std::collections::HashMap; + +use std::hash::Hash; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Instant; + +use futures::Stream; +use futures_ticker::Ticker; +use futures_util::stream::select_all; +use futures_util::StreamExt; + +struct MultiTicker { + tickers: HashMap, +} + +struct KeyedStream(T, S); + +impl Stream for KeyedStream +where + T: Copy + Unpin, + S: Stream + Unpin, +{ + type Item = (T, S::Item); + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let KeyedStream(key, stream) = self.get_mut(); + match stream.poll_next_unpin(cx) { + Poll::Ready(Some(item)) => Poll::Ready(Some((*key, item))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } + + fn size_hint(&self) -> (usize, Option) { + (1, None) + } +} + +impl MultiTicker {} + +impl Stream for MultiTicker { + type Item = (T, Instant); + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + select_all( + self.get_mut() + .tickers + .iter_mut() + .map(|(k, v)| KeyedStream(*k, v)), + ) + .poll_next_unpin(cx) + } + + fn size_hint(&self) -> (usize, Option) { + (self.tickers.len(), None) + } +} + +#[cfg(test)] +mod test { + use std::time::Duration; + + use futures::executor::block_on; + + use super::*; + + #[test] + fn test_multi_ticker() { + #[derive(Clone, Copy, Eq, Debug, Hash, PartialEq)] + enum TickerId { + A, + B, + C, + } + + let mut tickers = HashMap::new(); + tickers.insert(TickerId::A, Ticker::new(Duration::from_millis(1000))); + tickers.insert(TickerId::B, Ticker::new(Duration::from_millis(2100))); + tickers.insert(TickerId::C, Ticker::new(Duration::from_millis(3200))); + + let mut multi_ticker = MultiTicker { tickers }; + + let mut ticks = vec![]; + let mut multi_ticker = Pin::new(&mut multi_ticker); + for _ in 0..5 { + ticks.push(block_on(multi_ticker.next()).unwrap()); + } + + println!("{:?}", ticks); + + assert_eq!( + ticks, + vec![ + (TickerId::A, ticks[0].1), // 1000 + (TickerId::A, ticks[1].1), // 2000 + (TickerId::B, ticks[2].1), // 2100 + (TickerId::A, ticks[3].1), // 3000 + (TickerId::C, ticks[4].1), // 3200 + ] + ); + } +} From d7508d934e94660e843bb4f731f85e397610944b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Bierlein?= Date: Sun, 5 May 2024 14:16:27 +0200 Subject: [PATCH 3/4] Provide subscribed topics --- src/p2p.rs | 53 ++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 38 insertions(+), 15 deletions(-) diff --git a/src/p2p.rs b/src/p2p.rs index 337e506..b5e41d1 100644 --- a/src/p2p.rs +++ b/src/p2p.rs @@ -7,7 +7,7 @@ use futures_ticker::Ticker; use itertools::Itertools; use libp2p::autonat::NatStatus; use libp2p::futures::StreamExt; -use libp2p::gossipsub::Version; +use libp2p::gossipsub::{IdentTopic, Version}; use libp2p::identify::Event; use libp2p::identity::Keypair; use libp2p::kad::store::{MemoryStore, RecordStore}; @@ -216,20 +216,20 @@ impl SwarmController { .listen_on(format!("/ip4/{listen_ip}/tcp/{port}").parse()?)?; let registry_topic = announce_topic(); - self.swarm - .behaviour_mut() - .gossipsub - .subscribe(®istry_topic)?; - - for premint_name in self.premint_names.iter() { - let topic = premint_name.msg_topic(); - self.swarm.behaviour_mut().gossipsub.subscribe(&topic)?; - let claim_topic = premint_name.claims_topic(); - self.swarm - .behaviour_mut() - .gossipsub - .subscribe(&claim_topic)?; - } + self.gossip_subscribe(®istry_topic)?; + + // subscribe to all relevant topics + self.premint_names + .iter() + .flat_map(|name| vec![name.msg_topic(), name.claims_topic()]) + .collect::>() + .iter() + .for_each(|topic| match self.gossip_subscribe(&topic) { + Ok(_) => {} + Err(err) => { + tracing::error!("Error subscribing to topic: {:?}", err); + } + }); self.run_loop().await; Ok(()) @@ -816,6 +816,29 @@ impl SwarmController { Ok(()) } + fn gossip_subscribe(&mut self, topic: &IdentTopic) -> eyre::Result<()> { + tracing::info!("Subscribing to topic: {}", topic.to_string()); + let b = self.swarm.behaviour_mut(); + + b.gossipsub.subscribe(&topic)?; + b.kad.start_providing(Self::topic_to_record_key(&topic))?; + + Ok(()) + } + + fn gossip_unsubscribe(&mut self, topic: &IdentTopic) -> eyre::Result<()> { + let b = self.swarm.behaviour_mut(); + + b.gossipsub.unsubscribe(topic)?; + b.kad.stop_providing(&Self::topic_to_record_key(&topic)); + + Ok(()) + } + + fn topic_to_record_key(topic: &IdentTopic) -> RecordKey { + RecordKey::new(&format!("topic::{}", topic.to_string()).as_bytes()) + } + // Makes a Response for a request to sync from another node async fn make_sync_response( &mut self, From adebd93d05276f0e2135c8b0bfe021f11d9872f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Bierlein?= Date: Sun, 5 May 2024 16:24:31 +0200 Subject: [PATCH 4/4] Implement regular gossip peer discovery --- src/lib.rs | 1 + src/multi_ticker.rs | 27 +++++++++++++++-- src/p2p.rs | 70 +++++++++++++++++++++++++++++++-------------- 3 files changed, 75 insertions(+), 23 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 2ca0d18..516c39e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ pub mod chain_list; pub mod config; pub mod controller; pub mod metrics; +pub mod multi_ticker; pub mod multiaddr_ext; pub mod p2p; pub mod premints; diff --git a/src/multi_ticker.rs b/src/multi_ticker.rs index 7c19846..be9cf01 100644 --- a/src/multi_ticker.rs +++ b/src/multi_ticker.rs @@ -7,13 +7,27 @@ use std::time::Instant; use futures::Stream; use futures_ticker::Ticker; -use futures_util::stream::select_all; +use futures_util::stream::{select_all, FusedStream}; use futures_util::StreamExt; -struct MultiTicker { +pub struct MultiTicker { tickers: HashMap, } +impl MultiTicker +where + T: Copy + Hash + Eq + Unpin + Sized + 'static, +{ + pub fn new(tickers: I) -> Self + where + I: IntoIterator, + { + MultiTicker { + tickers: tickers.into_iter().collect::>(), + } + } +} + struct KeyedStream(T, S); impl Stream for KeyedStream @@ -37,6 +51,15 @@ where } } +impl FusedStream for MultiTicker +where + T: Copy + Hash + Eq + Unpin + Sized + 'static, +{ + fn is_terminated(&self) -> bool { + self.tickers.is_empty() + } +} + impl MultiTicker {} impl Stream for MultiTicker { diff --git a/src/p2p.rs b/src/p2p.rs index b5e41d1..7391a5b 100644 --- a/src/p2p.rs +++ b/src/p2p.rs @@ -1,4 +1,5 @@ use std::borrow::Cow; +use std::collections::HashMap; use std::hash::Hasher; use std::time::Duration; @@ -7,18 +8,17 @@ use futures_ticker::Ticker; use itertools::Itertools; use libp2p::autonat::NatStatus; use libp2p::futures::StreamExt; -use libp2p::gossipsub::{IdentTopic, Version}; +use libp2p::gossipsub::{IdentTopic, TopicHash, Version}; use libp2p::identify::Event; use libp2p::identity::Keypair; use libp2p::kad::store::{MemoryStore, RecordStore}; -use libp2p::kad::GetProvidersOk::FoundProviders; -use libp2p::kad::{Addresses, ProviderRecord, QueryResult, Record, RecordKey}; -use libp2p::multiaddr::{Error, Protocol}; -use libp2p::request_response::{InboundRequestId, Message, ProtocolSupport, ResponseChannel}; +use libp2p::kad::{Addresses, ProviderRecord, RecordKey}; +use libp2p::multiaddr::Protocol; +use libp2p::request_response::{InboundRequestId, Message, ProtocolSupport}; use libp2p::swarm::behaviour::toggle::Toggle; use libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; use libp2p::swarm::DialError::DialPeerConditionFalse; -use libp2p::swarm::{ConnectionId, NetworkBehaviour, NetworkInfo, SwarmEvent, ToSwarm}; +use libp2p::swarm::{ConnectionId, NetworkBehaviour, NetworkInfo, SwarmEvent}; use libp2p::{ autonat, dcutr, gossipsub, kad, noise, relay, request_response, tcp, yamux, Multiaddr, PeerId, StreamProtocol, @@ -30,6 +30,7 @@ use tokio::select; use crate::config::Config; use crate::controller::{P2PEvent, SwarmCommand}; +use crate::multi_ticker::MultiTicker; use crate::multiaddr_ext::MultiaddrExt; use crate::storage::QueryOptions; use crate::types::{ @@ -51,12 +52,18 @@ pub struct MintpoolBehaviour { dcutr: dcutr::Behaviour, } +#[derive(Copy, Clone, PartialEq, Eq, Hash)] +enum SwarmTickers { + Bootstrap, + DiscoverGossipPeers, +} + pub struct SwarmController { swarm: libp2p::Swarm, command_receiver: tokio::sync::mpsc::Receiver, event_sender: tokio::sync::mpsc::Sender, premint_names: Vec, - bootstrap_ticker: Ticker, + tickers: MultiTicker, config: Config, } @@ -94,11 +101,17 @@ impl SwarmController { event_sender, config: config.clone(), premint_names: config.premint_names(), - bootstrap_ticker: Ticker::new( - // spec suggests to run bootstrap every 5 minutes - // first time bootstrap will trigger after first connection - Duration::from_secs(60 * 5), - ), + tickers: MultiTicker::new(vec![ + ( + // documentation suggests bootstrapping every 5 minutes + SwarmTickers::Bootstrap, + Ticker::new(Duration::from_secs(60 * 5)), + ), + ( + SwarmTickers::DiscoverGossipPeers, + Ticker::new(Duration::from_secs(60)), + ), + ]), } } @@ -245,14 +258,27 @@ impl SwarmController { } } event = self.swarm.select_next_some() => self.handle_swarm_event(event).await, - _tick = self.bootstrap_ticker.next() => { - match self.swarm.behaviour_mut().kad.bootstrap() { - Ok(_) => {} - Err(err) => { - tracing::error!("Error bootstrapping kad: {:?}", err); + tick = self.tickers.select_next_some() => { + match tick { + (SwarmTickers::Bootstrap, _) => { + match self.swarm.behaviour_mut().kad.bootstrap() { + Ok(_) => {} + Err(err) => { + tracing::error!("Error bootstrapping kad: {:?}", err); + } + } + } + (SwarmTickers::DiscoverGossipPeers, _) => { + let b = self.swarm.behaviour_mut(); + + b.gossipsub.topics().for_each(|topic| { + // kad will automatically dial all discovered peers, + // gossipsub will automatically sync topics with new peers + b.kad.get_providers(Self::topic_to_record_key(topic)); + }); } } - }, + } } } } @@ -821,7 +847,8 @@ impl SwarmController { let b = self.swarm.behaviour_mut(); b.gossipsub.subscribe(&topic)?; - b.kad.start_providing(Self::topic_to_record_key(&topic))?; + b.kad + .start_providing(Self::topic_to_record_key(&topic.hash()))?; Ok(()) } @@ -830,12 +857,13 @@ impl SwarmController { let b = self.swarm.behaviour_mut(); b.gossipsub.unsubscribe(topic)?; - b.kad.stop_providing(&Self::topic_to_record_key(&topic)); + b.kad + .stop_providing(&Self::topic_to_record_key(&topic.hash())); Ok(()) } - fn topic_to_record_key(topic: &IdentTopic) -> RecordKey { + fn topic_to_record_key(topic: &TopicHash) -> RecordKey { RecordKey::new(&format!("topic::{}", topic.to_string()).as_bytes()) }