Skip to content

Commit

Permalink
Optimize KEEP-ALIVE timer (#261)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Dec 2, 2023
1 parent 5e7f325 commit c9993af
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 80 deletions.
4 changes: 4 additions & 0 deletions ntex-io/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.3.13] - 2023-12-02

* Optimize KEEP-ALIVE timer

## [0.3.12] - 2023-11-29

* Refactor io timers
Expand Down
2 changes: 1 addition & 1 deletion ntex-io/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-io"
version = "0.3.12"
version = "0.3.13"
authors = ["ntex contributors <[email protected]>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]
Expand Down
138 changes: 65 additions & 73 deletions ntex-io/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,11 @@ pin_project_lite::pin_project! {
bitflags::bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct Flags: u8 {
const READY_ERR = 0b0001;
const IO_ERR = 0b0010;
const KA_TIMEOUT = 0b0100;
const READ_TIMEOUT = 0b1000;
const READY_ERR = 0b00001;
const IO_ERR = 0b00010;
const KA_ENABLED = 0b00100;
const NO_KA_TIMEOUT = 0b01000;
const READ_TIMEOUT = 0b10000;
}
}

Expand Down Expand Up @@ -231,6 +232,12 @@ where
let io = IoBoxed::from(io);
io.set_disconnect_timeout(cfg.disconnect_timeout());

let flags = if cfg.keepalive_timeout_secs().is_zero() {
Flags::NO_KA_TIMEOUT
} else {
Flags::KA_ENABLED | Flags::NO_KA_TIMEOUT
};

let pool = io.memory_pool().pool();
let shared = Rc::new(DispatcherShared {
io,
Expand All @@ -244,9 +251,9 @@ where
inner: DispatcherInner {
pool,
shared,
flags,
cfg: cfg.clone(),
error: None,
flags: Flags::empty(),
read_remains: 0,
read_remains_prev: 0,
read_max_timeout: Seconds::ZERO,
Expand All @@ -267,39 +274,6 @@ where
}
}

impl<S, U> Dispatcher<S, U>
where
S: Service<DispatchItem<U>, Response = Option<Response<U>>>,
U: Decoder + Encoder,
{
#[doc(hidden)]
#[deprecated(since = "0.3.6", note = "Use DispatcherConfig methods")]
/// Set keep-alive timeout.
///
/// To disable timeout set value to 0.
///
/// By default keep-alive timeout is set to 30 seconds.
pub fn keepalive_timeout(self, timeout: Seconds) -> Self {
self.inner.cfg.set_keepalive_timeout(timeout);
self
}

#[doc(hidden)]
#[deprecated(since = "0.3.6", note = "Use DispatcherConfig methods")]
/// Set connection disconnect timeout in seconds.
///
/// Defines a timeout for disconnect connection. If a disconnect procedure does not complete
/// within this time, the connection get dropped.
///
/// To disable timeout set value to 0.
///
/// By default disconnect timeout is set to 1 seconds.
pub fn disconnect_timeout(self, val: Seconds) -> Self {
self.inner.shared.io.set_disconnect_timeout(val);
self
}
}

impl<S, U> DispatcherShared<S, U>
where
S: Service<DispatchItem<U>, Response = Option<Response<U>>>,
Expand Down Expand Up @@ -549,38 +523,33 @@ where
}

fn update_timer(&mut self, decoded: &Decoded<<U as Decoder>::Item>) {
log::debug!(
"update timer, item: {:?}, remains: {:?}, consumed: {:?}, flags: {:?}",
decoded.item.is_some(),
decoded.remains,
decoded.consumed,
self.flags
);

// got parsed frame
if decoded.item.is_some() {
self.read_remains = 0;
self.flags.remove(Flags::KA_TIMEOUT | Flags::READ_TIMEOUT);
self.flags.remove(Flags::READ_TIMEOUT);
} else if self.flags.contains(Flags::READ_TIMEOUT) {
// received new data but not enough for parsing complete frame
self.read_remains = decoded.remains as u32;
} else if self.read_remains == 0 && decoded.remains == 0 {
// no new data, start keep-alive timer
if !self.flags.contains(Flags::KA_TIMEOUT) {
if self
.flags
.contains(Flags::NO_KA_TIMEOUT | Flags::KA_ENABLED)
{
log::debug!(
"Start keep-alive timer {:?}",
self.cfg.keepalive_timeout_secs()
);
self.flags.insert(Flags::KA_TIMEOUT);
self.flags.remove(Flags::NO_KA_TIMEOUT);
self.shared
.io
.start_timer_secs(self.cfg.keepalive_timeout_secs());
}
} else if self.flags.contains(Flags::READ_TIMEOUT) {
// received new data but not enough for parsing complete frame
self.read_remains = decoded.remains as u32;
} else if let Some((timeout, max, _)) = self.cfg.frame_read_rate_params() {
// we got new data but not enough to parse single frame
// start read timer
self.flags.remove(Flags::KA_TIMEOUT);
self.flags.insert(Flags::READ_TIMEOUT);
self.flags
.insert(Flags::READ_TIMEOUT | Flags::NO_KA_TIMEOUT);

self.read_remains = decoded.remains as u32;
self.read_remains_prev = 0;
Expand All @@ -590,8 +559,6 @@ where
}

fn handle_timeout(&mut self) -> Result<(), DispatchItem<U>> {
log::debug!("handle timeout, flags: {:?}", self.flags);

// check read timer
if self.flags.contains(Flags::READ_TIMEOUT) {
if let Some((timeout, max, rate)) = self.cfg.frame_read_rate_params() {
Expand Down Expand Up @@ -697,11 +664,28 @@ mod tests {
codec: U,
service: F,
) -> (Self, State) {
let state = Io::new(io);
let pool = state.memory_pool().pool();
let cfg = DispatcherConfig::default()
.set_keepalive_timeout(Seconds(1))
.clone();
Self::debug_cfg(io, codec, service, cfg)
}

/// Construct new `Dispatcher` instance
pub(crate) fn debug_cfg<T: IoStream, F: IntoService<S, DispatchItem<U>>>(
io: T,
codec: U,
service: F,
cfg: DispatcherConfig,
) -> (Self, State) {
let state = Io::new(io);
let pool = state.memory_pool().pool();
state.set_disconnect_timeout(cfg.disconnect_timeout());

let flags = if cfg.keepalive_timeout_secs().is_zero() {
super::Flags::NO_KA_TIMEOUT
} else {
super::Flags::KA_ENABLED | super::Flags::NO_KA_TIMEOUT
};

let inner = State(state.get_ref());
state.start_timer(Duration::from_millis(500));
Expand All @@ -718,14 +702,14 @@ mod tests {
Dispatcher {
inner: DispatcherInner {
error: None,
flags: super::Flags::empty(),
st: DispatcherState::Processing,
read_remains: 0,
read_remains_prev: 0,
read_max_timeout: Seconds::ZERO,
pool,
shared,
cfg,
flags,
},
},
inner,
Expand Down Expand Up @@ -765,6 +749,10 @@ mod tests {

client.close().await;
assert!(client.is_server_dropped());

assert!(
format!("{:?}", super::Flags::NO_KA_TIMEOUT.clone()).contains("NO_KA_TIMEOUT")
);
}

#[ntex::test]
Expand All @@ -784,9 +772,8 @@ mod tests {
}
}),
);
#[allow(deprecated)]
spawn(async move {
let _ = disp.disconnect_timeout(Seconds(1)).await;
let _ = disp.await;
});

let buf = client.read().await.unwrap();
Expand Down Expand Up @@ -1012,14 +999,21 @@ mod tests {

#[ntex::test]
async fn test_keepalive() {
let _ = env_logger::try_init();

let (client, server) = IoTest::create();
client.remote_buffer_cap(1024);
client.write("GET /test HTTP/1\r\n\r\n");

let data = Arc::new(Mutex::new(RefCell::new(Vec::new())));
let data2 = data.clone();

let (disp, state) = Dispatcher::debug(
let cfg = DispatcherConfig::default()
.set_disconnect_timeout(Seconds(1))
.set_keepalive_timeout(Seconds(1))
.clone();

let (disp, state) = Dispatcher::debug_cfg(
server,
BytesCodec,
ntex_service::fn_service(move |msg: DispatchItem<BytesCodec>| {
Expand All @@ -1038,15 +1032,11 @@ mod tests {
Ok(None)
}
}),
cfg,
);
#[allow(deprecated)]
spawn(async move {
let _ = disp
.keepalive_timeout(Seconds::ZERO)
.keepalive_timeout(Seconds(1))
.await;
let _ = disp.await;
});
state.0 .0.disconnect_timeout.set(Seconds(1));

let buf = client.read().await.unwrap();
assert_eq!(buf, Bytes::from_static(b"GET /test HTTP/1\r\n\r\n"));
Expand All @@ -1067,7 +1057,12 @@ mod tests {
let data = Arc::new(Mutex::new(RefCell::new(Vec::new())));
let data2 = data.clone();

let (disp, state) = Dispatcher::debug(
let cfg = DispatcherConfig::default()
.set_keepalive_timeout(Seconds(1))
.set_frame_read_rate(Seconds(1), Seconds(2), 2)
.clone();

let (disp, state) = Dispatcher::debug_cfg(
server,
BCodec(8),
ntex_service::fn_service(move |msg: DispatchItem<BCodec>| {
Expand All @@ -1086,12 +1081,9 @@ mod tests {
Ok(None)
}
}),
cfg,
);
spawn(async move {
disp.inner
.cfg
.set_keepalive_timeout(Seconds(1))
.set_frame_read_rate(Seconds(1), Seconds(2), 2);
let _ = disp.await;
});

Expand Down
5 changes: 0 additions & 5 deletions ntex-io/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,11 +439,6 @@ impl<F> Io<F> {

let ready = flags.contains(Flags::RD_READY);
if flags.intersects(Flags::RD_BUF_FULL | Flags::RD_PAUSED) {
if flags.intersects(Flags::RD_BUF_FULL) {
log::trace!("read back-pressure is disabled, wake io task");
} else {
log::trace!("read task is resumed, wake io task");
}
flags.remove(Flags::RD_READY | Flags::RD_BUF_FULL | Flags::RD_PAUSED);
self.0 .0.read_task.wake();
self.0 .0.flags.set(flags);
Expand Down
2 changes: 1 addition & 1 deletion ntex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ ntex-util = "0.3.4"
ntex-bytes = "0.1.21"
ntex-h2 = "0.4.4"
ntex-rt = "0.4.11"
ntex-io = "0.3.12"
ntex-io = "0.3.13"
ntex-tls = "0.3.2"
ntex-tokio = { version = "0.3.1", optional = true }
ntex-glommio = { version = "0.3.1", optional = true }
Expand Down

0 comments on commit c9993af

Please sign in to comment.