Skip to content

Commit

Permalink
fix(client): avoid double-polling a Select future (hyperium#3290)
Browse files Browse the repository at this point in the history
This reworks http2 client connection task in order to avoid storing
a `Select` future. Under some scheduling cases the future was polled
multiple times, resulting in runtime panics:

```
thread 'main' panicked at 'cannot poll Select twice', /index.crates.io/futures-util-0.3.28/src/future/select.rs:112:42
stack backtrace:
[...]
   5: <futures_util::future::select::Select<A,B> as core::future::future::Future>::poll
   6: <hyper::proto::h2::client::H2ClientFuture<B,T> as core::future::future::Future>::poll
[...]
```

Closes hyperium#3289
  • Loading branch information
lucab authored and 0xE282B0 committed Jan 12, 2024
1 parent 9405c40 commit 4e929bc
Showing 1 changed file with 49 additions and 28 deletions.
77 changes: 49 additions & 28 deletions src/proto/h2/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::rt::{Read, Write};
use bytes::Bytes;
use futures_channel::mpsc::{Receiver, Sender};
use futures_channel::{mpsc, oneshot};
use futures_util::future::{self, Either, FutureExt as _, Select};
use futures_util::future::{Either, FusedFuture, FutureExt as _};
use futures_util::stream::{StreamExt as _, StreamFuture};
use h2::client::{Builder, Connection, SendRequest};
use h2::SendStream;
Expand Down Expand Up @@ -143,7 +143,10 @@ where
} else {
(Either::Right(conn), ping::disabled())
};
let conn: ConnMapErr<T, B> = ConnMapErr { conn };
let conn: ConnMapErr<T, B> = ConnMapErr {
conn,
is_terminated: false,
};

exec.execute_h2_future(H2ClientFuture::Task {
task: ConnTask::new(conn, conn_drop_rx, cancel_tx),
Expand Down Expand Up @@ -218,6 +221,8 @@ pin_project! {
{
#[pin]
conn: Either<Conn<T, B>, Connection<Compat<T>, SendBuf<<B as Body>::Data>>>,
#[pin]
is_terminated: bool,
}
}

Expand All @@ -229,10 +234,26 @@ where
type Output = Result<(), ()>;

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
self.project()
.conn
.poll(cx)
.map_err(|e| debug!("connection error: {}", e))
let mut this = self.project();

if *this.is_terminated {
return Poll::Pending;
}
let polled = this.conn.poll(cx);
if polled.is_ready() {
*this.is_terminated = true;
}
polled.map_err(|e| debug!("connection error: {}", e))
}
}

impl<T, B> FusedFuture for ConnMapErr<T, B>
where
B: Body,
T: Read + Write + Unpin,
{
fn is_terminated(&self) -> bool {
self.is_terminated
}
}

Expand All @@ -245,10 +266,11 @@ pin_project! {
T: Unpin,
{
#[pin]
select: Select<ConnMapErr<T, B>, StreamFuture<Receiver<Never>>>,
drop_rx: StreamFuture<Receiver<Never>>,
#[pin]
cancel_tx: Option<oneshot::Sender<Never>>,
conn: Option<ConnMapErr<T, B>>,
#[pin]
conn: ConnMapErr<T, B>,
}
}

Expand All @@ -263,9 +285,9 @@ where
cancel_tx: oneshot::Sender<Never>,
) -> Self {
Self {
select: future::select(conn, drop_rx),
drop_rx,
cancel_tx: Some(cancel_tx),
conn: None,
conn,
}
}
}
Expand All @@ -280,25 +302,24 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

if let Some(conn) = this.conn {
conn.poll_unpin(cx).map(|_| ())
} else {
match ready!(this.select.poll_unpin(cx)) {
Either::Left((_, _)) => {
// ok or err, the `conn` has finished
return Poll::Ready(());
}
Either::Right((_, b)) => {
// mpsc has been dropped, hopefully polling
// the connection some more should start shutdown
// and then close
trace!("send_request dropped, starting conn shutdown");
drop(this.cancel_tx.take().expect("Future polled twice"));
this.conn = &mut Some(b);
return Poll::Pending;
}
}
if !this.conn.is_terminated() {
if let Poll::Ready(_) = this.conn.poll_unpin(cx) {
// ok or err, the `conn` has finished.
return Poll::Ready(());
};
}

if !this.drop_rx.is_terminated() {
if let Poll::Ready(_) = this.drop_rx.poll_unpin(cx) {
// mpsc has been dropped, hopefully polling
// the connection some more should start shutdown
// and then close.
trace!("send_request dropped, starting conn shutdown");
drop(this.cancel_tx.take().expect("ConnTask Future polled twice"));
}
};

Poll::Pending
}
}

Expand Down

0 comments on commit 4e929bc

Please sign in to comment.