From 78629ee17449d6d1c23a144f60407dc70ccce7c7 Mon Sep 17 00:00:00 2001 From: Aston Turing <151985526+astoring@users.noreply.github.com> Date: Tue, 5 Dec 2023 14:17:25 +0800 Subject: [PATCH] Add `ServerHandle` for stop server (#533) --- crates/core/src/conn/joined.rs | 14 +- crates/core/src/conn/mod.rs | 6 +- crates/core/src/conn/native_tls/listener.rs | 6 +- crates/core/src/conn/openssl/listener.rs | 6 +- crates/core/src/conn/proto.rs | 35 +--- crates/core/src/conn/quinn/builder.rs | 112 +++++++----- crates/core/src/conn/quinn/mod.rs | 6 +- crates/core/src/conn/tcp.rs | 6 +- crates/core/src/conn/unix.rs | 5 +- crates/core/src/http/body/channel.rs | 3 +- crates/core/src/http/body/res.rs | 3 +- crates/core/src/http/mod.rs | 5 +- crates/core/src/server.rs | 180 ++++++++++---------- 13 files changed, 187 insertions(+), 200 deletions(-) diff --git a/crates/core/src/conn/joined.rs b/crates/core/src/conn/joined.rs index c6f22775a..8a96a525b 100644 --- a/crates/core/src/conn/joined.rs +++ b/crates/core/src/conn/joined.rs @@ -7,7 +7,6 @@ use std::time::Duration; use pin_project::pin_project; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -use tokio_util::sync::CancellationToken; use crate::async_trait; use crate::conn::{Holding, HttpBuilder}; @@ -124,18 +123,11 @@ where self, handler: HyperHandler, builder: Arc, - server_shutdown_token: CancellationToken, - idle_connection_timeout: Option, + idle_timeout: Option, ) -> IoResult<()> { match self { - JoinedStream::A(a) => { - a.serve(handler, builder, server_shutdown_token, idle_connection_timeout) - .await - } - JoinedStream::B(b) => { - b.serve(handler, builder, server_shutdown_token, idle_connection_timeout) - .await - } + JoinedStream::A(a) => a.serve(handler, builder, idle_timeout).await, + JoinedStream::B(b) => b.serve(handler, builder, idle_timeout).await, } } } diff --git a/crates/core/src/conn/mod.rs b/crates/core/src/conn/mod.rs index 54d0390d9..d2ce56705 100644 --- a/crates/core/src/conn/mod.rs +++ b/crates/core/src/conn/mod.rs @@ -75,7 +75,6 @@ cfg_feature! { use tokio_rustls::server::TlsStream; use tokio::io::{AsyncRead, AsyncWrite}; - use tokio_util::sync::CancellationToken; use crate::async_trait; use crate::service::HyperHandler; @@ -89,10 +88,9 @@ cfg_feature! { S: AsyncRead + AsyncWrite + Send + Unpin + 'static, { async fn serve(self, handler: HyperHandler, builder: Arc, - server_shutdown_token: CancellationToken, - idle_connection_timeout: Option) -> IoResult<()> { + idle_timeout: Option) -> IoResult<()> { builder - .serve_connection(self, handler, server_shutdown_token, idle_connection_timeout) + .serve_connection(self, handler, idle_timeout) .await .map_err(|e| IoError::new(ErrorKind::Other, e.to_string())) } diff --git a/crates/core/src/conn/native_tls/listener.rs b/crates/core/src/conn/native_tls/listener.rs index 181865810..2f5bd4875 100644 --- a/crates/core/src/conn/native_tls/listener.rs +++ b/crates/core/src/conn/native_tls/listener.rs @@ -11,7 +11,6 @@ use futures_util::task::noop_waker_ref; use http::uri::Scheme; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_native_tls::TlsStream; -use tokio_util::sync::CancellationToken; use crate::async_trait; use crate::conn::{Accepted, Acceptor, Holding, HttpBuilder, IntoConfigStream, Listener}; @@ -72,11 +71,10 @@ where self, handler: HyperHandler, builder: Arc, - server_shutdown_token: CancellationToken, - idle_connection_timeout: Option, + idle_timeout: Option, ) -> IoResult<()> { builder - .serve_connection(self, handler, server_shutdown_token, idle_connection_timeout) + .serve_connection(self, handler, idle_timeout) .await .map_err(|e| IoError::new(ErrorKind::Other, e.to_string())) } diff --git a/crates/core/src/conn/openssl/listener.rs b/crates/core/src/conn/openssl/listener.rs index ee8b5b7e4..0d8575f51 100644 --- a/crates/core/src/conn/openssl/listener.rs +++ b/crates/core/src/conn/openssl/listener.rs @@ -13,7 +13,6 @@ use openssl::ssl::{Ssl, SslAcceptor}; use tokio::io::ErrorKind; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_openssl::SslStream; -use tokio_util::sync::CancellationToken; use super::SslAcceptorBuilder; @@ -122,11 +121,10 @@ where self, handler: HyperHandler, builder: Arc, - server_shutdown_token: CancellationToken, - idle_connection_timeout: Option, + idle_timeout: Option, ) -> IoResult<()> { builder - .serve_connection(self, handler, server_shutdown_token, idle_connection_timeout) + .serve_connection(self, handler, idle_timeout) .await .map_err(|e| IoError::new(ErrorKind::Other, e.to_string())) } diff --git a/crates/core/src/conn/proto.rs b/crates/core/src/conn/proto.rs index f6ad2ae8b..a0ef46dca 100644 --- a/crates/core/src/conn/proto.rs +++ b/crates/core/src/conn/proto.rs @@ -14,7 +14,7 @@ use http::{Request, Response, Version}; use hyper::service::Service; use pin_project::pin_project; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -use tokio::sync::{oneshot, Notify}; +use tokio::sync::Notify; use tokio_util::either::Either; use tokio_util::sync::CancellationToken; @@ -51,8 +51,7 @@ impl HttpBuilder { &self, socket: I, #[allow(unused_variables)] service: S, - #[allow(unused_variables)] server_shutdown_token: CancellationToken, - idle_connection_timeout: Option, + idle_timeout: Option, ) -> Result<()> where S: Service, Response = Response> + Send, @@ -73,7 +72,7 @@ impl HttpBuilder { #[cfg(all(not(feature = "http1"), feature = "http2"))] let version = Version::HTTP_2; #[allow(unused_variables)] - let socket = match idle_connection_timeout { + let socket = match idle_timeout { Some(timeout) => Either::Left(ClosingInactiveConnection::new(socket, timeout, { let conn_shutdown_token = conn_shutdown_token.clone(); @@ -106,7 +105,6 @@ impl HttpBuilder { _ = conn_shutdown_token.cancelled() => { tracing::info!("closing connection due to inactivity"); } - _ = server_shutdown_token.cancelled() => {} } // Init graceful shutdown for connection (`GOAWAY` for `HTTP/2` or disabling `keep-alive` for `HTTP/1`) @@ -128,7 +126,6 @@ impl HttpBuilder { _ = conn_shutdown_token.cancelled() => { tracing::info!("closing connection due to inactivity"); } - _ = server_shutdown_token.cancelled() => {} } // Init graceful shutdown for connection (`GOAWAY` for `HTTP/2` or disabling `keep-alive` for `HTTP/1`) @@ -152,7 +149,6 @@ struct ClosingInactiveConnection { #[pin] alive: Arc, timeout: Duration, - stop_tx: oneshot::Sender<()>, } impl AsyncRead for ClosingInactiveConnection @@ -213,33 +209,18 @@ impl ClosingInactiveConnection { Fut: Future + Send + 'static, { let alive = Arc::new(Notify::new()); - let (stop_tx, stop_rx) = oneshot::channel(); tokio::spawn({ let alive = alive.clone(); - async move { - let check_timeout = async { - loop { - match tokio::time::timeout(timeout, alive.notified()).await { - Ok(()) => {} - Err(_) => { - f().await; - } - } + loop { + if tokio::time::timeout(timeout, alive.notified()).await.is_err() { + f().await; + break; } - }; - tokio::select! { - _ = stop_rx => {}, - _ = check_timeout => {} } } }); - Self { - inner, - alive, - timeout, - stop_tx, - } + Self { inner, alive, timeout } } } diff --git a/crates/core/src/conn/quinn/builder.rs b/crates/core/src/conn/quinn/builder.rs index 0dbf48308..7b568c73e 100644 --- a/crates/core/src/conn/quinn/builder.rs +++ b/crates/core/src/conn/quinn/builder.rs @@ -2,6 +2,7 @@ use std::io::{Error as IoError, ErrorKind, Result as IoResult}; use std::ops::{Deref, DerefMut}; use std::pin::Pin; +use std::sync::Arc; use std::sync::Mutex; use std::time::Duration; @@ -11,6 +12,7 @@ use futures_util::Stream; use salvo_http3::error::ErrorLevel; use salvo_http3::ext::Protocol; use salvo_http3::server::RequestStream; +use tokio::sync::Notify; use tokio_util::sync::CancellationToken; use crate::http::body::{H3ReqBody, ReqBody}; @@ -49,57 +51,91 @@ impl Builder { } } +macro_rules! process_accepted { + ($conn:expr, $accepted:expr, $hyper_handler:expr) => { + match $accepted { + Ok(Some((request, stream))) => { + tracing::debug!("new request: {:#?}", request); + let hyper_handler = $hyper_handler.clone(); + match request.method() { + &Method::CONNECT + if request.extensions().get::() == Some(&Protocol::WEB_TRANSPORT) => + { + if let Some(c) = process_web_transport($conn, request, stream, hyper_handler).await? { + $conn = c; + } else { + return Ok(()); + } + } + _ => { + tokio::spawn(async move { + match process_request(request, stream, hyper_handler).await { + Ok(_) => {} + Err(e) => { + tracing::error!(error = ?e, "process request failed") + } + } + }); + } + } + } + Ok(None) => { + break; + } + Err(e) => { + tracing::warn!(error = ?e, "accept failed"); + match e.get_error_level() { + ErrorLevel::ConnectionError => break, + ErrorLevel::StreamError => continue, + } + } + } + } +} impl Builder { /// Serve HTTP3 connection. pub async fn serve_connection( &self, conn: crate::conn::quinn::H3Connection, hyper_handler: crate::service::HyperHandler, - _server_shutdown_token: CancellationToken, //TODO - _idle_connection_timeout: Option, //TODO + idle_timeout: Option, //TODO ) -> IoResult<()> { let mut conn = self .0 .build::(conn.into_inner()) .await .map_err(|e| IoError::new(ErrorKind::Other, format!("invalid connection: {}", e)))?; + loop { - match conn.accept().await { - Ok(Some((request, stream))) => { - tracing::debug!("new request: {:#?}", request); - let hyper_handler = hyper_handler.clone(); - match request.method() { - &Method::CONNECT - if request.extensions().get::() == Some(&Protocol::WEB_TRANSPORT) => - { - if let Some(c) = process_web_transport(conn, request, stream, hyper_handler).await? { - conn = c; - } else { - return Ok(()); + if let Some(idle_timeout) = idle_timeout { + let conn_shutdown_token = CancellationToken::new(); + let alive = Arc::new(Notify::new()); + tokio::spawn({ + let alive = alive.clone(); + let conn_shutdown_token = conn_shutdown_token.clone(); + async move { + loop { + let timeout = tokio::time::timeout(idle_timeout, alive.notified()); + if timeout.await.is_err() { + conn_shutdown_token.cancel(); + break; } } - _ => { - tokio::spawn(async move { - match process_request(request, stream, hyper_handler).await { - Ok(_) => {} - Err(e) => { - tracing::error!(error = ?e, "process request failed") - } - } - }); - } } - } - Ok(None) => { - break; - } - Err(e) => { - tracing::warn!(error = ?e, "accept failed"); - match e.get_error_level() { - ErrorLevel::ConnectionError => break, - ErrorLevel::StreamError => continue, + }); + tokio::select! { + accepted = conn.accept() => { + alive.notify_waiters(); + process_accepted!(conn, accepted, hyper_handler); + } + _ = conn_shutdown_token.cancelled() => { + tracing::info!("closing http3 connection due to inactivity"); + break; } } + } else { + let accpeted = conn.accept().await; + process_accepted!(conn, accpeted, hyper_handler); } } Ok(()) @@ -175,9 +211,9 @@ async fn process_web_transport( if let Err(e) = stream.send_data(frame.into_data().unwrap_or_default()).await { tracing::error!(error = ?e, "unable to send data to connection peer"); } - } else if let Err(e) = stream.send_trailers(frame.into_trailers().unwrap_or_default()).await { - tracing::error!(error = ?e, "unable to send trailers to connection peer"); - } + } else if let Err(e) = stream.send_trailers(frame.into_trailers().unwrap_or_default()).await { + tracing::error!(error = ?e, "unable to send trailers to connection peer"); + } } Err(e) => { tracing::error!(error = ?e, "unable to poll data from connection"); @@ -230,8 +266,8 @@ where tracing::error!(error = ?e, "unable to send data to connection peer"); } } else if let Err(e) = tx.send_trailers(frame.into_trailers().unwrap_or_default()).await { - tracing::error!(error = ?e, "unable to send trailers to connection peer"); - } + tracing::error!(error = ?e, "unable to send trailers to connection peer"); + } } Err(e) => { tracing::error!(error = ?e, "unable to poll data from connection"); diff --git a/crates/core/src/conn/quinn/mod.rs b/crates/core/src/conn/quinn/mod.rs index a285f01a3..8aacf3ab5 100644 --- a/crates/core/src/conn/quinn/mod.rs +++ b/crates/core/src/conn/quinn/mod.rs @@ -11,7 +11,6 @@ use futures_util::Stream; use salvo_http3::http3_quinn; pub use salvo_http3::http3_quinn::ServerConfig; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -use tokio_util::sync::CancellationToken; use crate::async_trait; use crate::conn::rustls::RustlsConfig; @@ -78,12 +77,11 @@ impl HttpConnection for H3Connection { self, handler: HyperHandler, builder: Arc, - server_shutdown_token: CancellationToken, - idle_connection_timeout: Option, + idle_timeout: Option, ) -> IoResult<()> { builder .quinn - .serve_connection(self, handler, server_shutdown_token, idle_connection_timeout) + .serve_connection(self, handler, idle_timeout) .await } } diff --git a/crates/core/src/conn/tcp.rs b/crates/core/src/conn/tcp.rs index 7e459a3d8..f94451713 100644 --- a/crates/core/src/conn/tcp.rs +++ b/crates/core/src/conn/tcp.rs @@ -5,7 +5,6 @@ use std::time::Duration; use std::vec; use tokio::net::{TcpListener as TokioTcpListener, TcpStream, ToSocketAddrs}; -use tokio_util::sync::CancellationToken; use crate::async_trait; use crate::conn::{Holding, HttpBuilder}; @@ -135,11 +134,10 @@ impl HttpConnection for TcpStream { self, handler: HyperHandler, builder: Arc, - server_shutdown_token: CancellationToken, - idle_connection_timeout: Option, + idle_timeout: Option, ) -> IoResult<()> { builder - .serve_connection(self, handler, server_shutdown_token, idle_connection_timeout) + .serve_connection(self, handler, idle_timeout) .await .map_err(|e| IoError::new(ErrorKind::Other, e.to_string())) } diff --git a/crates/core/src/conn/unix.rs b/crates/core/src/conn/unix.rs index ccd9e52b1..8de40ee08 100644 --- a/crates/core/src/conn/unix.rs +++ b/crates/core/src/conn/unix.rs @@ -123,11 +123,10 @@ impl HttpConnection for UnixStream { self, handler: HyperHandler, builder: Arc, - server_shutdown_token: CancellationToken, - idle_connection_timeout: Option, + idle_timeout: Option, ) -> IoResult<()> { builder - .serve_connection(self, handler, server_shutdown_token, idle_connection_timeout) + .serve_connection(self, handler, idle_timeout) .await .map_err(|e| IoError::new(ErrorKind::Other, e.to_string())) } diff --git a/crates/core/src/http/body/channel.rs b/crates/core/src/http/body/channel.rs index 14eb72a7a..a69c00324 100644 --- a/crates/core/src/http/body/channel.rs +++ b/crates/core/src/http/body/channel.rs @@ -4,8 +4,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use bytes::Bytes; -use futures_channel::mpsc; -use futures_channel::oneshot; +use futures_channel::{mpsc, oneshot}; use hyper::HeaderMap; /// A sender half created through [`ResBody::channel()`]. diff --git a/crates/core/src/http/body/res.rs b/crates/core/src/http/body/res.rs index a0a836ac5..7b783b0af 100644 --- a/crates/core/src/http/body/res.rs +++ b/crates/core/src/http/body/res.rs @@ -8,8 +8,7 @@ use std::io::{Error as IoError, ErrorKind, Result as IoResult}; use std::pin::Pin; use std::task::{self, ready, Context, Poll}; -use futures_channel::mpsc; -use futures_channel::oneshot; +use futures_channel::{mpsc, oneshot}; use futures_util::stream::{BoxStream, FusedStream, Stream, TryStreamExt}; use hyper::body::{Body, Frame, Incoming, SizeHint}; use sync_wrapper::SyncWrapper; diff --git a/crates/core/src/http/mod.rs b/crates/core/src/http/mod.rs index 89592349a..dc652f89e 100644 --- a/crates/core/src/http/mod.rs +++ b/crates/core/src/http/mod.rs @@ -26,8 +26,6 @@ use std::io::Result as IoResult; use std::sync::Arc; use std::time::Duration; -use tokio_util::sync::CancellationToken; - use crate::async_trait; use crate::conn::HttpBuilder; use crate::service::HyperHandler; @@ -40,8 +38,7 @@ pub trait HttpConnection { self, handler: HyperHandler, builder: Arc, - server_shutdown_token: CancellationToken, - idle_connection_timeout: Option, + idle_timeout: Option, ) -> IoResult<()>; } diff --git a/crates/core/src/server.rs b/crates/core/src/server.rs index cb43e3039..165568934 100644 --- a/crates/core/src/server.rs +++ b/crates/core/src/server.rs @@ -1,5 +1,4 @@ //! Server module -use std::future::Future; use std::io::Result as IoResult; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -8,6 +7,7 @@ use std::sync::Arc; use hyper::server::conn::http1; #[cfg(feature = "http2")] use hyper::server::conn::http2; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::Notify; use tokio::time::Duration; use tokio_util::sync::CancellationToken; @@ -18,13 +18,37 @@ use crate::conn::{Accepted, Acceptor, Holding, HttpBuilder}; use crate::http::{HeaderValue, HttpConnection, Version}; use crate::Service; +/// Server handle is used to stop server. +#[derive(Clone)] +pub struct ServerHandle { + tx_cmd: UnboundedSender, +} + +impl ServerHandle { + /// Force stop server. + pub fn stop_forcible(&self) { + self.tx_cmd.send(ServerCommand::StopForcible).ok(); + } + /// Graceful stop server. + pub fn stop_graceful(&self, timeout: impl Into>) { + self.tx_cmd.send(ServerCommand::StopGraceful(timeout.into())).ok(); + } +} + +enum ServerCommand { + StopForcible, + StopGraceful(Option), +} + /// HTTP Server /// /// A `Server` is created to listen on a port, parse HTTP requests, and hand them off to a [`Service`]. pub struct Server { acceptor: A, builder: HttpBuilder, - idle_timeout: Option, + conn_idle_timeout: Option, + tx_cmd: UnboundedSender, + rx_cmd: UnboundedReceiver, } impl Server { @@ -41,11 +65,10 @@ impl Server { /// Server::new(acceptor); /// # } /// ``` - #[inline] pub fn new(acceptor: A) -> Self { - Server { + Self::with_http_builder( acceptor, - builder: HttpBuilder { + HttpBuilder { #[cfg(feature = "http1")] http1: http1::Builder::new(), #[cfg(feature = "http2")] @@ -53,10 +76,37 @@ impl Server { #[cfg(feature = "quinn")] quinn: crate::conn::quinn::Builder::new(), }, - idle_timeout: None, + ) + } + + /// Create new `Server` with [`Acceptor`] and [`HttpBuilder`]. + pub fn with_http_builder(acceptor: A, builder: HttpBuilder) -> Self { + let (tx_cmd, rx_cmd) = tokio::sync::mpsc::unbounded_channel(); + Self { + acceptor, + builder, + conn_idle_timeout: None, + tx_cmd, + rx_cmd, + } + } + + /// Get a [`ServerHandle`] to stop server. + pub fn handle(&self) -> ServerHandle { + ServerHandle { + tx_cmd: self.tx_cmd.clone(), } } + /// Force stop server. + pub fn stop_forcible(&self) { + self.tx_cmd.send(ServerCommand::StopForcible).ok(); + } + /// Graceful stop server. + pub fn stop_graceful(&self, timeout: impl Into>) { + self.tx_cmd.send(ServerCommand::StopGraceful(timeout.into())).ok(); + } + /// Get holding information of this server. #[inline] pub fn holdings(&self) -> &[Holding] { @@ -90,8 +140,8 @@ impl Server { /// Specify connection idle timeout. Connections will be terminated if there was no activity /// within this period of time. #[must_use] - pub fn idle_timeout(mut self, timeout: Duration) -> Self { - self.idle_timeout = Some(timeout); + pub fn conn_idle_timeout(mut self, timeout: Duration) -> Self { + self.conn_idle_timeout = Some(timeout); self } @@ -109,75 +159,17 @@ impl Server { pub async fn try_serve(self, service: S) -> IoResult<()> where S: Into + Send, - { - self.try_serve_with_graceful_shutdown(service, futures_util::future::pending(), None) - .await - } - - /// Serve with graceful shutdown signal. - /// - /// # Example - /// - /// ```no_run - /// # use tokio::sync::oneshot; - /// - /// use salvo_core::prelude::*; - /// - /// #[handler] - /// async fn hello(res: &mut Response) { - /// res.render("Hello World!"); - /// } - /// - /// #[tokio::main] - /// async fn main() { - /// let (tx, rx) = oneshot::channel(); - /// let router = Router::new().get(hello); - /// let acceptor = TcpListener::new("127.0.0.1:5800").bind().await; - /// let server = Server::new(acceptor).serve_with_graceful_shutdown(router, async { - /// rx.await.ok(); - /// }, None); - /// - /// // Spawn the server into a runtime - /// tokio::task::spawn(server); - /// - /// // Later, start the shutdown... - /// let _ = tx.send(()); - /// } - /// ``` - #[inline] - pub async fn serve_with_graceful_shutdown(self, service: S, signal: G, timeout: Option) - where - S: Into + Send, - G: Future + Send + 'static, - { - self.try_serve_with_graceful_shutdown(service, signal, timeout) - .await - .unwrap(); - } - - /// Serve with graceful shutdown signal. - #[inline] - pub async fn try_serve_with_graceful_shutdown( - self, - service: S, - signal: G, - timeout: Option, - ) -> IoResult<()> - where - S: Into + Send, - G: Future + Send + 'static, { let Self { mut acceptor, builder, - idle_timeout, + conn_idle_timeout, + mut rx_cmd, + .. } = self; let alive_connections = Arc::new(AtomicUsize::new(0)); let notify = Arc::new(Notify::new()); let timeout_token = CancellationToken::new(); - let server_shutdown_token = CancellationToken::new(); - - tokio::pin!(signal); let mut alt_svc_h3 = None; for holding in acceptor.holdings() { @@ -198,21 +190,28 @@ impl Server { let builder = Arc::new(builder); loop { tokio::select! { - _ = &mut signal => { - server_shutdown_token.cancel(); - if let Some(timeout) = timeout { - tracing::info!( - timeout_in_seconds = timeout.as_secs_f32(), - "initiate graceful shutdown", - ); - - let timeout_token = timeout_token.clone(); - tokio::spawn(async move { - tokio::time::sleep(timeout).await; + Some(cmd) = rx_cmd.recv() => { + match cmd { + ServerCommand::StopGraceful(timeout) => { + if let Some(timeout) = timeout { + tracing::info!( + timeout_in_seconds = timeout.as_secs_f32(), + "initiate graceful stop server", + ); + + let timeout_token = timeout_token.clone(); + tokio::spawn(async move { + tokio::time::sleep(timeout).await; + timeout_token.cancel(); + }); + } else { + tracing::info!("initiate graceful stop server"); + } + }, + ServerCommand::StopForcible => { + tracing::info!("force stop server"); timeout_token.cancel(); - }); - } else { - tracing::info!("initiate graceful shutdown"); + }, } break; }, @@ -228,19 +227,14 @@ impl Server { let builder = builder.clone(); let timeout_token = timeout_token.clone(); - let server_shutdown_token = server_shutdown_token.clone(); tokio::spawn(async move { - let conn = conn.serve(handler, builder, server_shutdown_token, idle_timeout); - if timeout.is_some() { - tokio::select! { - _ = conn => { - }, - _ = timeout_token.cancelled() => { - } + let conn = conn.serve(handler, builder, conn_idle_timeout); + tokio::select! { + _ = conn => { + }, + _ = timeout_token.cancelled() => { } - } else { - conn.await.ok(); } if alive_connections.fetch_sub(1, Ordering::Acquire) == 1 {