Skip to content

Commit

Permalink
fix h1 keep-alive timer; fix h1 payload feed_eof
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed May 3, 2020
1 parent fff6073 commit 34184ba
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 35 deletions.
64 changes: 33 additions & 31 deletions ntex/src/http/h1/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,6 @@ where
upgrade: Option<U::Future>,
}

#[pin_project]
enum CallState<S: Service, X: Service> {
Io,
Expect(#[pin] X::Future),
Service(#[pin] S::Future),
}

impl<S: Service, X: Service> CallState<S, X> {
fn is_io(&self) -> bool {
match self {
CallState::Io => true,
_ => false,
}
}
}

struct InnerDispatcher<T, S, B, X, U>
where
S: Service<Request = Request>,
Expand Down Expand Up @@ -139,6 +123,22 @@ enum PollRead {
HasUpdates,
}

#[pin_project]
enum CallState<S: Service, X: Service> {
Io,
Expect(#[pin] X::Future),
Service(#[pin] S::Future),
}

impl<S: Service, X: Service> CallState<S, X> {
fn is_io(&self) -> bool {
match self {
CallState::Io => true,
_ => false,
}
}
}

enum CallProcess<S: Service, X: Service, U: Service> {
/// next call is available
Next(CallState<S, X>),
Expand Down Expand Up @@ -260,7 +260,10 @@ where
}

// keep-alive book-keeping
this.inner.poll_keepalive(cx, this.call.is_io())?;
this.inner.poll_keepalive(
cx,
this.call.is_io() && this.inner.send_payload.is_none(),
)?;

// shutdown process
if this.inner.flags.contains(Flags::SHUTDOWN) {
Expand Down Expand Up @@ -666,15 +669,14 @@ where
match read(cx, io, buf) {
Poll::Pending => break,
Poll::Ready(Ok(n)) => {
updated = true;
if n == 0 {
trace!(
"Disconnected during read, buffer size {}",
buf.len()
);
self.flags.insert(Flags::DISCONNECT);
break;
} else {
updated = true;
}
}
Poll::Ready(Err(e)) => {
Expand All @@ -691,20 +693,22 @@ where
}
}

if self.read_buf.is_empty() {
let result = if self.read_buf.is_empty() {
Ok(PollRead::NoUpdates)
} else {
let result = self.input_decode();
self.input_decode()
};

// socket is disconnected clear read buf
if self.flags.contains(Flags::DISCONNECT) {
self.read_buf.clear();
if let Some(mut payload) = self.payload.take() {
payload.feed_eof();
}
// socket is disconnected clear read buf
if self.flags.contains(Flags::DISCONNECT) {
self.read_buf.clear();
// decode operation wont run again, so we have to
// stop payload stream
if let Some(mut payload) = self.payload.take() {
payload.feed_eof();
}
result
}
result
}

fn internal_error(&mut self, msg: &'static str) {
Expand Down Expand Up @@ -814,9 +818,7 @@ where
is_empty: bool,
) -> Result<(), DispatchError> {
// do nothing for disconnected or upgrade socket or if keep-alive timer is disabled
if self.flags.intersects(Flags::DISCONNECT | Flags::UPGRADE)
|| self.ka_timer.is_none()
{
if self.flags.contains(Flags::DISCONNECT) || self.ka_timer.is_none() {
return Ok(());
}

Expand Down
15 changes: 13 additions & 2 deletions ntex/src/http/h1/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,24 @@ pub struct PayloadSender {
inner: Weak<RefCell<Inner>>,
}

impl Drop for PayloadSender {
fn drop(&mut self) {
self.set_error(PayloadError::Incomplete(None))
}
}

impl PayloadSender {
pub fn set_error(&mut self, err: PayloadError) {
if let Some(shared) = self.inner.upgrade() {
shared.borrow_mut().set_error(err)
shared.borrow_mut().set_error(err);
self.inner = Weak::new();
}
}

pub fn feed_eof(&mut self) {
if let Some(shared) = self.inner.upgrade() {
shared.borrow_mut().feed_eof()
shared.borrow_mut().feed_eof();
self.inner = Weak::new();
}
}

Expand Down Expand Up @@ -157,6 +165,9 @@ impl Inner {

fn feed_eof(&mut self) {
self.eof = true;
if let Some(task) = self.task.take() {
task.wake()
}
}

fn feed_data(&mut self, data: Bytes) {
Expand Down
4 changes: 2 additions & 2 deletions ntex/src/web/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ where
// ws handshake
let mut res = handshake(req.head())?;

// response body stream
// converter wraper from ws::Message to Bytes
let sink = ws::StreamEncoder::new(tx);

// create ws service
Expand All @@ -81,7 +81,7 @@ where
e
});

// start websockets protocol dispatcher
// start websockets service dispatcher
rt::spawn(crate::util::stream::Dispatcher::new(
// wrap bytes stream to ws::Frame's stream
ws::StreamDecoder::new(payload).map_err(|e| {
Expand Down

0 comments on commit 34184ba

Please sign in to comment.