From 716cbf2d8aa619d024138c2b71b3df92d002f961 Mon Sep 17 00:00:00 2001 From: Ermal Kaleci Date: Thu, 16 Nov 2023 20:50:48 +0100 Subject: [PATCH] fix tracing --- Cargo.lock | 100 ++++++------ Cargo.toml | 8 +- config.yml | 2 +- src/extensions/telemetry/mod.rs | 7 +- src/middlewares/methods/block_tag.rs | 11 +- src/middlewares/methods/cache.rs | 8 +- src/middlewares/methods/delay.rs | 11 +- src/middlewares/methods/inject_params.rs | 45 +++--- src/middlewares/methods/response.rs | 3 +- src/middlewares/methods/upstream.rs | 8 +- src/middlewares/mod.rs | 33 +++- .../subscriptions/merge_subscription.rs | 143 +++++++++--------- src/middlewares/subscriptions/upstream.rs | 119 ++++++++------- src/server.rs | 75 ++++----- 14 files changed, 319 insertions(+), 254 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 48ce690..6b8b6db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -164,7 +164,7 @@ dependencies = [ "log", "parking", "polling", - "rustix 0.37.23", + "rustix 0.37.27", "slab", "socket2 0.4.9", "waker-fn", @@ -584,9 +584,9 @@ checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" [[package]] name = "concurrent-queue" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c" +checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400" dependencies = [ "crossbeam-utils", ] @@ -684,7 +684,7 @@ dependencies = [ "criterion-plot", "futures", "is-terminal", - "itertools", + "itertools 0.10.5", "num-traits", "once_cell", "oorandom", @@ -706,7 +706,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" dependencies = [ "cast", - "itertools", + "itertools 0.10.5", ] [[package]] @@ -1509,6 +1509,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.9" @@ -2093,28 +2102,35 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "opentelemetry" -version = "0.20.0" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9591d937bc0e6d2feb6f71a559540ab300ea49955229c347a517a28d27784c54" +checksum = "1e32339a5dc40459130b3bd269e9892439f55b33e772d2a9d402a789baaf4e8a" dependencies = [ - "opentelemetry_api", - "opentelemetry_sdk", + "futures-core", + "futures-sink", + "indexmap 2.0.0", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", + "urlencoding", ] [[package]] name = "opentelemetry-datadog" -version = "0.8.0" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5f4ecf595095d3b641dd2761a0c3d1f175d3d6c28f38e65418d8004ea3255dd" +checksum = "3e09667367cb509f10d7cf5960a83f9c4d96e93715f750b164b4b98d46c3cbf4" dependencies = [ "futures-core", "http", - "indexmap 1.9.3", - "itertools", + "indexmap 2.0.0", + "itertools 0.11.0", "once_cell", "opentelemetry", "opentelemetry-http", "opentelemetry-semantic-conventions", + "opentelemetry_sdk", "reqwest", "rmp", "thiserror", @@ -2123,62 +2139,47 @@ dependencies = [ [[package]] name = "opentelemetry-http" -version = "0.9.0" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7594ec0e11d8e33faf03530a4c49af7064ebba81c1480e01be67d90b356508b" +checksum = "7f51189ce8be654f9b5f7e70e49967ed894e84a06fc35c6c042e64ac1fc5399e" dependencies = [ "async-trait", "bytes 1.5.0", "http", - "opentelemetry_api", + "opentelemetry", "reqwest", ] [[package]] name = "opentelemetry-jaeger" -version = "0.19.0" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "876958ba9084f390f913fcf04ddf7bbbb822898867bb0a51cc28f2b9e5c1b515" +checksum = "e617c66fd588e40e0dbbd66932fdc87393095b125d4459b1a3a10feb1712f8a1" dependencies = [ "async-trait", "futures-core", "futures-util", "opentelemetry", "opentelemetry-semantic-conventions", + "opentelemetry_sdk", "thrift", "tokio", ] [[package]] name = "opentelemetry-semantic-conventions" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73c9f9340ad135068800e7f1b24e9e09ed9e7143f5bf8518ded3d3ec69789269" +checksum = "f5774f1ef1f982ef2a447f6ee04ec383981a3ab99c8e77a1a7b30182e65bbc84" dependencies = [ "opentelemetry", ] -[[package]] -name = "opentelemetry_api" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a81f725323db1b1206ca3da8bb19874bbd3f57c3bcd59471bfb04525b265b9b" -dependencies = [ - "futures-channel", - "futures-util", - "indexmap 1.9.3", - "js-sys", - "once_cell", - "pin-project-lite", - "thiserror", - "urlencoding", -] - [[package]] name = "opentelemetry_sdk" -version = "0.20.0" +version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa8e705a0612d48139799fcbaba0d4a90f06277153e43dd2bdc16c6f0edd8026" +checksum = "968ba3f2ca03e90e5187f5e4f46c791ef7f2c163ae87789c8ce5f5ca3b7b7de5" dependencies = [ "async-trait", "crossbeam-channel", @@ -2186,8 +2187,8 @@ dependencies = [ "futures-executor", "futures-util", "once_cell", - "opentelemetry_api", - "ordered-float 3.9.1", + "opentelemetry", + "ordered-float 4.1.1", "percent-encoding", "rand 0.8.5", "thiserror", @@ -2206,9 +2207,9 @@ dependencies = [ [[package]] name = "ordered-float" -version = "3.9.1" +version = "4.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a54938017eacd63036332b4ae5c8a49fc8c0c1d6d629893057e4f13609edd06" +checksum = "536900a8093134cf9ccf00a27deb3532421099e958d9dd431135d0c7543ca1e8" dependencies = [ "num-traits", ] @@ -2239,9 +2240,9 @@ dependencies = [ [[package]] name = "parking" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e" +checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" [[package]] name = "parking_lot" @@ -2449,7 +2450,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "265baba7fabd416cf5078179f7d2cbeca4ce7a9041111900675ea7c4cb8a4c32" dependencies = [ "anyhow", - "itertools", + "itertools 0.11.0", "proc-macro2", "quote", "syn 2.0.37", @@ -2769,9 +2770,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.37.23" +version = "0.37.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d69718bf81c6127a49dc64e44a742e8bb9213c0ff8869a22c308f84c1d4ab06" +checksum = "fea8ca367a3a01fe35e6943c400addf443c0f57670e6ec51196f71a4b8762dd2" dependencies = [ "bitflags 1.3.2", "errno", @@ -3151,6 +3152,7 @@ dependencies = [ "opentelemetry", "opentelemetry-datadog", "opentelemetry-jaeger", + "opentelemetry_sdk", "pprof", "rand 0.8.5", "serde", @@ -3691,9 +3693,9 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "waker-fn" -version = "1.1.0" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" +checksum = "f3c4517f54858c779bbcbf228f4fca63d121bf85fbecb2dc578cdf4a39395690" [[package]] name = "walkdir" diff --git a/Cargo.toml b/Cargo.toml index 3cf5013..9d6c0fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,9 +27,11 @@ http = "0.2.8" hyper = "0.14.23" log = "0.4.17" moka = { version = "0.11.0", features = ["future"] } -opentelemetry = { version = "0.20.0", features = ["rt-tokio"] } -opentelemetry-datadog = { version = "0.8.0", features = ["reqwest-client"] } -opentelemetry-jaeger = { version = "0.19.0", features = ["rt-tokio"] } +opentelemetry = { version = "0.21.0" } +opentelemetry-datadog = { version = "0.9.0", features = ["reqwest-client"] } +opentelemetry-jaeger = { version = "0.20.0", features = ["rt-tokio"] } +opentelemetry_sdk = { version = "0.21.1", features = ["rt-tokio", "trace"] } + rand = "0.8.5" serde = "1.0.152" serde_json = "1.0.92" diff --git a/config.yml b/config.yml index eb1629f..395578c 100644 --- a/config.yml +++ b/config.yml @@ -7,7 +7,7 @@ extensions: substrate_api: stale_timeout_seconds: 180 # rotate endpoint if no new blocks for 3 minutes telemetry: - provider: none + provider: jaeger cache: default_ttl_seconds: 60 default_size: 500 diff --git a/src/extensions/telemetry/mod.rs b/src/extensions/telemetry/mod.rs index 5b425bf..4426deb 100644 --- a/src/extensions/telemetry/mod.rs +++ b/src/extensions/telemetry/mod.rs @@ -1,7 +1,8 @@ use std::env; use async_trait::async_trait; -use opentelemetry::{global, sdk::trace::Tracer, trace::TraceError}; +use opentelemetry::{global, trace::TraceError}; +use opentelemetry_sdk::trace::Tracer; use serde::Deserialize; use super::{Extension, ExtensionRegistry}; @@ -64,7 +65,7 @@ pub fn setup_telemetry(options: &TelemetryConfig) -> Result, Trac tracer = tracer.with_endpoint(agent_endpoint.clone()); } - let tracer = tracer.install_batch(opentelemetry::runtime::Tokio)?; + let tracer = tracer.install_batch(opentelemetry_sdk::runtime::Tokio)?; Some(tracer) } @@ -80,7 +81,7 @@ pub fn setup_telemetry(options: &TelemetryConfig) -> Result, Trac tracer = tracer.with_agent_endpoint(agent_endpoint); } - let tracer = tracer.install_batch(opentelemetry::runtime::Tokio)?; + let tracer = tracer.install_batch(opentelemetry_sdk::runtime::Tokio)?; Some(tracer) } diff --git a/src/middlewares/methods/block_tag.rs b/src/middlewares/methods/block_tag.rs index 6011e6c..a3f553d 100644 --- a/src/middlewares/methods/block_tag.rs +++ b/src/middlewares/methods/block_tag.rs @@ -2,10 +2,11 @@ use std::sync::Arc; use async_trait::async_trait; use jsonrpsee::{core::JsonValue, types::ErrorObjectOwned}; +use opentelemetry::trace::FutureExt; use crate::{ extensions::api::EthApi, - middlewares::{CallRequest, CallResult, Middleware, MiddlewareBuilder, NextFn, RpcMethod}, + middlewares::{CallRequest, CallResult, Middleware, MiddlewareBuilder, NextFn, RpcMethod, TRACER}, utils::{TypeRegistry, TypeRegistryRef}, }; @@ -97,8 +98,12 @@ impl Middleware> for BlockTagMi context: TypeRegistry, next: NextFn>, ) -> Result { - let (request, context) = self.replace(request, context).await; - next(request, context).await + async move { + let (request, context) = self.replace(request, context).await; + next(request, context).await + } + .with_context(TRACER.context("block_tag")) + .await } } diff --git a/src/middlewares/methods/cache.rs b/src/middlewares/methods/cache.rs index 663898a..ca14b08 100644 --- a/src/middlewares/methods/cache.rs +++ b/src/middlewares/methods/cache.rs @@ -9,8 +9,8 @@ use opentelemetry::trace::FutureExt; use crate::{ config::CacheParams, extensions::cache::Cache as CacheExtension, - middlewares::{CallRequest, CallResult, Middleware, MiddlewareBuilder, NextFn, RpcMethod}, - utils::{telemetry, Cache, CacheKey, TypeRegistry, TypeRegistryRef}, + middlewares::{CallRequest, CallResult, Middleware, MiddlewareBuilder, NextFn, RpcMethod, TRACER}, + utils::{Cache, CacheKey, TypeRegistry, TypeRegistryRef}, }; pub struct BypassCache(pub bool); @@ -62,8 +62,6 @@ impl MiddlewareBuilder for CacheMiddleware { } } -const TRACER: telemetry::Tracer = telemetry::Tracer::new("cache-middleware"); - #[async_trait] impl Middleware> for CacheMiddleware { async fn call( @@ -95,7 +93,7 @@ impl Middleware> for CacheMiddl result } - .with_context(TRACER.context("call")) + .with_context(TRACER.context("cache")) .await } } diff --git a/src/middlewares/methods/delay.rs b/src/middlewares/methods/delay.rs index 0faa1e1..ae07888 100644 --- a/src/middlewares/methods/delay.rs +++ b/src/middlewares/methods/delay.rs @@ -1,9 +1,10 @@ use std::time::Duration; use async_trait::async_trait; +use opentelemetry::trace::FutureExt; use crate::{ - middlewares::{CallRequest, CallResult, Middleware, MiddlewareBuilder, NextFn, RpcMethod}, + middlewares::{CallRequest, CallResult, Middleware, MiddlewareBuilder, NextFn, RpcMethod, TRACER}, utils::{TypeRegistry, TypeRegistryRef}, }; @@ -39,7 +40,11 @@ impl Middleware for DelayMiddleware { context: TypeRegistry, next: NextFn, ) -> CallResult { - tokio::time::sleep(self.delay).await; - next(request, context).await + async move { + tokio::time::sleep(self.delay).await; + next(request, context).await + } + .with_context(TRACER.context("delay")) + .await } } diff --git a/src/middlewares/methods/inject_params.rs b/src/middlewares/methods/inject_params.rs index f43229b..94cf207 100644 --- a/src/middlewares/methods/inject_params.rs +++ b/src/middlewares/methods/inject_params.rs @@ -1,11 +1,12 @@ use async_trait::async_trait; use jsonrpsee::{core::JsonValue, types::ErrorObjectOwned}; +use opentelemetry::trace::FutureExt; use std::sync::Arc; use crate::{ config::MethodParam, extensions::api::{SubstrateApi, ValueHandle}, - middlewares::{CallRequest, CallResult, Middleware, MiddlewareBuilder, NextFn, RpcMethod}, + middlewares::{CallRequest, CallResult, Middleware, MiddlewareBuilder, NextFn, RpcMethod, TRACER}, utils::errors, utils::{TypeRegistry, TypeRegistryRef}, }; @@ -108,27 +109,31 @@ impl Middleware> for InjectPara return next(request, context).await; } len if len <= idx => { - // without current block - let to_inject = self.get_parameter().await; - tracing::trace!("Injected param {} to method {}", &to_inject, request.method); - let params_passed = request.params.len(); - while request.params.len() < idx { - let current = request.params.len(); - if self.params[current].optional { - request.params.push(JsonValue::Null); - } else { - let (required, optional) = self.params_count(); - return Err(errors::invalid_params(format!( - "Expected {:?} parameters ({:?} optional), {:?} found instead", - required + optional, - optional, - params_passed - ))); + async move { + // without current block + let to_inject = self.get_parameter().await; + tracing::trace!("Injected param {} to method {}", &to_inject, request.method); + let params_passed = request.params.len(); + while request.params.len() < idx { + let current = request.params.len(); + if self.params[current].optional { + request.params.push(JsonValue::Null); + } else { + let (required, optional) = self.params_count(); + return Err(errors::invalid_params(format!( + "Expected {:?} parameters ({:?} optional), {:?} found instead", + required + optional, + optional, + params_passed + ))); + } } - } - request.params.push(to_inject); + request.params.push(to_inject); - return next(request, context).await; + next(request, context).await + } + .with_context(TRACER.context("inject_params")) + .await } _ => { // unexpected number of params diff --git a/src/middlewares/methods/response.rs b/src/middlewares/methods/response.rs index 9651117..304d919 100644 --- a/src/middlewares/methods/response.rs +++ b/src/middlewares/methods/response.rs @@ -2,7 +2,7 @@ use async_trait::async_trait; use jsonrpsee::core::JsonValue; use crate::{ - middlewares::{CallRequest, CallResult, Middleware, MiddlewareBuilder, NextFn, RpcMethod}, + middlewares::{CallRequest, CallResult, Middleware, MiddlewareBuilder, NextFn, RpcMethod, TRACER}, utils::{TypeRegistry, TypeRegistryRef}, }; @@ -37,6 +37,7 @@ impl Middleware for ResponseMiddleware { _context: TypeRegistry, _next: NextFn, ) -> CallResult { + let _span = TRACER.context("response"); Ok(self.resp.clone()) } } diff --git a/src/middlewares/methods/upstream.rs b/src/middlewares/methods/upstream.rs index 7b73895..da819b2 100644 --- a/src/middlewares/methods/upstream.rs +++ b/src/middlewares/methods/upstream.rs @@ -2,10 +2,11 @@ use std::sync::Arc; use async_trait::async_trait; use jsonrpsee::{core::JsonValue, types::ErrorObjectOwned}; +use opentelemetry::trace::FutureExt; use crate::{ extensions::client::Client, - middlewares::{CallRequest, CallResult, Middleware, MiddlewareBuilder, NextFn, RpcMethod}, + middlewares::{CallRequest, CallResult, Middleware, MiddlewareBuilder, NextFn, RpcMethod, TRACER}, utils::{TypeRegistry, TypeRegistryRef}, }; @@ -42,6 +43,9 @@ impl Middleware> for UpstreamMi _context: TypeRegistry, _next: NextFn>, ) -> Result { - self.client.request(&request.method, request.params).await + self.client + .request(&request.method, request.params) + .with_context(TRACER.context("upstream")) + .await } } diff --git a/src/middlewares/mod.rs b/src/middlewares/mod.rs index 62b76e3..93d172f 100644 --- a/src/middlewares/mod.rs +++ b/src/middlewares/mod.rs @@ -5,6 +5,7 @@ use jsonrpsee::{ types::ErrorObjectOwned, PendingSubscriptionSink, }; +use opentelemetry::trace::FutureExt as _; use std::{ fmt::{Debug, Formatter}, sync::Arc, @@ -12,18 +13,18 @@ use std::{ use crate::{ config::{RpcMethod, RpcSubscription}, - utils::{TypeRegistry, TypeRegistryRef}, + utils::{telemetry, TypeRegistry, TypeRegistryRef}, }; pub mod factory; pub mod methods; pub mod subscriptions; -#[derive(Debug)] /// Represents a RPC request made to a middleware function. pub struct CallRequest { pub method: String, pub params: Vec, + pub tracer: Option, } impl CallRequest { @@ -31,8 +32,23 @@ impl CallRequest { Self { method: method.to_string(), params, + tracer: None, } } + + pub fn with_tracer(mut self, tracer: opentelemetry::global::BoxedTracer) -> Self { + self.tracer = Some(tracer); + self + } +} + +impl Debug for CallRequest { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CallRequest") + .field("method", &self.method) + .field("params", &self.params) + .finish() + } } /// Alias for the result of a method request. @@ -92,6 +108,8 @@ impl Clone for Middlewares { } } +const TRACER: telemetry::Tracer = telemetry::Tracer::new("middlewares"); + impl Middlewares { /// Creates a new middleware instance with the given middlewares and fallback function. /// @@ -134,10 +152,13 @@ impl Middlewares> for MergeSubscript _context: TypeRegistry, _next: NextFn>, ) -> Result<(), StringError> { - let key = CacheKey::new(&request.subscribe, &request.params); - - let SubscriptionRequest { - subscribe, - params, - unsubscribe, - pending_sink, - } = request; - - let subscribe = match self - .get_upstream_subscription(key.clone(), subscribe, params.to_owned(), unsubscribe) - .await - { - Ok(subscribe) => subscribe, - Err(err) => { - pending_sink.reject(errors::map_error(err)).await; - return Ok(()); - } - }; - - // accept pending subscription - let sink = match pending_sink.accept().await { - Ok(sink) => sink, - Err(e) => { - tracing::trace!("Failed to accept pending subscription {e:?}"); - return Ok(()); - } - }; + async move { + let key = CacheKey::new(&request.subscribe, &request.params); + + let SubscriptionRequest { + subscribe, + params, + unsubscribe, + pending_sink, + } = request; + + let subscribe = match self + .get_upstream_subscription(key.clone(), subscribe, params.to_owned(), unsubscribe) + .await + { + Ok(subscribe) => subscribe, + Err(err) => { + pending_sink.reject(errors::map_error(err)).await; + return Ok(()); + } + }; + + // accept pending subscription + let sink = match pending_sink.accept().await { + Ok(sink) => sink, + Err(e) => { + tracing::trace!("Failed to accept pending subscription {e:?}"); + return Ok(()); + } + }; - let current_values = self.current_values.clone(); + let current_values = self.current_values.clone(); - // send any current value and broadcast new values - tokio::spawn(async move { - // read lock before subscribing to make sure we don't miss any value - let read_lock = current_values.read().await; - let mut stream = subscribe(); - - // send current value if any - if let Some(current_value) = read_lock - .get(&key) - .map(|x| SubscriptionMessage::from_json(&x).ok()) - .unwrap_or(None) - { - if let Err(e) = sink.send(current_value).await { - tracing::trace!("subscription sink closed {e:?}"); - return; + // send any current value and broadcast new values + tokio::spawn(async move { + // read lock before subscribing to make sure we don't miss any value + let read_lock = current_values.read().await; + let mut stream = subscribe(); + + // send current value if any + if let Some(current_value) = read_lock + .get(&key) + .map(|x| SubscriptionMessage::from_json(&x).ok()) + .unwrap_or(None) + { + if let Err(e) = sink.send(current_value).await { + tracing::trace!("subscription sink closed {e:?}"); + return; + } } - } - drop(read_lock); - - loop { - tokio::select! { - resp = stream.recv() => { - match resp { - Ok(new_value) => { - if let Err(e) = sink.send(new_value).await { - tracing::trace!("subscription sink closed {e:?}"); - break; + drop(read_lock); + + loop { + tokio::select! { + resp = stream.recv() => { + match resp { + Ok(new_value) => { + if let Err(e) = sink.send(new_value).await { + tracing::trace!("subscription sink closed {e:?}"); + break; + } + } + Err(e) => { + // this should never happen + tracing::error!("subscription stream error {e:?}"); + unreachable!("subscription stream error {e:?}"); } - } - Err(e) => { - // this should never happen - tracing::error!("subscription stream error {e:?}"); - unreachable!("subscription stream error {e:?}"); } } - } - _ = sink.closed() => { - tracing::trace!("subscription sink closed"); - break; + _ = sink.closed() => { + tracing::trace!("subscription sink closed"); + break; + } } } - } - }); + }); - Ok(()) + Ok(()) + } + .with_context(TRACER.context("merge_subscription")) + .await } } diff --git a/src/middlewares/subscriptions/upstream.rs b/src/middlewares/subscriptions/upstream.rs index 378d777..7145bb0 100644 --- a/src/middlewares/subscriptions/upstream.rs +++ b/src/middlewares/subscriptions/upstream.rs @@ -2,10 +2,13 @@ use std::sync::Arc; use async_trait::async_trait; use jsonrpsee::SubscriptionMessage; +use opentelemetry::trace::FutureExt; use crate::{ extensions::client::Client, - middlewares::{Middleware, MiddlewareBuilder, NextFn, RpcSubscription, SubscriptionRequest, SubscriptionResult}, + middlewares::{ + Middleware, MiddlewareBuilder, NextFn, RpcSubscription, SubscriptionRequest, SubscriptionResult, TRACER, + }, utils::{errors, TypeRegistry, TypeRegistryRef}, }; @@ -42,71 +45,77 @@ impl Middleware for UpstreamMiddleware _context: TypeRegistry, _next: NextFn, ) -> SubscriptionResult { - let SubscriptionRequest { - subscribe, - params, - unsubscribe, - pending_sink, - } = request; + async move { + let SubscriptionRequest { + subscribe, + params, + unsubscribe, + pending_sink, + } = request; - let result = self.client.subscribe(&subscribe, params, &unsubscribe).await; + let result = self.client.subscribe(&subscribe, params, &unsubscribe).await; - let (mut subscription, sink) = match result { - // subscription was successful, accept the sink - Ok(sub) => match pending_sink.accept().await { - Ok(sink) => (sub, sink), - Err(e) => { - tracing::trace!("Failed to accept pending subscription {:?}", e); - // sink was closed before we could accept it, unsubscribe remote upstream - if let Err(err) = sub.unsubscribe().await { - tracing::error!("Failed to unsubscribe: {}", err); + let (mut subscription, sink) = match result { + // subscription was successful, accept the sink + Ok(sub) => match pending_sink.accept().await { + Ok(sink) => (sub, sink), + Err(e) => { + tracing::trace!("Failed to accept pending subscription {:?}", e); + // sink was closed before we could accept it, unsubscribe remote upstream + if let Err(err) = sub.unsubscribe().await { + tracing::error!("Failed to unsubscribe: {}", err); + } + return Ok(()); } + }, + // subscription failed, reject the sink + Err(e) => { + pending_sink.reject(errors::map_error(e)).await; return Ok(()); } - }, - // subscription failed, reject the sink - Err(e) => { - pending_sink.reject(errors::map_error(e)).await; - return Ok(()); - } - }; + }; - loop { - tokio::select! { - msg = subscription.next() => { - match msg { - Some(resp) => { - let resp = match resp { - Ok(resp) => resp, - Err(e) => { - tracing::error!("Subscription error: {}", e); - continue; - } - }; - let resp = match SubscriptionMessage::from_json(&resp) { - Ok(resp) => resp, - Err(e) => { - tracing::error!("Failed to serialize subscription response: {}", e); - continue; + tokio::spawn(async move { + loop { + tokio::select! { + msg = subscription.next() => { + match msg { + Some(resp) => { + let resp = match resp { + Ok(resp) => resp, + Err(e) => { + tracing::error!("Subscription error: {}", e); + continue; + } + }; + let resp = match SubscriptionMessage::from_json(&resp) { + Ok(resp) => resp, + Err(e) => { + tracing::error!("Failed to serialize subscription response: {}", e); + continue; + } + }; + if let Err(e) = sink.send(resp).await { + tracing::error!("Failed to send subscription response: {}", e); + break; + } } - }; - if let Err(e) = sink.send(resp).await { - tracing::error!("Failed to send subscription response: {}", e); - break; + None => break, } } - None => break, + _ = sink.closed() => { + if let Err(err) = subscription.unsubscribe().await { + tracing::error!("Failed to unsubscribe: {}", err); + } + break + }, } } - _ = sink.closed() => { - if let Err(err) = subscription.unsubscribe().await { - tracing::error!("Failed to unsubscribe: {}", err); - } - break - }, - } - } + }); - Ok(()) + Ok(()) + } + .with_context(TRACER.context("upstream")) + .await } } diff --git a/src/server.rs b/src/server.rs index 7600e94..95e4fab 100644 --- a/src/server.rs +++ b/src/server.rs @@ -120,41 +120,46 @@ pub async fn build(config: Config) -> anyhow::Result { Arc::new(|_, _| async { Err("Bad configuration".into()) }.boxed()), ); - module.register_subscription(subscribe_name, name, unsubscribe_name, move |params, sink, _| { - let subscription_middlewares = subscription_middlewares.clone(); - - async move { - let cx = tracer.context(name); - - let parsed = params.parse::()?; - let params = if parsed == JsonValue::Null { - vec![] - } else { - parsed.as_array().ok_or_else(|| errors::invalid_params(""))?.to_owned() - }; - - let (result_tx, result_rx) = tokio::sync::oneshot::channel(); - let timeout = tokio::time::Duration::from_secs(request_timeout_seconds); - - subscription_middlewares - .call( - SubscriptionRequest { - subscribe: subscribe_name.into(), - params, - unsubscribe: unsubscribe_name.into(), - pending_sink: sink, - }, - result_tx, - timeout, - ) - .with_context(cx) - .await; - - result_rx - .await - .map_err(|_| errors::map_error(jsonrpsee::core::Error::RequestTimeout))? - } - })?; + module.register_subscription( + subscribe_name, + name, + unsubscribe_name, + move |params, pending_sink, _| { + let subscription_middlewares = subscription_middlewares.clone(); + + async move { + let cx = tracer.context(name); + + let parsed = params.parse::()?; + let params = if parsed == JsonValue::Null { + vec![] + } else { + parsed.as_array().ok_or_else(|| errors::invalid_params(""))?.to_owned() + }; + + let (result_tx, result_rx) = tokio::sync::oneshot::channel(); + let timeout = tokio::time::Duration::from_secs(request_timeout_seconds); + + subscription_middlewares + .call( + SubscriptionRequest { + subscribe: subscribe_name.into(), + params, + unsubscribe: unsubscribe_name.into(), + pending_sink, + }, + result_tx, + timeout, + ) + .with_context(cx) + .await; + + result_rx + .await + .map_err(|_| errors::map_error(jsonrpsee::core::Error::RequestTimeout))? + } + }, + )?; } // register aliases from config