Skip to content

Commit

Permalink
Stream able to configure rx timestamp reading
Browse files Browse the repository at this point in the history
This allows us to optionally enable rx timestamp reading logic per
stream by calling `set_rx_timestamp`. When rx timestamp is disabled, the
standard battle-tested logic is used.
  • Loading branch information
dqminh authored and gumpt committed Sep 23, 2024
1 parent 7e0368d commit ebca676
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 3 deletions.
2 changes: 1 addition & 1 deletion .bleep
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2b28af8029c2e74b642c3a7445dfd6768eda1b24
5bbb21bd377e352872ab767af7583e4d8e9022f8
50 changes: 48 additions & 2 deletions pingora-core/src/protocols/l4/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ struct RawStreamWrapper {
pub(crate) stream: RawStream,
/// store the last rx timestamp of the stream.
pub(crate) rx_ts: Option<SystemTime>,
/// enable reading rx timestamp
pub(crate) enable_rx_ts: bool,
#[cfg(target_os = "linux")]
/// This can be reused across multiple recvmsg calls. The cmsg buffer may
/// come from old sockets created by older version of pingora and so,
Expand All @@ -137,10 +139,15 @@ impl RawStreamWrapper {
RawStreamWrapper {
stream,
rx_ts: None,
enable_rx_ts: false,
#[cfg(target_os = "linux")]
reusable_cmsg_space: nix::cmsg_space!(nix::sys::time::TimeSpec),
}
}

pub fn enable_rx_ts(&mut self, enable_rx_ts: bool) {
self.enable_rx_ts = enable_rx_ts;
}
}

impl AsyncRead for RawStreamWrapper {
Expand Down Expand Up @@ -169,6 +176,18 @@ impl AsyncRead for RawStreamWrapper {
use futures::ready;
use nix::sys::socket::{recvmsg, ControlMessageOwned, MsgFlags, SockaddrStorage};

// if we do not need rx timestamp, then use the standard path
if !self.enable_rx_ts {
// Safety: Basic enum pin projection
unsafe {
let rs_wrapper = Pin::get_unchecked_mut(self);
match &mut rs_wrapper.stream {
RawStream::Tcp(s) => return Pin::new_unchecked(s).poll_read(cx, buf),
RawStream::Unix(s) => return Pin::new_unchecked(s).poll_read(cx, buf),
}
}
}

// Safety: Basic pin projection to get mutable stream
let rs_wrapper = unsafe { Pin::get_unchecked_mut(self) };
match &mut rs_wrapper.stream {
Expand Down Expand Up @@ -331,9 +350,11 @@ impl Stream {
if let RawStream::Tcp(s) = &self.stream.get_mut().stream {
let timestamp_options = TimestampingFlag::SOF_TIMESTAMPING_RX_SOFTWARE
| TimestampingFlag::SOF_TIMESTAMPING_SOFTWARE;
return setsockopt(s.as_raw_fd(), sockopt::Timestamping, &timestamp_options)
.or_err(InternalError, "failed to set SOF_TIMESTAMPING_RX_SOFTWARE");
setsockopt(s.as_raw_fd(), sockopt::Timestamping, &timestamp_options)
.or_err(InternalError, "failed to set SOF_TIMESTAMPING_RX_SOFTWARE")?;
self.stream.get_mut().enable_rx_ts(true);
}

Ok(())
}

Expand Down Expand Up @@ -755,4 +776,29 @@ mod tests {
assert_eq!(n, message.len());
assert!(stream.rx_ts.is_some());
}

#[cfg(target_os = "linux")]
#[tokio::test]
async fn test_rx_timestamp_standard_path() {
let message = "hello world".as_bytes();
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let notify = Arc::new(Notify::new());
let notify2 = notify.clone();

tokio::spawn(async move {
let (mut stream, _) = listener.accept().await.unwrap();
notify2.notified().await;
stream.write_all(message).await.unwrap();
});

let mut stream: Stream = TcpStream::connect(addr).await.unwrap().into();
std::thread::sleep(Duration::from_micros(100));
notify.notify_one();

let mut buffer = vec![0u8; message.len()];
let n = stream.read(buffer.as_mut_slice()).await.unwrap();
assert_eq!(n, message.len());
assert!(stream.rx_ts.is_none());
}
}

0 comments on commit ebca676

Please sign in to comment.