diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 9d7eba6b64..a4cf2f26ee 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -1600,7 +1600,7 @@ impl>, msg: &wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage>, except_node: Option<&PublicKey>) { + /// Forwards a gossip `msg` to `peers` excluding node(s) that generated the gossip message and + /// excluding `except_node`. + /// + /// If the message queue for a peer is somewhat full, the message will not be forwarded to them + /// unless `allow_large_buffer` is set, in which case the message will be treated as critical + /// and delivered no matter the available buffer space. + fn forward_broadcast_msg( + &self, peers: &HashMap>, + msg: &wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage>, + except_node: Option<&PublicKey>, allow_large_buffer: bool, + ) { match msg { wire::Message::ChannelAnnouncement(ref msg) => { log_gossip!(self.logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}", except_node, msg); @@ -1961,7 +1971,7 @@ impl match peers.get(&descriptor) { - Some(peer_mutex) => { - let peer_lock = peer_mutex.lock().unwrap(); - if !peer_lock.handshake_complete() { - continue; + None + } else { + let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned(); + match descriptor_opt { + Some(descriptor) => match peers.get(&descriptor) { + Some(peer_mutex) => { + let peer_lock = peer_mutex.lock().unwrap(); + if !peer_lock.handshake_complete() { + None + } else { + Some(peer_lock) + } + }, + None => { + debug_assert!(false, "Inconsistent peers set state!"); + None } - peer_lock }, None => { - debug_assert!(false, "Inconsistent peers set state!"); - continue; - } - }, - None => { - continue; - }, + None + }, + } } } } } - for event in events_generated.drain(..) { + + // Handles a `MessageSendEvent`, using `from_chan_handler` to decide if we should + // robustly gossip broadcast events even if a peer's message buffer is full. + let mut handle_event = |event, from_chan_handler| { match event { MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannel event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.common_fields.temporary_channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendAcceptChannelV2 { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.common_fields.temporary_channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendOpenChannel event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.common_fields.temporary_channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendOpenChannelV2 { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.common_fields.temporary_channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id), None), "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})", @@ -2130,107 +2145,107 @@ impl { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendFundingSigned event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendChannelReady { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendChannelReady event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendStfu { ref node_id, ref msg} => { let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None); log_debug!(logger, "Handling SendStfu event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); } MessageSendEvent::SendSpliceInit { ref node_id, ref msg} => { let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None); log_debug!(logger, "Handling SendSpliceInit event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); } MessageSendEvent::SendSpliceAck { ref node_id, ref msg} => { let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None); log_debug!(logger, "Handling SendSpliceAck event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); } MessageSendEvent::SendSpliceLocked { ref node_id, ref msg} => { let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None); log_debug!(logger, "Handling SendSpliceLocked event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); } MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAddInput event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendTxAddOutput { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAddOutput event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendTxRemoveInput { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendTxRemoveOutput { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendTxComplete { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxComplete event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendTxSignatures { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxSignatures event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendTxInitRbf { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxInitRbf event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendTxAckRbf { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAckRbf event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendTxAbort { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAbort event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(commitment_signed.channel_id), None), "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}", @@ -2239,7 +2254,7 @@ impl { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendClosingSigned event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendShutdown { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling Shutdown event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendChannelReestablish event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendChannelAnnouncement { ref node_id, ref msg, ref update_msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}", log_pubkey!(node_id), msg.contents.short_channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), update_msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, update_msg); }, MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => { log_debug!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id); match self.message_handler.route_handler.handle_channel_announcement(None, &msg) { - Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => - self.forward_broadcast_msg(peers, &wire::Message::ChannelAnnouncement(msg), None), + Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => { + let forward = wire::Message::ChannelAnnouncement(msg); + self.forward_broadcast_msg(peers, &forward, None, from_chan_handler); + }, _ => {}, } if let Some(msg) = update_msg { match self.message_handler.route_handler.handle_channel_update(None, &msg) { - Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => - self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None), + Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => { + let forward = wire::Message::ChannelUpdate(msg); + self.forward_broadcast_msg(peers, &forward, None, from_chan_handler); + }, _ => {}, } } @@ -2306,23 +2325,27 @@ impl { log_debug!(self.logger, "Handling BroadcastChannelUpdate event in peer_handler for contents {:?}", msg.contents); match self.message_handler.route_handler.handle_channel_update(None, &msg) { - Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => - self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None), + Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => { + let forward = wire::Message::ChannelUpdate(msg); + self.forward_broadcast_msg(peers, &forward, None, from_chan_handler); + }, _ => {}, } }, MessageSendEvent::BroadcastNodeAnnouncement { msg } => { log_debug!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler for node {}", msg.contents.node_id); match self.message_handler.route_handler.handle_node_announcement(None, &msg) { - Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => - self.forward_broadcast_msg(peers, &wire::Message::NodeAnnouncement(msg), None), + Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => { + let forward = wire::Message::NodeAnnouncement(msg); + self.forward_broadcast_msg(peers, &forward, None, from_chan_handler); + }, _ => {}, } }, MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => { log_trace!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelUpdate event in peer_handler for node {} for channel {}", log_pubkey!(node_id), msg.contents.short_channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::HandleError { node_id, action } => { let logger = WithContext::from(&self.logger, Some(node_id), None, None); @@ -2360,21 +2383,21 @@ impl { log_given_level!(logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}", log_pubkey!(node_id), msg.data); - self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id)?, msg); }, } }, MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => { - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => { - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); } MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => { log_gossip!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}", @@ -2383,17 +2406,24 @@ impl { - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); } } + Some(()) + }; + for event in chan_events { + handle_event(event, true); + } + for event in route_events { + handle_event(event, false); } for (node_id, msg) in self.message_handler.custom_message_handler.get_and_clear_pending_msg() { if peers_to_disconnect.get(&node_id).is_some() { continue; } - self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg); + self.enqueue_message(&mut *if let Some(peer) = get_peer_for_forwarding!(&node_id) { peer } else { continue; }, &msg); } for (descriptor, peer_mutex) in peers.iter() { @@ -2680,7 +2710,7 @@ impl bool { #[cfg(test)] mod tests { + use super::*; + use crate::sign::{NodeSigner, Recipient}; use crate::events; use crate::io; use crate::ln::types::ChannelId; use crate::ln::features::{InitFeatures, NodeFeatures}; use crate::ln::peer_channel_encryptor::PeerChannelEncryptor; - use crate::ln::peer_handler::{CustomMessageHandler, PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses, ErroringMessageHandler, MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER}; use crate::ln::{msgs, wire}; use crate::ln::msgs::{Init, LightningError, SocketAddress}; use crate::util::test_utils; use bitcoin::Network; use bitcoin::constants::ChainHash; - use bitcoin::secp256k1::{PublicKey, SecretKey}; + use bitcoin::secp256k1::{PublicKey, SecretKey, Secp256k1}; use crate::sync::{Arc, Mutex}; use core::convert::Infallible; - use core::sync::atomic::{AtomicBool, Ordering}; + use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; #[allow(unused_imports)] use crate::prelude::*; @@ -2724,6 +2755,7 @@ mod tests { #[derive(Clone)] struct FileDescriptor { fd: u16, + hang_writes: Arc, outbound_data: Arc>>, disconnect: Arc, } @@ -2741,13 +2773,28 @@ mod tests { impl SocketDescriptor for FileDescriptor { fn send_data(&mut self, data: &[u8], _resume_read: bool) -> usize { - self.outbound_data.lock().unwrap().extend_from_slice(data); - data.len() + if self.hang_writes.load(Ordering::Acquire) { + 0 + } else { + self.outbound_data.lock().unwrap().extend_from_slice(data); + data.len() + } } fn disconnect_socket(&mut self) { self.disconnect.store(true, Ordering::Release); } } + impl FileDescriptor { + fn new(fd: u16) -> Self { + Self { + fd, + hang_writes: Arc::new(AtomicBool::new(false)), + outbound_data: Arc::new(Mutex::new(Vec::new())), + disconnect: Arc::new(AtomicBool::new(false)), + } + } + } + struct PeerManagerCfg { chan_handler: test_utils::TestChannelMessageHandler, routing_handler: test_utils::TestRoutingMessageHandler, @@ -2798,7 +2845,7 @@ mod tests { cfgs.push( PeerManagerCfg{ chan_handler: test_utils::TestChannelMessageHandler::new(ChainHash::using_genesis_block(Network::Testnet)), - logger: test_utils::TestLogger::new(), + logger: test_utils::TestLogger::with_id(i.to_string()), routing_handler: test_utils::TestRoutingMessageHandler::new(), custom_handler: TestCustomMessageHandler { features }, node_signer: test_utils::TestNodeSigner::new(node_secret), @@ -2868,20 +2915,19 @@ mod tests { } fn establish_connection<'a>(peer_a: &PeerManager, peer_b: &PeerManager) -> (FileDescriptor, FileDescriptor) { + static FD_COUNTER: AtomicUsize = AtomicUsize::new(0); + let fd = FD_COUNTER.fetch_add(1, Ordering::Relaxed) as u16; + let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap(); - let mut fd_a = FileDescriptor { - fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), - disconnect: Arc::new(AtomicBool::new(false)), - }; + let mut fd_a = FileDescriptor::new(fd); let addr_a = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1000}; + let id_b = peer_b.node_signer.get_node_id(Recipient::Node).unwrap(); let features_a = peer_a.init_features(id_b); let features_b = peer_b.init_features(id_a); - let mut fd_b = FileDescriptor { - fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), - disconnect: Arc::new(AtomicBool::new(false)), - }; + let mut fd_b = FileDescriptor::new(fd); let addr_b = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1001}; + let initial_data = peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap(); peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap(); assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false); @@ -2925,15 +2971,9 @@ mod tests { let mut ctr = 0; while start_time.elapsed() < std::time::Duration::from_secs(1) { let id_a = peers[0].node_signer.get_node_id(Recipient::Node).unwrap(); - let mut fd_a = FileDescriptor { - fd: $id + ctr * 3, outbound_data: Arc::new(Mutex::new(Vec::new())), - disconnect: Arc::new(AtomicBool::new(false)), - }; + let mut fd_a = FileDescriptor::new($id + ctr * 3); let addr_a = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1000}; - let mut fd_b = FileDescriptor { - fd: $id + ctr * 3, outbound_data: Arc::new(Mutex::new(Vec::new())), - disconnect: Arc::new(AtomicBool::new(false)), - }; + let mut fd_b = FileDescriptor::new($id + ctr * 3); let addr_b = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1001}; let initial_data = peers[1].new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap(); peers[0].new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap(); @@ -2997,15 +3037,9 @@ mod tests { let peer_pairs = [(&peers[0], &incompatible_peers[0]), (&incompatible_peers[1], &peers[1])]; for (peer_a, peer_b) in peer_pairs.iter() { let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap(); - let mut fd_a = FileDescriptor { - fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), - disconnect: Arc::new(AtomicBool::new(false)), - }; + let mut fd_a = FileDescriptor::new(1); let addr_a = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1000}; - let mut fd_b = FileDescriptor { - fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), - disconnect: Arc::new(AtomicBool::new(false)), - }; + let mut fd_b = FileDescriptor::new(1); let addr_b = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1001}; let initial_data = peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap(); peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap(); @@ -3033,15 +3067,9 @@ mod tests { let peer_pairs = [(&peers[0], &incompatible_peers[0]), (&incompatible_peers[1], &peers[1])]; for (peer_a, peer_b) in peer_pairs.iter() { let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap(); - let mut fd_a = FileDescriptor { - fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), - disconnect: Arc::new(AtomicBool::new(false)), - }; + let mut fd_a = FileDescriptor::new(1); let addr_a = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1000}; - let mut fd_b = FileDescriptor { - fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), - disconnect: Arc::new(AtomicBool::new(false)), - }; + let mut fd_b = FileDescriptor::new(1); let addr_b = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1001}; let initial_data = peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap(); peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap(); @@ -3115,10 +3143,7 @@ mod tests { let cfgs = create_peermgr_cfgs(2); let peers = create_network(2, &cfgs); - let mut fd_dup = FileDescriptor { - fd: 3, outbound_data: Arc::new(Mutex::new(Vec::new())), - disconnect: Arc::new(AtomicBool::new(false)), - }; + let mut fd_dup = FileDescriptor::new(3); let addr_dup = SocketAddress::TcpIpV4{addr: [127, 0, 0, 1], port: 1003}; let id_a = cfgs[0].node_signer.get_node_id(Recipient::Node).unwrap(); peers[0].new_inbound_connection(fd_dup.clone(), Some(addr_dup.clone())).unwrap(); @@ -3176,6 +3201,8 @@ mod tests { let cfgs = create_peermgr_cfgs(2); cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release); cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release); + cfgs[0].routing_handler.announcement_available_for_sync.store(true, Ordering::Release); + cfgs[1].routing_handler.announcement_available_for_sync.store(true, Ordering::Release); let peers = create_network(2, &cfgs); // By calling establish_connect, we trigger do_attempt_write_data between @@ -3222,14 +3249,8 @@ mod tests { let peers = create_network(2, &cfgs); let a_id = peers[0].node_signer.get_node_id(Recipient::Node).unwrap(); - let mut fd_a = FileDescriptor { - fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), - disconnect: Arc::new(AtomicBool::new(false)), - }; - let mut fd_b = FileDescriptor { - fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), - disconnect: Arc::new(AtomicBool::new(false)), - }; + let mut fd_a = FileDescriptor::new(1); + let mut fd_b = FileDescriptor::new(1); let initial_data = peers[1].new_outbound_connection(a_id, fd_b.clone(), None).unwrap(); peers[0].new_inbound_connection(fd_a.clone(), None).unwrap(); @@ -3275,14 +3296,8 @@ mod tests { }, 0, &[1; 32], &logger, &node_signer_b); let a_id = node_signer_a.get_node_id(Recipient::Node).unwrap(); - let mut fd_a = FileDescriptor { - fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), - disconnect: Arc::new(AtomicBool::new(false)), - }; - let mut fd_b = FileDescriptor { - fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())), - disconnect: Arc::new(AtomicBool::new(false)), - }; + let mut fd_a = FileDescriptor::new(1); + let mut fd_b = FileDescriptor::new(1); // Exchange messages with both peers until they both complete the init handshake. let act_one = peer_b.new_outbound_connection(a_id, fd_b.clone(), None).unwrap(); @@ -3351,6 +3366,79 @@ mod tests { assert_eq!(peer_b.peers.read().unwrap().len(), 0); } + #[test] + fn test_gossip_flood_pause() { + use crate::routing::test_utils::channel_announcement; + use lightning_types::features::ChannelFeatures; + + // Simple test which connects two nodes to a PeerManager and checks that if we run out of + // socket buffer space we'll stop forwarding gossip but still push our own gossip. + let cfgs = create_peermgr_cfgs(2); + let peers = create_network(2, &cfgs); + let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]); + + macro_rules! drain_queues { () => { + loop { + peers[0].process_events(); + peers[1].process_events(); + + let msg = fd_a.outbound_data.lock().unwrap().split_off(0); + if !msg.is_empty() { + assert_eq!(peers[1].read_event(&mut fd_b, &msg).unwrap(), false); + continue; + } + let msg = fd_b.outbound_data.lock().unwrap().split_off(0); + if !msg.is_empty() { + assert_eq!(peers[0].read_event(&mut fd_a, &msg).unwrap(), false); + continue; + } + break; + } + } } + + // First, make sure all pending messages have been processed and queues drained. + drain_queues!(); + + let secp_ctx = Secp256k1::new(); + let key = SecretKey::from_slice(&[1; 32]).unwrap(); + let msg = channel_announcement(&key, &key, ChannelFeatures::empty(), 42, &secp_ctx); + let msg_ev = MessageSendEvent::BroadcastChannelAnnouncement { + msg, + update_msg: None, + }; + + fd_a.hang_writes.store(true, Ordering::Relaxed); + + // Now push an arbitrarily large number of messages and check that only + // `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP` messages end up in the queue. + for _ in 0..OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP * 2 { + cfgs[0].routing_handler.pending_events.lock().unwrap().push(msg_ev.clone()); + peers[0].process_events(); + } + + assert_eq!(peers[0].peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.len(), + OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP); + + // Check that if a broadcast message comes in from the channel handler (i.e. it is an + // announcement for our own channel), it gets queued anyway. + cfgs[0].chan_handler.pending_events.lock().unwrap().push(msg_ev); + peers[0].process_events(); + assert_eq!(peers[0].peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.len(), + OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 1); + + // Finally, deliver all the messages and make sure we got the right count. Note that there + // was an extra message that had already moved from the broadcast queue to the encrypted + // message queue so we actually receive `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2` messages. + fd_a.hang_writes.store(false, Ordering::Relaxed); + cfgs[1].routing_handler.chan_anns_recvd.store(0, Ordering::Relaxed); + peers[0].write_buffer_space_avail(&mut fd_a).unwrap(); + + drain_queues!(); + assert!(peers[0].peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.is_empty()); + assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Relaxed), + OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2); + } + #[test] fn test_filter_addresses(){ // Tests the filter_addresses function. diff --git a/lightning/src/routing/test_utils.rs b/lightning/src/routing/test_utils.rs index a64955cd01..d85371479f 100644 --- a/lightning/src/routing/test_utils.rs +++ b/lightning/src/routing/test_utils.rs @@ -27,13 +27,11 @@ use crate::sync::{self, Arc}; use crate::routing::gossip::NodeId; -// Using the same keys for LN and BTC ids -pub(crate) fn add_channel( - gossip_sync: &P2PGossipSync>>, Arc, Arc>, - secp_ctx: &Secp256k1, node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures, short_channel_id: u64 -) { - let node_1_pubkey = PublicKey::from_secret_key(&secp_ctx, node_1_privkey); - let node_id_1 = NodeId::from_pubkey(&node_1_pubkey); +pub(crate) fn channel_announcement( + node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures, + short_channel_id: u64, secp_ctx: &Secp256k1, +) -> ChannelAnnouncement { + let node_id_1 = NodeId::from_pubkey(&PublicKey::from_secret_key(&secp_ctx, node_1_privkey)); let node_id_2 = NodeId::from_pubkey(&PublicKey::from_secret_key(&secp_ctx, node_2_privkey)); let unsigned_announcement = UnsignedChannelAnnouncement { @@ -48,13 +46,23 @@ pub(crate) fn add_channel( }; let msghash = hash_to_message!(&Sha256dHash::hash(&unsigned_announcement.encode()[..])[..]); - let valid_announcement = ChannelAnnouncement { + ChannelAnnouncement { node_signature_1: secp_ctx.sign_ecdsa(&msghash, node_1_privkey), node_signature_2: secp_ctx.sign_ecdsa(&msghash, node_2_privkey), bitcoin_signature_1: secp_ctx.sign_ecdsa(&msghash, node_1_privkey), bitcoin_signature_2: secp_ctx.sign_ecdsa(&msghash, node_2_privkey), contents: unsigned_announcement.clone(), - }; + } +} + +// Using the same keys for LN and BTC ids +pub(crate) fn add_channel( + gossip_sync: &P2PGossipSync>>, Arc, Arc>, + secp_ctx: &Secp256k1, node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures, short_channel_id: u64 +) { + let valid_announcement = + channel_announcement(node_1_privkey, node_2_privkey, features, short_channel_id, secp_ctx); + let node_1_pubkey = PublicKey::from_secret_key(&secp_ctx, node_1_privkey); match gossip_sync.handle_channel_announcement(Some(node_1_pubkey), &valid_announcement) { Ok(res) => assert!(res), _ => panic!() @@ -108,7 +116,7 @@ pub(crate) fn update_channel( pub(super) fn get_nodes(secp_ctx: &Secp256k1) -> (SecretKey, PublicKey, Vec, Vec) { let privkeys: Vec = (2..22).map(|i| { - SecretKey::from_slice(&>::from_hex(&format!("{:02x}", i).repeat(32)).unwrap()[..]).unwrap() + SecretKey::from_slice(&[i; 32]).unwrap() }).collect(); let pubkeys = privkeys.iter().map(|secret| PublicKey::from_secret_key(&secp_ctx, secret)).collect(); diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 12e027d32f..8bcb7bbc7f 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -986,6 +986,7 @@ pub struct TestRoutingMessageHandler { pub chan_anns_recvd: AtomicUsize, pub pending_events: Mutex>, pub request_full_sync: AtomicBool, + pub announcement_available_for_sync: AtomicBool, } impl TestRoutingMessageHandler { @@ -995,27 +996,32 @@ impl TestRoutingMessageHandler { chan_anns_recvd: AtomicUsize::new(0), pending_events: Mutex::new(vec![]), request_full_sync: AtomicBool::new(false), + announcement_available_for_sync: AtomicBool::new(false), } } } impl msgs::RoutingMessageHandler for TestRoutingMessageHandler { fn handle_node_announcement(&self, _their_node_id: Option, _msg: &msgs::NodeAnnouncement) -> Result { - Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError }) + Ok(true) } fn handle_channel_announcement(&self, _their_node_id: Option, _msg: &msgs::ChannelAnnouncement) -> Result { self.chan_anns_recvd.fetch_add(1, Ordering::AcqRel); - Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError }) + Ok(true) } fn handle_channel_update(&self, _their_node_id: Option, _msg: &msgs::ChannelUpdate) -> Result { self.chan_upds_recvd.fetch_add(1, Ordering::AcqRel); - Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError }) + Ok(true) } fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(msgs::ChannelAnnouncement, Option, Option)> { - let chan_upd_1 = get_dummy_channel_update(starting_point); - let chan_upd_2 = get_dummy_channel_update(starting_point); - let chan_ann = get_dummy_channel_announcement(starting_point); + if self.announcement_available_for_sync.load(Ordering::Acquire) { + let chan_upd_1 = get_dummy_channel_update(starting_point); + let chan_upd_2 = get_dummy_channel_update(starting_point); + let chan_ann = get_dummy_channel_announcement(starting_point); - Some((chan_ann, Some(chan_upd_1), Some(chan_upd_2))) + Some((chan_ann, Some(chan_upd_1), Some(chan_upd_2))) + } else { + None + } } fn get_next_node_announcement(&self, _starting_point: Option<&NodeId>) -> Option {