diff --git a/crates/net/eth-wire/src/lib.rs b/crates/net/eth-wire/src/lib.rs index f379090aa4e5..a1c7c70bdf0f 100644 --- a/crates/net/eth-wire/src/lib.rs +++ b/crates/net/eth-wire/src/lib.rs @@ -20,6 +20,7 @@ mod disconnect; pub mod errors; mod ethstream; mod hello; +pub mod multiplex; mod p2pstream; mod pinger; pub mod protocol; @@ -27,6 +28,9 @@ pub use builder::*; pub mod types; pub use types::*; +#[cfg(test)] +pub mod test_utils; + #[cfg(test)] pub use tokio_util::codec::{ LengthDelimitedCodec as PassthroughCodec, LengthDelimitedCodecError as PassthroughCodecError, diff --git a/crates/net/eth-wire/src/multiplex.rs b/crates/net/eth-wire/src/multiplex.rs new file mode 100644 index 000000000000..d0dcf467e59c --- /dev/null +++ b/crates/net/eth-wire/src/multiplex.rs @@ -0,0 +1,459 @@ +//! Rlpx protocol multiplexer and satellite stream +//! +//! A Satellite is a Stream that primarily drives a single RLPx subprotocol but can also handle +//! additional subprotocols. +//! +//! Most of other subprotocols are "dependent satellite" protocols of "eth" and not a fully standalone protocol, for example "snap", See also [snap protocol](https://github.com/ethereum/devp2p/blob/298d7a77c3bf833641579ecbbb5b13f0311eeeea/caps/snap.md?plain=1#L71) +//! Hence it is expected that the primary protocol is "eth" and the additional protocols are +//! "dependent satellite" protocols. + +use std::{ + collections::VecDeque, + fmt, + future::Future, + io, + pin::Pin, + task::{ready, Context, Poll}, +}; + +use bytes::{Bytes, BytesMut}; +use futures::{pin_mut, Sink, SinkExt, Stream, StreamExt, TryStream, TryStreamExt}; +use tokio::sync::{mpsc, mpsc::UnboundedSender}; +use tokio_stream::wrappers::UnboundedReceiverStream; + +use crate::{ + capability::{Capability, SharedCapabilities, SharedCapability, UnsupportedCapabilityError}, + errors::P2PStreamError, + CanDisconnect, DisconnectReason, P2PStream, +}; + +/// A Stream and Sink type that wraps a raw rlpx stream [P2PStream] and handles message ID +/// multiplexing. +#[derive(Debug)] +pub struct RlpxProtocolMultiplexer { + /// The raw p2p stream + conn: P2PStream, + /// All the subprotocols that are multiplexed on top of the raw p2p stream + protocols: Vec, +} + +impl RlpxProtocolMultiplexer { + /// Wraps the raw p2p stream + pub fn new(conn: P2PStream) -> Self { + Self { conn, protocols: Default::default() } + } + + /// Installs a new protocol on top of the raw p2p stream + pub fn install_protocol( + &mut self, + _cap: Capability, + _st: S, + ) -> Result<(), UnsupportedCapabilityError> { + todo!() + } + + /// Returns the [SharedCapabilities] of the underlying raw p2p stream + pub fn shared_capabilities(&self) -> &SharedCapabilities { + self.conn.shared_capabilities() + } + + /// Converts this multiplexer into a [RlpxSatelliteStream] with the given primary protocol. + /// + /// Returns an error if the primary protocol is not supported by the remote or the handshake + /// failed. + pub async fn into_satellite_stream_with_handshake( + mut self, + cap: &Capability, + handshake: F, + ) -> Result, Self> + where + F: FnOnce(ProtocolProxy) -> Fut, + Fut: Future>, + St: Stream> + Sink + Unpin, + { + let Ok(shared_cap) = self.shared_capabilities().ensure_matching_capability(cap).cloned() + else { + return Err(self) + }; + + let (to_primary, from_wire) = mpsc::unbounded_channel(); + let (to_wire, mut from_primary) = mpsc::unbounded_channel(); + let proxy = ProtocolProxy { + cap: shared_cap.clone(), + from_wire: UnboundedReceiverStream::new(from_wire), + to_wire, + }; + + let f = handshake(proxy); + pin_mut!(f); + + // handle messages until the handshake is complete + loop { + // TODO error handling + tokio::select! { + Some(Ok(msg)) = self.conn.next() => { + // TODO handle multiplex + let _ = to_primary.send(msg); + } + Some(msg) = from_primary.recv() => { + // TODO error handling + self.conn.send(msg).await.unwrap(); + } + res = &mut f => { + let Ok(primary) = res else { return Err(self) }; + return Ok(RlpxSatelliteStream { + conn: self.conn, + to_primary, + from_primary: UnboundedReceiverStream::new(from_primary), + primary, + primary_capability: shared_cap, + satellites: self.protocols, + out_buffer: Default::default(), + }) + } + } + } + } +} + +/// A Stream and Sink type that acts as a wrapper around a primary RLPx subprotocol (e.g. "eth") +#[derive(Debug)] +pub struct ProtocolProxy { + cap: SharedCapability, + from_wire: UnboundedReceiverStream, + to_wire: UnboundedSender, +} + +impl ProtocolProxy { + fn mask_msg_id(&self, msg: Bytes) -> Bytes { + // TODO handle empty messages + let mut masked_bytes = BytesMut::zeroed(msg.len()); + masked_bytes[0] = msg[0] + self.cap.relative_message_id_offset(); + masked_bytes[1..].copy_from_slice(&msg[1..]); + masked_bytes.freeze() + } + + fn unmask_id(&self, mut msg: BytesMut) -> BytesMut { + // TODO handle empty messages + msg[0] -= self.cap.relative_message_id_offset(); + msg + } +} + +impl Stream for ProtocolProxy { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let msg = ready!(self.from_wire.poll_next_unpin(cx)); + Poll::Ready(msg.map(|msg| Ok(self.get_mut().unmask_id(msg)))) + } +} + +impl Sink for ProtocolProxy { + type Error = io::Error; + + fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> { + let msg = self.mask_msg_id(item); + self.to_wire.send(msg).map_err(|_| io::ErrorKind::BrokenPipe.into()) + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } +} + +#[async_trait::async_trait] +impl CanDisconnect for ProtocolProxy { + async fn disconnect( + &mut self, + _reason: DisconnectReason, + ) -> Result<(), >::Error> { + // TODO handle disconnects + Ok(()) + } +} + +/// A connection channel to receive messages for the negotiated protocol. +/// +/// This is a [Stream] that returns raw bytes of the received messages for this protocol. +#[derive(Debug)] +pub struct ProtocolConnection { + from_wire: UnboundedReceiverStream, +} + +impl Stream for ProtocolConnection { + type Item = BytesMut; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.from_wire.poll_next_unpin(cx) + } +} + +/// A Stream and Sink type that acts as a wrapper around a primary RLPx subprotocol (e.g. "eth") +/// [EthStream](crate::EthStream) and can also handle additional subprotocols. +#[derive(Debug)] +pub struct RlpxSatelliteStream { + /// The raw p2p stream + conn: P2PStream, + to_primary: UnboundedSender, + from_primary: UnboundedReceiverStream, + primary: Primary, + primary_capability: SharedCapability, + satellites: Vec, + out_buffer: VecDeque, +} + +impl RlpxSatelliteStream {} + +impl Stream for RlpxSatelliteStream +where + St: Stream> + Sink + Unpin, + Primary: TryStream + Unpin, + P2PStreamError: Into, +{ + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + loop { + // first drain the primary stream + if let Poll::Ready(Some(msg)) = this.primary.try_poll_next_unpin(cx) { + return Poll::Ready(Some(msg)) + } + + let mut out_ready = true; + loop { + match this.conn.poll_ready_unpin(cx) { + Poll::Ready(_) => { + if let Some(msg) = this.out_buffer.pop_front() { + if let Err(err) = this.conn.start_send_unpin(msg) { + return Poll::Ready(Some(Err(err.into()))) + } + } else { + break; + } + } + Poll::Pending => { + out_ready = false; + break + } + } + } + + // advance primary out + loop { + match this.from_primary.poll_next_unpin(cx) { + Poll::Ready(Some(msg)) => { + this.out_buffer.push_back(msg); + } + Poll::Ready(None) => { + // primary closed + return Poll::Ready(None) + } + Poll::Pending => break, + } + } + + // advance all satellites + for idx in (0..this.satellites.len()).rev() { + let mut proto = this.satellites.swap_remove(idx); + loop { + match proto.poll_next_unpin(cx) { + Poll::Ready(Some(msg)) => { + this.out_buffer.push_back(msg); + } + Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => { + this.satellites.push(proto); + break + } + } + } + } + + let mut delegated = false; + loop { + // pull messages from connection + match this.conn.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(msg))) => { + delegated = true; + let offset = msg[0]; + // find the protocol that matches the offset + // TODO optimize this by keeping a better index + let mut lowest_satellite = None; + // find the protocol with the lowest offset that is greater than the message + // offset + for (i, proto) in this.satellites.iter().enumerate() { + let proto_offset = proto.cap.relative_message_id_offset(); + if proto_offset >= offset { + if let Some((_, lowest_offset)) = lowest_satellite { + if proto_offset < lowest_offset { + lowest_satellite = Some((i, proto_offset)); + } + } else { + lowest_satellite = Some((i, proto_offset)); + } + } + } + + if let Some((idx, lowest_offset)) = lowest_satellite { + if lowest_offset < this.primary_capability.relative_message_id_offset() + { + // delegate to satellite + this.satellites[idx].send_raw(msg); + continue + } + } + // delegate to primary + let _ = this.to_primary.send(msg); + } + Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err.into()))), + Poll::Ready(None) => { + // connection closed + return Poll::Ready(None) + } + Poll::Pending => break, + } + } + + if !delegated || !out_ready || this.out_buffer.is_empty() { + return Poll::Pending + } + } + } +} + +impl Sink for RlpxSatelliteStream +where + St: Stream> + Sink + Unpin, + Primary: Sink + Unpin, + P2PStreamError: Into<>::Error>, +{ + type Error = >::Error; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + if let Err(err) = ready!(this.conn.poll_ready_unpin(cx)) { + return Poll::Ready(Err(err.into())) + } + if let Err(err) = ready!(this.primary.poll_ready_unpin(cx)) { + return Poll::Ready(Err(err)) + } + Poll::Ready(Ok(())) + } + + fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + self.get_mut().primary.start_send_unpin(item) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_mut().conn.poll_flush_unpin(cx).map_err(Into::into) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_mut().conn.poll_close_unpin(cx).map_err(Into::into) + } +} + +/// Wraps a RLPx subprotocol and handles message ID multiplexing. +struct ProtocolStream { + cap: SharedCapability, + /// the channel shared with the satellite stream + to_satellite: UnboundedSender, + satellite_st: Pin>>, +} + +impl ProtocolStream { + fn mask_msg_id(&self, mut msg: BytesMut) -> Bytes { + // TODO handle empty messages + msg[0] += self.cap.relative_message_id_offset(); + msg.freeze() + } + + fn unmask_id(&self, mut msg: BytesMut) -> BytesMut { + // TODO handle empty messages + msg[0] -= self.cap.relative_message_id_offset(); + msg + } + + fn send_raw(&self, msg: BytesMut) { + let _ = self.to_satellite.send(self.unmask_id(msg)); + } +} + +impl Stream for ProtocolStream { + type Item = Bytes; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + let msg = ready!(this.satellite_st.as_mut().poll_next(cx)); + Poll::Ready(msg.map(|msg| this.mask_msg_id(msg))) + } +} + +impl fmt::Debug for ProtocolStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ProtocolStream").field("cap", &self.cap).finish_non_exhaustive() + } +} + +#[cfg(test)] +mod tests { + use tokio::net::TcpListener; + use tokio_util::codec::Decoder; + + use crate::{ + test_utils::{connect_passthrough, eth_handshake, eth_hello}, + UnauthedEthStream, UnauthedP2PStream, + }; + + use super::*; + + #[tokio::test] + async fn eth_satellite() { + reth_tracing::init_test_tracing(); + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let local_addr = listener.local_addr().unwrap(); + let (status, fork_filter) = eth_handshake(); + let other_status = status; + let other_fork_filter = fork_filter.clone(); + let _handle = tokio::spawn(async move { + let (incoming, _) = listener.accept().await.unwrap(); + let stream = crate::PassthroughCodec::default().framed(incoming); + let (server_hello, _) = eth_hello(); + let (p2p_stream, _) = + UnauthedP2PStream::new(stream).handshake(server_hello).await.unwrap(); + + let (_eth_stream, _) = UnauthedEthStream::new(p2p_stream) + .handshake(other_status, other_fork_filter) + .await + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + }); + + let conn = connect_passthrough(local_addr, eth_hello().0).await; + let eth = conn.shared_capabilities().eth().unwrap().clone(); + + let multiplexer = RlpxProtocolMultiplexer::new(conn); + + let _satellite = multiplexer + .into_satellite_stream_with_handshake( + eth.capability().as_ref(), + move |proxy| async move { + UnauthedEthStream::new(proxy).handshake(status, fork_filter).await + }, + ) + .await + .unwrap(); + } +} diff --git a/crates/net/eth-wire/src/p2pstream.rs b/crates/net/eth-wire/src/p2pstream.rs index a0f5c9f48d51..ed6001fb934a 100644 --- a/crates/net/eth-wire/src/p2pstream.rs +++ b/crates/net/eth-wire/src/p2pstream.rs @@ -1,19 +1,5 @@ #![allow(dead_code, unreachable_pub, missing_docs, unused_variables)] -use crate::{ - disconnect::CanDisconnect, - errors::{P2PHandshakeError, P2PStreamError}, - pinger::{Pinger, PingerEvent}, - DisconnectReason, HelloMessage, HelloMessageWithProtocols, -}; -use alloy_rlp::{Decodable, Encodable, Error as RlpError, EMPTY_LIST_CODE}; -use futures::{Sink, SinkExt, StreamExt}; -use pin_project::pin_project; -use reth_codecs::derive_arbitrary; -use reth_metrics::metrics::counter; -use reth_primitives::{ - bytes::{Buf, BufMut, Bytes, BytesMut}, - hex, GotExpected, -}; + use std::{ collections::VecDeque, fmt, io, @@ -21,13 +7,30 @@ use std::{ task::{ready, Context, Poll}, time::Duration, }; -use tokio_stream::Stream; -use crate::capability::SharedCapabilities; +use alloy_rlp::{Decodable, Encodable, Error as RlpError, EMPTY_LIST_CODE}; +use futures::{Sink, SinkExt, StreamExt}; +use pin_project::pin_project; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; +use tokio_stream::Stream; use tracing::{debug, trace}; +use reth_codecs::derive_arbitrary; +use reth_metrics::metrics::counter; +use reth_primitives::{ + bytes::{Buf, BufMut, Bytes, BytesMut}, + hex, GotExpected, +}; + +use crate::{ + capability::SharedCapabilities, + disconnect::CanDisconnect, + errors::{P2PHandshakeError, P2PStreamError}, + pinger::{Pinger, PingerEvent}, + DisconnectReason, HelloMessage, HelloMessageWithProtocols, +}; + /// [`MAX_PAYLOAD_SIZE`] is the maximum size of an uncompressed message payload. /// This is defined in [EIP-706](https://eips.ethereum.org/EIPS/eip-706). const MAX_PAYLOAD_SIZE: usize = 16 * 1024 * 1024; @@ -785,27 +788,12 @@ impl Decodable for ProtocolVersion { #[cfg(test)] mod tests { use super::*; - use crate::{capability::SharedCapability, DisconnectReason, EthVersion}; - use reth_discv4::DEFAULT_DISCOVERY_PORT; - use reth_ecies::util::pk2id; - use secp256k1::{SecretKey, SECP256K1}; + use crate::{ + capability::SharedCapability, test_utils::eth_hello, DisconnectReason, EthVersion, + }; use tokio::net::{TcpListener, TcpStream}; use tokio_util::codec::Decoder; - /// Returns a testing `HelloMessage` and new secretkey - fn eth_hello() -> (HelloMessageWithProtocols, SecretKey) { - let server_key = SecretKey::new(&mut rand::thread_rng()); - let protocols = vec![EthVersion::Eth67.into()]; - let hello = HelloMessageWithProtocols { - protocol_version: ProtocolVersion::V5, - client_version: "bitcoind/1.0.0".to_string(), - protocols, - port: DEFAULT_DISCOVERY_PORT, - id: pk2id(&server_key.public_key(SECP256K1)), - }; - (hello, server_key) - } - #[tokio::test] async fn test_can_disconnect() { reth_tracing::init_test_tracing(); diff --git a/crates/net/eth-wire/src/test_utils.rs b/crates/net/eth-wire/src/test_utils.rs new file mode 100644 index 000000000000..01bd9a048dc3 --- /dev/null +++ b/crates/net/eth-wire/src/test_utils.rs @@ -0,0 +1,57 @@ +//! Utilities for testing p2p protocol. + +use crate::{ + EthVersion, HelloMessageWithProtocols, P2PStream, ProtocolVersion, Status, UnauthedP2PStream, +}; +use reth_discv4::DEFAULT_DISCOVERY_PORT; +use reth_ecies::util::pk2id; +use reth_primitives::{Chain, ForkFilter, Head, B256, U256}; +use secp256k1::{SecretKey, SECP256K1}; +use std::net::SocketAddr; +use tokio::net::TcpStream; +use tokio_util::codec::{Decoder, Framed, LengthDelimitedCodec}; + +pub type P2pPassthroughTcpStream = P2PStream>; + +/// Returns a new testing `HelloMessage` and new secretkey +pub fn eth_hello() -> (HelloMessageWithProtocols, SecretKey) { + let server_key = SecretKey::new(&mut rand::thread_rng()); + let protocols = vec![EthVersion::Eth67.into()]; + let hello = HelloMessageWithProtocols { + protocol_version: ProtocolVersion::V5, + client_version: "eth/1.0.0".to_string(), + protocols, + port: DEFAULT_DISCOVERY_PORT, + id: pk2id(&server_key.public_key(SECP256K1)), + }; + (hello, server_key) +} + +/// Returns testing eth handshake status and fork filter. +pub fn eth_handshake() -> (Status, ForkFilter) { + let genesis = B256::random(); + let fork_filter = ForkFilter::new(Head::default(), genesis, 0, Vec::new()); + + let status = Status { + version: EthVersion::Eth67 as u8, + chain: Chain::mainnet(), + total_difficulty: U256::ZERO, + blockhash: B256::random(), + genesis, + // Pass the current fork id. + forkid: fork_filter.current(), + }; + (status, fork_filter) +} + +/// Connects to a remote node and returns an authenticated `P2PStream` with the remote node. +pub async fn connect_passthrough( + addr: SocketAddr, + client_hello: HelloMessageWithProtocols, +) -> P2pPassthroughTcpStream { + let outgoing = TcpStream::connect(addr).await.unwrap(); + let sink = crate::PassthroughCodec::default().framed(outgoing); + let (p2p_stream, _) = UnauthedP2PStream::new(sink).handshake(client_hello).await.unwrap(); + + p2p_stream +} diff --git a/crates/net/network/src/protocol.rs b/crates/net/network/src/protocol.rs index 1ba6464defa7..adcfb75f22db 100644 --- a/crates/net/network/src/protocol.rs +++ b/crates/net/network/src/protocol.rs @@ -2,19 +2,14 @@ //! //! See also -use futures::{Stream, StreamExt}; -use reth_eth_wire::{capability::SharedCapabilities, protocol::Protocol}; +use futures::Stream; +use reth_eth_wire::{ + capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol, +}; use reth_network_api::Direction; use reth_primitives::BytesMut; use reth_rpc_types::PeerId; -use std::{ - fmt, - net::SocketAddr, - pin::Pin, - task::{Context, Poll}, -}; - -use tokio_stream::wrappers::UnboundedReceiverStream; +use std::{fmt, net::SocketAddr, pin::Pin}; /// A trait that allows to offer additional RLPx-based application-level protocols when establishing /// a peer-to-peer connection. @@ -81,22 +76,6 @@ pub enum OnNotSupported { Disconnect, } -/// A connection channel to receive messages for the negotiated protocol. -/// -/// This is a [Stream] that returns raw bytes of the received messages for this protocol. -#[derive(Debug)] -pub struct ProtocolConnection { - from_wire: UnboundedReceiverStream, -} - -impl Stream for ProtocolConnection { - type Item = BytesMut; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.from_wire.poll_next_unpin(cx) - } -} - /// A wrapper type for a RLPx sub-protocol. #[derive(Debug)] pub struct RlpxSubProtocol(Box);