diff --git a/ntex/src/http/h1/dispatcher.rs b/ntex/src/http/h1/dispatcher.rs index 7fe766abc..b7a80d283 100644 --- a/ntex/src/http/h1/dispatcher.rs +++ b/ntex/src/http/h1/dispatcher.rs @@ -71,22 +71,6 @@ where upgrade: Option, } -#[pin_project] -enum CallState { - Io, - Expect(#[pin] X::Future), - Service(#[pin] S::Future), -} - -impl CallState { - fn is_io(&self) -> bool { - match self { - CallState::Io => true, - _ => false, - } - } -} - struct InnerDispatcher where S: Service, @@ -139,6 +123,22 @@ enum PollRead { HasUpdates, } +#[pin_project] +enum CallState { + Io, + Expect(#[pin] X::Future), + Service(#[pin] S::Future), +} + +impl CallState { + fn is_io(&self) -> bool { + match self { + CallState::Io => true, + _ => false, + } + } +} + enum CallProcess { /// next call is available Next(CallState), @@ -260,7 +260,10 @@ where } // keep-alive book-keeping - this.inner.poll_keepalive(cx, this.call.is_io())?; + this.inner.poll_keepalive( + cx, + this.call.is_io() && this.inner.send_payload.is_none(), + )?; // shutdown process if this.inner.flags.contains(Flags::SHUTDOWN) { @@ -666,6 +669,7 @@ where match read(cx, io, buf) { Poll::Pending => break, Poll::Ready(Ok(n)) => { + updated = true; if n == 0 { trace!( "Disconnected during read, buffer size {}", @@ -673,8 +677,6 @@ where ); self.flags.insert(Flags::DISCONNECT); break; - } else { - updated = true; } } Poll::Ready(Err(e)) => { @@ -691,20 +693,22 @@ where } } - if self.read_buf.is_empty() { + let result = if self.read_buf.is_empty() { Ok(PollRead::NoUpdates) } else { - let result = self.input_decode(); + self.input_decode() + }; - // socket is disconnected clear read buf - if self.flags.contains(Flags::DISCONNECT) { - self.read_buf.clear(); - if let Some(mut payload) = self.payload.take() { - payload.feed_eof(); - } + // socket is disconnected clear read buf + if self.flags.contains(Flags::DISCONNECT) { + self.read_buf.clear(); + // decode operation wont run again, so we have to + // stop payload stream + if let Some(mut payload) = self.payload.take() { + payload.feed_eof(); } - result } + result } fn internal_error(&mut self, msg: &'static str) { @@ -814,9 +818,7 @@ where is_empty: bool, ) -> Result<(), DispatchError> { // do nothing for disconnected or upgrade socket or if keep-alive timer is disabled - if self.flags.intersects(Flags::DISCONNECT | Flags::UPGRADE) - || self.ka_timer.is_none() - { + if self.flags.contains(Flags::DISCONNECT) || self.ka_timer.is_none() { return Ok(()); } diff --git a/ntex/src/http/h1/payload.rs b/ntex/src/http/h1/payload.rs index 39ee4178f..424578800 100644 --- a/ntex/src/http/h1/payload.rs +++ b/ntex/src/http/h1/payload.rs @@ -92,16 +92,24 @@ pub struct PayloadSender { inner: Weak>, } +impl Drop for PayloadSender { + fn drop(&mut self) { + self.set_error(PayloadError::Incomplete(None)) + } +} + impl PayloadSender { pub fn set_error(&mut self, err: PayloadError) { if let Some(shared) = self.inner.upgrade() { - shared.borrow_mut().set_error(err) + shared.borrow_mut().set_error(err); + self.inner = Weak::new(); } } pub fn feed_eof(&mut self) { if let Some(shared) = self.inner.upgrade() { - shared.borrow_mut().feed_eof() + shared.borrow_mut().feed_eof(); + self.inner = Weak::new(); } } @@ -157,6 +165,9 @@ impl Inner { fn feed_eof(&mut self) { self.eof = true; + if let Some(task) = self.task.take() { + task.wake() + } } fn feed_data(&mut self, data: Bytes) { diff --git a/ntex/src/web/ws.rs b/ntex/src/web/ws.rs index 5846eb53b..250f3c783 100644 --- a/ntex/src/web/ws.rs +++ b/ntex/src/web/ws.rs @@ -68,7 +68,7 @@ where // ws handshake let mut res = handshake(req.head())?; - // response body stream + // converter wraper from ws::Message to Bytes let sink = ws::StreamEncoder::new(tx); // create ws service @@ -81,7 +81,7 @@ where e }); - // start websockets protocol dispatcher + // start websockets service dispatcher rt::spawn(crate::util::stream::Dispatcher::new( // wrap bytes stream to ws::Frame's stream ws::StreamDecoder::new(payload).map_err(|e| {