Skip to content

Commit

Permalink
Http gracefull shutdown support (#393)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Aug 12, 2024
1 parent f574916 commit 5f20ee2
Show file tree
Hide file tree
Showing 13 changed files with 373 additions and 41 deletions.
4 changes: 4 additions & 0 deletions ntex-io/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [2.2.0] - 2024-08-12

* Allow to notify dispatcher from IoRef

## [2.1.0] - 2024-07-30

* Optimize `Io` layout
Expand Down
2 changes: 1 addition & 1 deletion ntex-io/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-io"
version = "2.1.0"
version = "2.2.0"
authors = ["ntex contributors <[email protected]>"]
description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"]
Expand Down
15 changes: 11 additions & 4 deletions ntex-io/src/ioref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,17 +217,24 @@ impl IoRef {
}

#[inline]
/// current timer handle
pub fn timer_handle(&self) -> timer::TimerHandle {
self.0.timeout.get()
/// Wakeup dispatcher
pub fn notify_dispatcher(&self) {
self.0.dispatch_task.wake();
log::trace!("{}: Timer, notify dispatcher", self.tag());
}

#[inline]
/// wakeup dispatcher and send keep-alive error
/// Wakeup dispatcher and send keep-alive error
pub fn notify_timeout(&self) {
self.0.notify_timeout()
}

#[inline]
/// current timer handle
pub fn timer_handle(&self) -> timer::TimerHandle {
self.0.timeout.get()
}

#[inline]
/// Start timer
pub fn start_timer(&self, timeout: Seconds) -> timer::TimerHandle {
Expand Down
4 changes: 4 additions & 0 deletions ntex/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [2.2.0] - 2024-08-12

* Http server gracefull shutdown support

## [2.1.0] - 2024-07-30

* Better handling for connection upgrade #385
Expand Down
8 changes: 4 additions & 4 deletions ntex/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "2.1.0"
version = "2.2.0"
authors = ["ntex contributors <[email protected]>"]
description = "Framework for composable network services"
readme = "README.md"
Expand Down Expand Up @@ -65,10 +65,10 @@ ntex-service = "3.0"
ntex-macros = "0.1.3"
ntex-util = "2"
ntex-bytes = "0.1.27"
ntex-server = "2.1"
ntex-h2 = "1.0"
ntex-server = "2.3"
ntex-h2 = "1.1"
ntex-rt = "0.4.13"
ntex-io = "2.1"
ntex-io = "2.2"
ntex-net = "2.0"
ntex-tls = "2.0"

Expand Down
33 changes: 30 additions & 3 deletions ntex/src/http/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,23 @@ impl ServiceConfig {
}
}

bitflags::bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct Flags: u8 {
/// Keep-alive enabled
const KA_ENABLED = 0b0000_0001;
/// Shutdown service
const SHUTDOWN = 0b0000_0010;
}
}

pub(super) struct DispatcherConfig<S, C> {
flags: Cell<Flags>,
pub(super) service: Pipeline<S>,
pub(super) control: Pipeline<C>,
pub(super) keep_alive: Seconds,
pub(super) client_disconnect: Seconds,
pub(super) h2config: h2::Config,
pub(super) ka_enabled: bool,
pub(super) headers_read_rate: Option<ReadRate>,
pub(super) payload_read_rate: Option<ReadRate>,
pub(super) timer: DateService,
Expand All @@ -253,22 +263,39 @@ impl<S, C> DispatcherConfig<S, C> {
control: control.into(),
keep_alive: cfg.keep_alive,
client_disconnect: cfg.client_disconnect,
ka_enabled: cfg.ka_enabled,
headers_read_rate: cfg.headers_read_rate,
payload_read_rate: cfg.payload_read_rate,
h2config: cfg.h2config.clone(),
timer: cfg.timer.clone(),
flags: Cell::new(if cfg.ka_enabled {
Flags::KA_ENABLED
} else {
Flags::empty()
}),
}
}

/// Return state of connection keep-alive functionality
pub(super) fn keep_alive_enabled(&self) -> bool {
self.ka_enabled
self.flags.get().contains(Flags::KA_ENABLED)
}

pub(super) fn headers_read_rate(&self) -> Option<&ReadRate> {
self.headers_read_rate.as_ref()
}

/// Service is shuting down
pub(super) fn is_shutdown(&self) -> bool {
self.flags.get().contains(Flags::SHUTDOWN)
}

pub(super) fn shutdown(&self) {
self.h2config.shutdown();

let mut flags = self.flags.get();
flags.insert(Flags::SHUTDOWN);
self.flags.set(flags);
}
}

const DATE_VALUE_LENGTH_HDR: usize = 39;
Expand Down
6 changes: 6 additions & 0 deletions ntex/src/http/h1/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,12 @@ where
B: MessageBody,
{
fn poll_read_request(&mut self, cx: &mut Context<'_>) -> Poll<State<F, C, S, B>> {
// stop dispatcher
if self.config.is_shutdown() {
log::trace!("{}: Service is shutting down", self.io.tag());
return Poll::Ready(self.stop());
}

log::trace!("{}: Trying to read http message", self.io.tag());

let result = match self.io.poll_recv_decode(&self.codec, cx) {
Expand Down
69 changes: 60 additions & 9 deletions ntex/src/http/h1/service.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::{error::Error, fmt, marker, rc::Rc};
use std::{cell::Cell, cell::RefCell, error::Error, fmt, marker, rc::Rc};

use crate::http::body::MessageBody;
use crate::http::config::{DispatcherConfig, ServiceConfig};
use crate::http::error::{DispatchError, ResponseError};
use crate::http::{request::Request, response::Response};
use crate::io::{types, Filter, Io};
use crate::io::{types, Filter, Io, IoRef};
use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
use crate::util::join;
use crate::{channel::oneshot, util::join, util::HashSet};

use super::control::{Control, ControlAck};
use super::default::DefaultControlService;
Expand Down Expand Up @@ -181,10 +181,14 @@ where
.await
.map_err(|e| log::error!("Cannot construct control service: {:?}", e))?;

let (tx, rx) = oneshot::channel();
let config = Rc::new(DispatcherConfig::new(self.cfg.clone(), service, control));

Ok(H1ServiceHandler {
config,
inflight: RefCell::new(Default::default()),
rx: Cell::new(Some(rx)),
tx: Cell::new(Some(tx)),
_t: marker::PhantomData,
})
}
Expand All @@ -193,6 +197,9 @@ where
/// `Service` implementation for HTTP1 transport
pub struct H1ServiceHandler<F, S, B, C> {
config: Rc<DispatcherConfig<S, C>>,
inflight: RefCell<HashSet<IoRef>>,
rx: Cell<Option<oneshot::Receiver<()>>>,
tx: Cell<Option<oneshot::Sender<()>>>,
_t: marker::PhantomData<(F, B)>,
}

Expand Down Expand Up @@ -224,18 +231,62 @@ where
}

async fn shutdown(&self) {
self.config.control.shutdown().await;
self.config.service.shutdown().await;
self.config.shutdown();

// check inflight connections
let inflight = {
let inflight = self.inflight.borrow();
for io in inflight.iter() {
io.notify_dispatcher();
}
inflight.len()
};
if inflight != 0 {
log::trace!("Shutting down service, in-flight connections: {}", inflight);

if let Some(rx) = self.rx.take() {
let _ = rx.await;
}

log::trace!("Shutting down is complected",);
}

join(
self.config.control.shutdown(),
self.config.service.shutdown(),
)
.await;
}

async fn call(&self, io: Io<F>, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
let inflight = {
let mut inflight = self.inflight.borrow_mut();
inflight.insert(io.get_ref());
inflight.len()
};

log::trace!(
"New http1 connection, peer address {:?}",
io.query::<types::PeerAddr>().get()
"New http1 connection, peer address {:?}, inflight: {}",
io.query::<types::PeerAddr>().get(),
inflight
);
let ioref = io.get_ref();

Dispatcher::new(io, self.config.clone())
let result = Dispatcher::new(io, self.config.clone())
.await
.map_err(DispatchError::Control)
.map_err(DispatchError::Control);

{
let mut inflight = self.inflight.borrow_mut();
inflight.remove(&ioref);

if inflight.len() == 0 {
if let Some(tx) = self.tx.take() {
let _ = tx.send(());
}
}
}

result
}
}
59 changes: 54 additions & 5 deletions ntex/src/http/h2/service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::{cell::RefCell, error::Error, fmt, future::poll_fn, io, marker, mem, rc::Rc};
use std::cell::{Cell, RefCell};
use std::{error::Error, fmt, future::poll_fn, io, marker, mem, rc::Rc};

use ntex_h2::{self as h2, frame::StreamId, server};

use crate::channel::oneshot;
use crate::http::body::{BodySize, MessageBody};
use crate::http::config::{DispatcherConfig, ServiceConfig};
use crate::http::error::{DispatchError, H2Error, ResponseError};
Expand All @@ -10,7 +12,7 @@ use crate::http::message::{CurrentIo, ResponseHead};
use crate::http::{DateService, Method, Request, Response, StatusCode, Uri, Version};
use crate::io::{types, Filter, Io, IoBoxed, IoRef};
use crate::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
use crate::util::{Bytes, BytesMut, HashMap};
use crate::util::{Bytes, BytesMut, HashMap, HashSet};

use super::payload::{Payload, PayloadSender};
use super::DefaultControlService;
Expand Down Expand Up @@ -177,11 +179,16 @@ where
.create(())
.await
.map_err(|e| log::error!("Cannot construct publish service: {:?}", e))?;

let (tx, rx) = oneshot::channel();
let config = Rc::new(DispatcherConfig::new(self.cfg.clone(), service, ()));

Ok(H2ServiceHandler {
config,
control: self.ctl.clone(),
inflight: RefCell::new(Default::default()),
rx: Cell::new(Some(rx)),
tx: Cell::new(Some(tx)),
_t: marker::PhantomData,
})
}
Expand All @@ -191,6 +198,9 @@ where
pub struct H2ServiceHandler<F, S: Service<Request>, B, C> {
config: Rc<DispatcherConfig<S, ()>>,
control: Rc<C>,
inflight: RefCell<HashSet<IoRef>>,
rx: Cell<Option<oneshot::Receiver<()>>>,
tx: Cell<Option<oneshot::Sender<()>>>,
_t: marker::PhantomData<(F, B)>,
}

Expand Down Expand Up @@ -218,6 +228,25 @@ where

#[inline]
async fn shutdown(&self) {
self.config.shutdown();

// check inflight connections
let inflight = {
let inflight = self.inflight.borrow();
for io in inflight.iter() {
io.notify_dispatcher();
}
inflight.len()
};
if inflight != 0 {
log::trace!("Shutting down service, in-flight connections: {}", inflight);
if let Some(rx) = self.rx.take() {
let _ = rx.await;
}

log::trace!("Shutting down is complected",);
}

self.config.service.shutdown().await
}

Expand All @@ -226,17 +255,37 @@ where
io: Io<F>,
_: ServiceCtx<'_, Self>,
) -> Result<Self::Response, Self::Error> {
let inflight = {
let mut inflight = self.inflight.borrow_mut();
inflight.insert(io.get_ref());
inflight.len()
};

log::trace!(
"New http2 connection, peer address {:?}",
io.query::<types::PeerAddr>().get()
"New http2 connection, peer address {:?}, inflight: {}",
io.query::<types::PeerAddr>().get(),
inflight
);
let control = self.control.create(()).await.map_err(|e| {
DispatchError::Control(
format!("Cannot construct control service: {:?}", e).into(),
)
})?;

handle(io.into(), control, self.config.clone()).await
let ioref = io.get_ref();
let result = handle(io.into(), control, self.config.clone()).await;
{
let mut inflight = self.inflight.borrow_mut();
inflight.remove(&ioref);

if inflight.len() == 0 {
if let Some(tx) = self.tx.take() {
let _ = tx.send(());
}
}
}

result
}
}

Expand Down
Loading

0 comments on commit 5f20ee2

Please sign in to comment.