diff --git a/.bleep b/.bleep index 9840797f..da57c1f8 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -dc97a9520b124eb464f348b0381991d8669c8d8a \ No newline at end of file +8e05a8f5b9d09885e6374011c422678043a2bda0 \ No newline at end of file diff --git a/pingora-proxy/src/proxy_h1.rs b/pingora-proxy/src/proxy_h1.rs index 15ce0387..544a74e6 100644 --- a/pingora-proxy/src/proxy_h1.rs +++ b/pingora-proxy/src/proxy_h1.rs @@ -98,7 +98,7 @@ impl HttpProxy { ); match ret { - Ok((_first, _second)) => (true, true, None), + Ok((downstream_can_reuse, _upstream)) => (downstream_can_reuse, true, None), Err(e) => (false, false, Some(e)), } } @@ -204,13 +204,14 @@ impl HttpProxy { } // todo use this function to replace bidirection_1to2() + // returns whether this server (downstream) session can be reused async fn proxy_handle_downstream( &self, session: &mut Session, tx: mpsc::Sender, mut rx: mpsc::Receiver, ctx: &mut SV::CTX, - ) -> Result<()> + ) -> Result where SV: ProxyHttp + Send + Sync, SV::CTX: Send + Sync, @@ -416,16 +417,19 @@ impl HttpProxy { } } - match session.as_mut().finish_body().await { - Ok(_) => { - debug!("finished sending body to downstream"); - } - Err(e) => { - error!("Error finish sending body to downstream: {}", e); - // TODO: don't do downstream keepalive + let mut reuse_downstream = !downstream_state.is_errored(); + if reuse_downstream { + match session.as_mut().finish_body().await { + Ok(_) => { + debug!("finished sending body to downstream"); + } + Err(e) => { + error!("Error finish sending body to downstream: {}", e); + reuse_downstream = false; + } } } - Ok(()) + Ok(reuse_downstream) } async fn h1_response_filter( diff --git a/pingora-proxy/src/proxy_h2.rs b/pingora-proxy/src/proxy_h2.rs index f133c2fa..53b2c140 100644 --- a/pingora-proxy/src/proxy_h2.rs +++ b/pingora-proxy/src/proxy_h2.rs @@ -176,7 +176,7 @@ impl HttpProxy { ); match ret { - Ok((_first, _second)) => (true, None), + Ok((downstream_can_reuse, _upstream)) => (downstream_can_reuse, None), Err(e) => (false, Some(e)), } } @@ -212,13 +212,14 @@ impl HttpProxy { (server_session_reuse, error) } + // returns whether server (downstream) session can be reused async fn bidirection_1to2( &self, session: &mut Session, client_body: &mut h2::SendStream, mut rx: mpsc::Receiver, ctx: &mut SV::CTX, - ) -> Result<()> + ) -> Result where SV: ProxyHttp + Send + Sync, SV::CTX: Send + Sync, @@ -369,16 +370,19 @@ impl HttpProxy { } } - match session.as_mut().finish_body().await { - Ok(_) => { - debug!("finished sending body to downstream"); - } - Err(e) => { - error!("Error finish sending body to downstream: {}", e); - // TODO: don't do downstream keepalive + let mut reuse_downstream = !downstream_state.is_errored(); + if reuse_downstream { + match session.as_mut().finish_body().await { + Ok(_) => { + debug!("finished sending body to downstream"); + } + Err(e) => { + error!("Error finish sending body to downstream: {}", e); + reuse_downstream = false; + } } } - Ok(()) + Ok(reuse_downstream) } async fn h2_response_filter(