Skip to content

Commit

Permalink
Check DHT for gossip peers (#86)
Browse files Browse the repository at this point in the history
  • Loading branch information
ligustah authored May 5, 2024
2 parents 25ed823 + adebd93 commit 8554250
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 35 deletions.
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod chain_list;
pub mod config;
pub mod controller;
pub mod metrics;
pub mod multi_ticker;
pub mod multiaddr_ext;
pub mod p2p;
pub mod premints;
Expand Down
126 changes: 126 additions & 0 deletions src/multi_ticker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
use std::collections::HashMap;

use std::hash::Hash;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Instant;

use futures::Stream;
use futures_ticker::Ticker;
use futures_util::stream::{select_all, FusedStream};
use futures_util::StreamExt;

pub struct MultiTicker<T: Copy + Hash + Eq + Unpin + 'static> {
tickers: HashMap<T, Ticker>,
}

impl<T> MultiTicker<T>
where
T: Copy + Hash + Eq + Unpin + Sized + 'static,
{
pub fn new<I>(tickers: I) -> Self
where
I: IntoIterator<Item = (T, Ticker)>,
{
MultiTicker {
tickers: tickers.into_iter().collect::<HashMap<_, _>>(),
}
}
}

struct KeyedStream<T, S>(T, S);

impl<T, S> Stream for KeyedStream<T, S>
where
T: Copy + Unpin,
S: Stream + Unpin,
{
type Item = (T, S::Item);

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let KeyedStream(key, stream) = self.get_mut();
match stream.poll_next_unpin(cx) {
Poll::Ready(Some(item)) => Poll::Ready(Some((*key, item))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
(1, None)
}
}

impl<T> FusedStream for MultiTicker<T>
where
T: Copy + Hash + Eq + Unpin + Sized + 'static,
{
fn is_terminated(&self) -> bool {
self.tickers.is_empty()
}
}

impl<T: Copy + Hash + Eq + Unpin + 'static> MultiTicker<T> {}

impl<T: Copy + Hash + Eq + Unpin + 'static> Stream for MultiTicker<T> {
type Item = (T, Instant);

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
select_all(
self.get_mut()
.tickers
.iter_mut()
.map(|(k, v)| KeyedStream(*k, v)),
)
.poll_next_unpin(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
(self.tickers.len(), None)
}
}

#[cfg(test)]
mod test {
use std::time::Duration;

use futures::executor::block_on;

use super::*;

#[test]
fn test_multi_ticker() {
#[derive(Clone, Copy, Eq, Debug, Hash, PartialEq)]
enum TickerId {
A,
B,
C,
}

let mut tickers = HashMap::new();
tickers.insert(TickerId::A, Ticker::new(Duration::from_millis(1000)));
tickers.insert(TickerId::B, Ticker::new(Duration::from_millis(2100)));
tickers.insert(TickerId::C, Ticker::new(Duration::from_millis(3200)));

let mut multi_ticker = MultiTicker { tickers };

let mut ticks = vec![];
let mut multi_ticker = Pin::new(&mut multi_ticker);
for _ in 0..5 {
ticks.push(block_on(multi_ticker.next()).unwrap());
}

println!("{:?}", ticks);

assert_eq!(
ticks,
vec![
(TickerId::A, ticks[0].1), // 1000
(TickerId::A, ticks[1].1), // 2000
(TickerId::B, ticks[2].1), // 2100
(TickerId::A, ticks[3].1), // 3000
(TickerId::C, ticks[4].1), // 3200
]
);
}
}
122 changes: 87 additions & 35 deletions src/p2p.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::hash::Hasher;
use std::time::Duration;

Expand All @@ -7,18 +8,17 @@ use futures_ticker::Ticker;
use itertools::Itertools;
use libp2p::autonat::NatStatus;
use libp2p::futures::StreamExt;
use libp2p::gossipsub::Version;
use libp2p::gossipsub::{IdentTopic, TopicHash, Version};
use libp2p::identify::Event;
use libp2p::identity::Keypair;
use libp2p::kad::store::{MemoryStore, RecordStore};
use libp2p::kad::GetProvidersOk::FoundProviders;
use libp2p::kad::{Addresses, ProviderRecord, QueryResult, Record, RecordKey};
use libp2p::multiaddr::{Error, Protocol};
use libp2p::request_response::{InboundRequestId, Message, ProtocolSupport, ResponseChannel};
use libp2p::kad::{Addresses, ProviderRecord, RecordKey};
use libp2p::multiaddr::Protocol;
use libp2p::request_response::{InboundRequestId, Message, ProtocolSupport};
use libp2p::swarm::behaviour::toggle::Toggle;
use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
use libp2p::swarm::DialError::DialPeerConditionFalse;
use libp2p::swarm::{ConnectionId, NetworkBehaviour, NetworkInfo, SwarmEvent, ToSwarm};
use libp2p::swarm::{ConnectionId, NetworkBehaviour, NetworkInfo, SwarmEvent};
use libp2p::{
autonat, dcutr, gossipsub, kad, noise, relay, request_response, tcp, yamux, Multiaddr, PeerId,
StreamProtocol,
Expand All @@ -30,6 +30,7 @@ use tokio::select;

use crate::config::Config;
use crate::controller::{P2PEvent, SwarmCommand};
use crate::multi_ticker::MultiTicker;
use crate::multiaddr_ext::MultiaddrExt;
use crate::storage::QueryOptions;
use crate::types::{
Expand All @@ -51,12 +52,18 @@ pub struct MintpoolBehaviour {
dcutr: dcutr::Behaviour,
}

#[derive(Copy, Clone, PartialEq, Eq, Hash)]
enum SwarmTickers {
Bootstrap,
DiscoverGossipPeers,
}

pub struct SwarmController {
swarm: libp2p::Swarm<MintpoolBehaviour>,
command_receiver: tokio::sync::mpsc::Receiver<SwarmCommand>,
event_sender: tokio::sync::mpsc::Sender<P2PEvent>,
premint_names: Vec<PremintName>,
bootstrap_ticker: Ticker,
tickers: MultiTicker<SwarmTickers>,
config: Config,
}

Expand Down Expand Up @@ -94,11 +101,17 @@ impl SwarmController {
event_sender,
config: config.clone(),
premint_names: config.premint_names(),
bootstrap_ticker: Ticker::new(
// spec suggests to run bootstrap every 5 minutes
// first time bootstrap will trigger after first connection
Duration::from_secs(60 * 5),
),
tickers: MultiTicker::new(vec![
(
// documentation suggests bootstrapping every 5 minutes
SwarmTickers::Bootstrap,
Ticker::new(Duration::from_secs(60 * 5)),
),
(
SwarmTickers::DiscoverGossipPeers,
Ticker::new(Duration::from_secs(60)),
),
]),
}
}

Expand Down Expand Up @@ -216,20 +229,20 @@ impl SwarmController {
.listen_on(format!("/ip4/{listen_ip}/tcp/{port}").parse()?)?;

let registry_topic = announce_topic();
self.swarm
.behaviour_mut()
.gossipsub
.subscribe(&registry_topic)?;

for premint_name in self.premint_names.iter() {
let topic = premint_name.msg_topic();
self.swarm.behaviour_mut().gossipsub.subscribe(&topic)?;
let claim_topic = premint_name.claims_topic();
self.swarm
.behaviour_mut()
.gossipsub
.subscribe(&claim_topic)?;
}
self.gossip_subscribe(&registry_topic)?;

// subscribe to all relevant topics
self.premint_names
.iter()
.flat_map(|name| vec![name.msg_topic(), name.claims_topic()])
.collect::<Vec<IdentTopic>>()
.iter()
.for_each(|topic| match self.gossip_subscribe(&topic) {
Ok(_) => {}
Err(err) => {
tracing::error!("Error subscribing to topic: {:?}", err);
}
});

self.run_loop().await;
Ok(())
Expand All @@ -245,14 +258,27 @@ impl SwarmController {
}
}
event = self.swarm.select_next_some() => self.handle_swarm_event(event).await,
_tick = self.bootstrap_ticker.next() => {
match self.swarm.behaviour_mut().kad.bootstrap() {
Ok(_) => {}
Err(err) => {
tracing::error!("Error bootstrapping kad: {:?}", err);
tick = self.tickers.select_next_some() => {
match tick {
(SwarmTickers::Bootstrap, _) => {
match self.swarm.behaviour_mut().kad.bootstrap() {
Ok(_) => {}
Err(err) => {
tracing::error!("Error bootstrapping kad: {:?}", err);
}
}
}
(SwarmTickers::DiscoverGossipPeers, _) => {
let b = self.swarm.behaviour_mut();

b.gossipsub.topics().for_each(|topic| {
// kad will automatically dial all discovered peers,
// gossipsub will automatically sync topics with new peers
b.kad.get_providers(Self::topic_to_record_key(topic));
});
}
}
},
}
}
}
}
Expand Down Expand Up @@ -433,6 +459,7 @@ impl SwarmController {
tracing::debug!("Discovered relay peer: {:?}", info);

for addr in info.listen_addrs {
tracing::debug!("Adding relay address: {:?}", addr);
relay_manager.add_address(peer_id, addr);
}
}
Expand All @@ -453,11 +480,11 @@ impl SwarmController {
}
_ => {}
}
tracing::debug!("Ping event: {:?}", event);
tracing::trace!("Ping event: {:?}", event);
}

SwarmEvent::Behaviour(MintpoolBehaviourEvent::Relay(event)) => {
tracing::info!("Relay event: {:?}", event);
tracing::debug!("Relay event: {:?}", event);
}

SwarmEvent::Behaviour(MintpoolBehaviourEvent::Autonat(event)) => {
Expand All @@ -470,7 +497,7 @@ impl SwarmController {
}

SwarmEvent::Behaviour(MintpoolBehaviourEvent::Dcutr(event)) => {
tracing::info!("Dcutr event: {:?}", event);
tracing::debug!("Dcutr event: {:?}", event);
}

other => {
Expand Down Expand Up @@ -815,6 +842,31 @@ impl SwarmController {
Ok(())
}

fn gossip_subscribe(&mut self, topic: &IdentTopic) -> eyre::Result<()> {
tracing::info!("Subscribing to topic: {}", topic.to_string());
let b = self.swarm.behaviour_mut();

b.gossipsub.subscribe(&topic)?;
b.kad
.start_providing(Self::topic_to_record_key(&topic.hash()))?;

Ok(())
}

fn gossip_unsubscribe(&mut self, topic: &IdentTopic) -> eyre::Result<()> {
let b = self.swarm.behaviour_mut();

b.gossipsub.unsubscribe(topic)?;
b.kad
.stop_providing(&Self::topic_to_record_key(&topic.hash()));

Ok(())
}

fn topic_to_record_key(topic: &TopicHash) -> RecordKey {
RecordKey::new(&format!("topic::{}", topic.to_string()).as_bytes())
}

// Makes a Response for a request to sync from another node
async fn make_sync_response(
&mut self,
Expand Down

0 comments on commit 8554250

Please sign in to comment.