Skip to content

Commit

Permalink
use try_send rather than send so we dont block the local swarm di…
Browse files Browse the repository at this point in the history
…scovery service
  • Loading branch information
ramfox committed Oct 7, 2024
1 parent 75d8019 commit 7411702
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 3 deletions.
2 changes: 2 additions & 0 deletions iroh-net/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ pub trait Discovery: std::fmt::Debug + Send + Sync {
/// until the stream is actually polled. To avoid missing discovered nodes,
/// poll the stream as soon as possible.
///
/// If you do not regularly poll the stream, you may miss discovered nodes.
///
/// Any discovery systems that only discover when explicitly resolving a
/// specific [`NodeId`] do not need to implement this method. Any nodes or
/// addresses that are discovered by calling `resolve` should NOT be added
Expand Down
14 changes: 11 additions & 3 deletions iroh-net/src/discovery/local_swarm_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ use watchable::Watchable;

use iroh_base::key::PublicKey;
use swarm_discovery::{Discoverer, DropGuard, IpClass, Peer};
use tokio::{sync::mpsc, task::JoinSet};
use tokio::{
sync::mpsc::{self, error::TrySendError},
task::JoinSet,
};
use tokio_util::task::AbortOnDropHandle;

use crate::{
Expand Down Expand Up @@ -107,8 +110,13 @@ impl Subscribers {
let mut clean_up = vec![];
for (i, subscriber) in self.0.iter().enumerate() {
// assume subscriber was dropped
if (subscriber.send(item.clone()).await).is_err() {
clean_up.push(i);
if let Err(err) = subscriber.try_send(item.clone()) {
match err {
TrySendError::Full(_) => {
warn!("local swarm discovery subscriber {i} is blocked, dropping item {item:?}")
}
TrySendError::Closed(_) => clean_up.push(i),
}
}
}
for i in clean_up.into_iter().rev() {
Expand Down

0 comments on commit 7411702

Please sign in to comment.