diff --git a/tests/client.rs b/tests/client.rs index 43e1f08acb..80e77d589c 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -21,6 +21,7 @@ use futures_channel::oneshot; use futures_util::future::{self, FutureExt, TryFuture, TryFutureExt}; use support::TokioIo; use tokio::net::TcpStream; + mod support; fn s(buf: &[u8]) -> &str { @@ -2009,6 +2010,74 @@ mod conn { .expect_err("client should be closed"); } + #[tokio::test] + async fn http2_connect_detect_close() { + // Regression test for failure to fully close connections when using HTTP2 CONNECT + // We send a request, read/write some data, then drop the client connection. + use futures_util::future; + let (listener, addr) = setup_tk_test_server().await; + let (tx, rx) = oneshot::channel::<()>(); + const BODY: &[u8] = b"hello world"; + tokio::task::spawn(async move { + use hyper::server::conn::http2; + use hyper::service::service_fn; + + let res = listener.accept().await; + let (stream, _) = res.unwrap(); + let stream = TokioIo::new(stream); + + let service = service_fn(|req: Request| { + tokio::task::spawn(async move { + let io = &mut TokioIo::new(hyper::upgrade::on(req).await.unwrap()); + let mut buf: [u8; BODY.len()] = [0; BODY.len()]; + io.read_exact(&mut buf).await.unwrap(); + io.write_all(BODY).await.unwrap(); + }); + + future::ok::<_, hyper::Error>(Response::new(Empty::::new())) + }); + + tokio::task::spawn(async move { + let conn = http2::Builder::new(TokioExecutor).serve_connection(stream, service); + conn.await.unwrap(); + drop(tx); + }); + }); + + let io = tcp_connect(&addr).await.expect("tcp connect"); + let (mut client, conn) = conn::http2::Builder::new(TokioExecutor) + .handshake(io) + .await + .expect("http handshake"); + + tokio::task::spawn(async move { + conn.await.expect("client conn"); + }); + + // Sanity check that client is ready + future::poll_fn(|ctx| client.poll_ready(ctx)) + .await + .expect("client poll ready sanity"); + + let req = Request::builder() + .method(Method::CONNECT) + .uri(format!("{}", addr)) + .body(Empty::::new()) + .expect("request builder"); + + let resp = client.send_request(req).await.expect("req1 send"); + assert_eq!(resp.status(), 200); + let io = &mut TokioIo::new(hyper::upgrade::on(resp).await.unwrap()); + + let mut buf: [u8; BODY.len()] = [0; BODY.len()]; + io.write_all(BODY).await.unwrap(); + io.read_exact(&mut buf).await.unwrap(); + drop(client); + let _ = tokio::time::timeout(Duration::from_secs(1), rx) + .await + .unwrap(); + } + #[tokio::test] async fn http2_keep_alive_detects_unresponsive_server() { let (listener, addr) = setup_tk_test_server().await;