diff --git a/linkerd/app/integration/src/tests/telemetry.rs b/linkerd/app/integration/src/tests/telemetry.rs index 3b466a0a0c..4d3bda9abc 100644 --- a/linkerd/app/integration/src/tests/telemetry.rs +++ b/linkerd/app/integration/src/tests/telemetry.rs @@ -7,6 +7,8 @@ mod env; mod log_stream; mod tcp_errors; +use app_core::Infallible; + use crate::*; use std::io::Read; @@ -33,6 +35,28 @@ struct TcpFixture { } impl Fixture { + async fn http_chunked_server(send_chunks: usize, chunk_size: usize) -> server::Listening { + server::new() + .route_async("/", move |_req| async move { + let chunk: bytes::Bytes = (0..chunk_size).map(|u| u as u8).collect(); + let body = { + let (mut tx, body) = hyper::Body::channel(); + + tokio::spawn(async move { + for nth in 0..send_chunks { + tx.send_data(chunk.clone()) + .await + .expect(&format!("failed to send {nth} chunk")); + } + }); + body + }; + Ok::<_, Infallible>(http::Response::new(body)) + }) + .run() + .await + } + async fn inbound() -> Self { info!("running test server"); Fixture::inbound_with_server(server::new().route("/", "hello").run().await).await @@ -211,6 +235,35 @@ impl TcpFixture { } } +async fn test_http_response_frames_count( + fixture: impl Future, + expected_chunks: usize, +) { + let _trace = trace_init(); + let Fixture { + client, + metrics, + proxy: _proxy, + _profile, + dst_tx: _dst_tx, + pol_out_tx: _pol_out_tx, + labels, + .. + } = fixture.await; + + let metric = labels.metric("response_frames_total"); + + assert!(metric.is_not_in(metrics.get("/metrics").await)); + + info!("client.get(/)"); + client.get("/").await; + + metric + .value(expected_chunks as u64) + .assert_in(&metrics) + .await; +} + #[tokio::test] async fn admin_request_count() { let _trace = trace_init(); @@ -279,6 +332,28 @@ async fn metrics_endpoint_inbound_request_count() { test_http_count("request_total", Fixture::inbound()).await; } +#[tokio::test] +async fn metrics_endpoint_inbound_response_frames_total() { + for chunks in 1..=10 { + test_http_response_frames_count( + Fixture::inbound_with_server(Fixture::http_chunked_server(chunks, 64).await), + chunks + 1, + ) + .await; + } +} + +#[tokio::test] +async fn metrics_endpoint_outbound_response_frames_total() { + for chunks in 1..=10 { + test_http_response_frames_count( + Fixture::outbound_with_server(Fixture::http_chunked_server(chunks, 64).await), + chunks + 1, + ) + .await; + } +} + #[tokio::test] async fn metrics_endpoint_outbound_request_count() { test_http_count("request_total", Fixture::outbound()).await diff --git a/linkerd/http/metrics/src/requests.rs b/linkerd/http/metrics/src/requests.rs index 7a0df94b87..e755ef7fd8 100644 --- a/linkerd/http/metrics/src/requests.rs +++ b/linkerd/http/metrics/src/requests.rs @@ -24,6 +24,7 @@ where { last_update: Instant, total: Counter, + response_frames_total: Counter, by_status: HashMap, StatusMetrics>, } @@ -82,6 +83,7 @@ impl Default for Metrics { Self { last_update: Instant::now(), total: Counter::default(), + response_frames_total: Counter::default(), by_status: HashMap::default(), } } diff --git a/linkerd/http/metrics/src/requests/report.rs b/linkerd/http/metrics/src/requests/report.rs index 9b4b7e5766..ca3dba9125 100644 --- a/linkerd/http/metrics/src/requests/report.rs +++ b/linkerd/http/metrics/src/requests/report.rs @@ -30,6 +30,13 @@ where ) } + fn response_frames_total(&self) -> Metric<'_, Prefixed<'_, &'static str>, Counter> { + Metric::new( + self.prefix_key("response_frames_total"), + "Total number of chunks sent through HTTP as the response.", + ) + } + fn response_latency_ms( &self, ) -> Metric<'_, Prefixed<'_, &'static str>, Histogram> { @@ -128,6 +135,10 @@ where metric.fmt_help(f)?; Self::fmt_by_target(®istry, f, metric, |s| &s.total)?; + let metric = self.response_frames_total(); + metric.fmt_help(f)?; + Self::fmt_by_target(®istry, f, metric, |s| &s.response_frames_total)?; + if self.include_latencies { let metric = self.response_latency_ms(); metric.fmt_help(f)?; diff --git a/linkerd/http/metrics/src/requests/service.rs b/linkerd/http/metrics/src/requests/service.rs index ad1af7ecbc..9e0fe6c4e9 100644 --- a/linkerd/http/metrics/src/requests/service.rs +++ b/linkerd/http/metrics/src/requests/service.rs @@ -333,9 +333,8 @@ where C: ClassifyEos, C::Class: Hash + Eq, { - fn record_latency(self: Pin<&mut Self>) { + fn record_latency(self: Pin<&mut Self>, now: Instant) { let this = self.project(); - let now = Instant::now(); let lock = match this.metrics.as_mut() { Some(lock) => lock, @@ -377,6 +376,11 @@ where } } +fn count_frame(lock: &Mutex>) { + let metrics = lock.lock(); + metrics.response_frames_total.incr(); +} + fn measure_class( lock: &Arc>>, class: C, @@ -415,8 +419,11 @@ where let poll = ready!(self.as_mut().project().inner.poll_data(cx)); let frame = poll.map(|opt| opt.map_err(|e| self.as_mut().measure_err(e.into()))); + if let Some(lock) = self.metrics.as_ref().map(Arc::as_ref) { + count_frame(lock); + } if !(*self.as_mut().project().latency_recorded) { - self.record_latency(); + self.record_latency(Instant::now()); } Poll::Ready(frame) @@ -457,7 +464,7 @@ where { fn drop(mut self: Pin<&mut Self>) { if !self.as_ref().latency_recorded { - self.as_mut().record_latency(); + self.as_mut().record_latency(Instant::now()); } if let Some(c) = self.as_mut().project().classify.take().map(|c| c.eos(None)) {