Skip to content

Commit

Permalink
Better handling for connection upgrade #385 (#389)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Jul 30, 2024
1 parent f5bad7b commit 6e5fea7
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 199 deletions.
4 changes: 4 additions & 0 deletions ntex/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [2.1.0] - 2024-07-30

* Better handling for connection upgrade #385

## [2.0.3] - 2024-06-27

* Re-export server signals api
Expand Down
4 changes: 2 additions & 2 deletions ntex/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "2.0.3"
version = "2.1.0"
authors = ["ntex contributors <[email protected]>"]
description = "Framework for composable network services"
readme = "README.md"
Expand Down Expand Up @@ -68,7 +68,7 @@ ntex-bytes = "0.1.27"
ntex-server = "2.1"
ntex-h2 = "1.0"
ntex-rt = "0.4.13"
ntex-io = "2.0"
ntex-io = "2.1"
ntex-net = "2.0"
ntex-tls = "2.0"

Expand Down
47 changes: 16 additions & 31 deletions ntex/examples/basic.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,29 @@
use ntex::http;
use ntex::web::{self, middleware, App, HttpRequest, HttpResponse, HttpServer};
use ntex::web;

#[web::get("/resource1/{name}/index.html")]
async fn index(req: HttpRequest, name: web::types::Path<String>) -> String {
println!("REQ: {:?}", req);
format!("Hello: {}!\r\n", name)
#[derive(serde::Deserialize)]
struct Info {
username: String,
}

async fn index_async(req: HttpRequest) -> &'static str {
println!("REQ: {:?}", req);
"Hello world!\r\n"
}

#[web::get("/")]
async fn no_params() -> &'static str {
"Hello world!\r\n"
async fn submit(info: web::types::Json<Info>) -> Result<String, web::Error> {
Ok(format!("Welcome {}!", info.username))
}

#[ntex::main]
async fn main() -> std::io::Result<()> {
std::env::set_var("RUST_LOG", "ntex=trace");
std::env::set_var("RUST_LOG", "trace");
env_logger::init();
web::HttpServer::new(|| {
let json_config = web::types::JsonConfig::default().limit(4096);

HttpServer::new(|| {
App::new()
.wrap(middleware::Logger::default())
.service((index, no_params))
.service(
web::resource("/resource2/index.html")
.wrap(middleware::DefaultHeaders::new().header("X-Version-R2", "0.3"))
.default_service(
web::route().to(|| async { HttpResponse::MethodNotAllowed() }),
)
.route(web::get().to(index_async)),
)
.service(web::resource("/test1.html").to(|| async { "Test\r\n" }))
web::App::new().service(
web::resource("/")
.state(json_config)
.route(web::post().to(submit)),
)
})
.bind("0.0.0.0:8081")?
.workers(4)
.keep_alive(http::KeepAlive::Disabled)
.bind(("127.0.0.1", 8080))?
.workers(1)
.run()
.await
}
7 changes: 2 additions & 5 deletions ntex/src/http/h1/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,6 @@ impl Codec {
self.ctype.get() == ConnectionType::KeepAlive
}

pub(super) fn set_ctype(&self, ctype: ConnectionType) {
self.ctype.set(ctype)
}

#[inline]
#[doc(hidden)]
pub fn set_date_header(&self, dst: &mut BytesMut) {
Expand All @@ -115,10 +111,11 @@ impl Codec {
self.flags.set(flags);
}

pub(super) fn unset_streaming(&self) {
pub(super) fn reset_upgrade(&self) {
let mut flags = self.flags.get();
flags.remove(Flags::STREAM);
self.flags.set(flags);
self.ctype.set(ConnectionType::Close);
}
}

Expand Down
53 changes: 38 additions & 15 deletions ntex/src/http/h1/control.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::{fmt, future::Future, io};
use std::{fmt, future::Future, io, rc::Rc};

use crate::http::message::CurrentIo;
use crate::http::{body::Body, h1::Codec, Request, Response, ResponseError};
use crate::io::{Filter, Io, IoBoxed};
use crate::io::{Filter, Io, IoBoxed, IoRef};

pub enum Control<F, Err> {
/// New request is loaded
Expand Down Expand Up @@ -46,12 +46,8 @@ pub(super) enum ControlResult {
Upgrade(Request),
/// forward request to publish service
Publish(Request),
/// forward request to publish service
PublishUpgrade(Request),
/// send response
Response(Response<()>, Body),
/// send response
ResponseWithIo(Response<()>, Body, IoBoxed),
/// drop connection
Stop,
}
Expand All @@ -72,7 +68,7 @@ impl<F, Err> Control<F, Err> {
Control::NewRequest(NewRequest(req))
}

pub(super) fn upgrade(req: Request, io: Io<F>, codec: Codec) -> Self {
pub(super) fn upgrade(req: Request, io: Rc<Io<F>>, codec: Codec) -> Self {
Control::Upgrade(Upgrade { req, io, codec })
}

Expand Down Expand Up @@ -188,10 +184,34 @@ impl NewRequest {

pub struct Upgrade<F> {
req: Request,
io: Io<F>,
io: Rc<Io<F>>,
codec: Codec,
}

struct RequestIoAccess<F> {
io: Rc<Io<F>>,
codec: Codec,
}

impl<F> fmt::Debug for RequestIoAccess<F> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RequestIoAccess")
.field("io", self.io.as_ref())
.field("codec", &self.codec)
.finish()
}
}

impl<F: Filter> crate::http::message::IoAccess for RequestIoAccess<F> {
fn get(&self) -> Option<&IoRef> {
Some(self.io.as_ref())
}

fn take(&self) -> Option<(IoBoxed, Codec)> {
Some((self.io.take().into(), self.codec.clone()))
}
}

impl<F: Filter> Upgrade<F> {
#[inline]
/// Returns reference to Io
Expand All @@ -215,12 +235,14 @@ impl<F: Filter> Upgrade<F> {
/// Ack upgrade request and continue handling process
pub fn ack(mut self) -> ControlAck {
// Move io into request
let io: IoBoxed = self.io.into();
io.stop_timer();
self.req.head_mut().io = CurrentIo::new(io, self.codec);
let io = Rc::new(RequestIoAccess {
io: self.io,
codec: self.codec,
});
self.req.head_mut().io = CurrentIo::new(io);

ControlAck {
result: ControlResult::PublishUpgrade(self.req),
result: ControlResult::Publish(self.req),
flags: ControlFlags::DISCONNECT,
}
}
Expand All @@ -232,8 +254,9 @@ impl<F: Filter> Upgrade<F> {
H: FnOnce(Request, Io<F>, Codec) -> R + 'static,
R: Future<Output = O>,
{
let io = self.io.take();
let _ = crate::rt::spawn(async move {
let _ = f(self.req, self.io, self.codec).await;
let _ = f(self.req, io, self.codec).await;
});
ControlAck {
result: ControlResult::Stop,
Expand All @@ -248,7 +271,7 @@ impl<F: Filter> Upgrade<F> {
let (res, body) = res.into_parts();

ControlAck {
result: ControlResult::ResponseWithIo(res, body.into(), self.io.into()),
result: ControlResult::Response(res, body.into()),
flags: ControlFlags::DISCONNECT,
}
}
Expand All @@ -259,7 +282,7 @@ impl<F: Filter> Upgrade<F> {
let (res, body) = res.into_parts();

ControlAck {
result: ControlResult::ResponseWithIo(res, body.into(), self.io.into()),
result: ControlResult::Response(res, body.into()),
flags: ControlFlags::DISCONNECT,
}
}
Expand Down
Loading

0 comments on commit 6e5fea7

Please sign in to comment.