diff --git a/iroh-net/src/discovery.rs b/iroh-net/src/discovery.rs index e3848a7db5..0cbfa285d0 100644 --- a/iroh-net/src/discovery.rs +++ b/iroh-net/src/discovery.rs @@ -158,6 +158,8 @@ pub trait Discovery: std::fmt::Debug + Send + Sync { /// until the stream is actually polled. To avoid missing discovered nodes, /// poll the stream as soon as possible. /// + /// If you do not regularly poll the stream, you may miss discovered nodes. + /// /// Any discovery systems that only discover when explicitly resolving a /// specific [`NodeId`] do not need to implement this method. Any nodes or /// addresses that are discovered by calling `resolve` should NOT be added diff --git a/iroh-net/src/discovery/local_swarm_discovery.rs b/iroh-net/src/discovery/local_swarm_discovery.rs index 655b82ab26..a0ce2e941d 100644 --- a/iroh-net/src/discovery/local_swarm_discovery.rs +++ b/iroh-net/src/discovery/local_swarm_discovery.rs @@ -47,7 +47,10 @@ use watchable::Watchable; use iroh_base::key::PublicKey; use swarm_discovery::{Discoverer, DropGuard, IpClass, Peer}; -use tokio::{sync::mpsc, task::JoinSet}; +use tokio::{ + sync::mpsc::{self, error::TrySendError}, + task::JoinSet, +}; use tokio_util::task::AbortOnDropHandle; use crate::{ @@ -103,12 +106,21 @@ impl Subscribers { /// Sends the `node_id` and `item` to each subscriber. /// /// Cleans up any subscribers that have been dropped. - async fn send(&mut self, item: DiscoveryItem) { + fn send(&mut self, item: DiscoveryItem) { let mut clean_up = vec![]; for (i, subscriber) in self.0.iter().enumerate() { // assume subscriber was dropped - if (subscriber.send(item.clone()).await).is_err() { - clean_up.push(i); + if let Err(err) = subscriber.try_send(item.clone()) { + match err { + TrySendError::Full(_) => { + warn!( + ?item, + idx = i, + "local swarm discovery subscriber is blocked, dropping item" + ) + } + TrySendError::Closed(_) => clean_up.push(i), + } } } for i in clean_up.into_iter().rev() { @@ -236,7 +248,7 @@ impl LocalSwarmDiscovery { // in other words, nodes sent to the `subscribers` should only be the ones that // have been "passively" discovered if !resolved { - subscribers.send(item).await; + subscribers.send(item); } } Message::Resolve(node_id, sender) => {