From 1430e2f6792e3a9b70197b72dfd29509445abb88 Mon Sep 17 00:00:00 2001 From: Ermal Kaleci Date: Fri, 24 Nov 2023 21:41:19 +0100 Subject: [PATCH 1/2] update span error --- src/extensions/client/mod.rs | 32 +++---------------- src/middlewares/methods/block_tag.rs | 9 +++--- src/middlewares/methods/cache.rs | 8 ++--- src/middlewares/methods/inject_params.rs | 8 ++--- src/middlewares/methods/upstream.rs | 7 ++-- src/middlewares/mod.rs | 4 +-- .../subscriptions/merge_subscription.rs | 11 +++---- src/server.rs | 4 +-- src/utils/cache.rs | 5 +-- src/utils/mod.rs | 10 +++--- 10 files changed, 37 insertions(+), 61 deletions(-) diff --git a/src/extensions/client/mod.rs b/src/extensions/client/mod.rs index af27525..a05441d 100644 --- a/src/extensions/client/mod.rs +++ b/src/extensions/client/mod.rs @@ -14,7 +14,6 @@ use jsonrpsee::{ client::{ClientT, Subscription, SubscriptionClientT}, Error, JsonValue, }, - types::ErrorObjectOwned, ws_client::{WsClient, WsClientBuilder}, }; use opentelemetry::trace::FutureExt; @@ -25,6 +24,7 @@ use tokio::sync::Notify; use super::ExtensionRegistry; use crate::{ extensions::Extension, + middlewares::CallResult, utils::{self, errors}, }; @@ -378,7 +378,7 @@ impl Client { Self::new(endpoints, None, None, None) } - pub async fn request(&self, method: &str, params: Vec) -> Result { + pub async fn request(&self, method: &str, params: Vec) -> CallResult { async move { let (tx, rx) = tokio::sync::oneshot::channel(); self.sender @@ -391,18 +391,7 @@ impl Client { .await .map_err(errors::internal_error)?; - let result = rx.await.map_err(errors::internal_error)?.map_err(errors::map_error); - - match result.as_ref() { - Ok(_) => { - TRACER.span_ok(); - } - Err(err) => { - TRACER.span_error(format!("{}", err)); - } - } - - result + rx.await.map_err(errors::internal_error)?.map_err(errors::map_error) } .with_context(TRACER.context(method.to_string())) .await @@ -425,20 +414,9 @@ impl Client { retries: self.retries, }) .await - .map_err(errors::failed)?; - - let result = rx.await.map_err(errors::failed)?; - - match result.as_ref() { - Ok(_) => { - TRACER.span_ok(); - } - Err(err) => { - TRACER.span_error(format!("{}", err)); - } - }; + .map_err(errors::internal_error)?; - result + rx.await.map_err(errors::internal_error)? } .with_context(TRACER.context(subscribe.to_string())) .await diff --git a/src/middlewares/methods/block_tag.rs b/src/middlewares/methods/block_tag.rs index a3f553d..531e497 100644 --- a/src/middlewares/methods/block_tag.rs +++ b/src/middlewares/methods/block_tag.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use async_trait::async_trait; -use jsonrpsee::{core::JsonValue, types::ErrorObjectOwned}; use opentelemetry::trace::FutureExt; use crate::{ @@ -91,13 +90,13 @@ impl BlockTagMiddleware { } #[async_trait] -impl Middleware> for BlockTagMiddleware { +impl Middleware for BlockTagMiddleware { async fn call( &self, request: CallRequest, context: TypeRegistry, - next: NextFn>, - ) -> Result { + next: NextFn, + ) -> CallResult { async move { let (request, context) = self.replace(request, context).await; next(request, context).await @@ -119,7 +118,7 @@ mod tests { Client, }; use futures::FutureExt; - use jsonrpsee::server::ServerHandle; + use jsonrpsee::{core::JsonValue, server::ServerHandle}; use serde_json::json; use std::time::Duration; use tokio::sync::mpsc; diff --git a/src/middlewares/methods/cache.rs b/src/middlewares/methods/cache.rs index 6f09be6..9f9704c 100644 --- a/src/middlewares/methods/cache.rs +++ b/src/middlewares/methods/cache.rs @@ -3,7 +3,6 @@ use std::num::NonZeroUsize; use async_trait::async_trait; use blake2::Blake2b512; use futures::FutureExt as _; -use jsonrpsee::{core::JsonValue, types::ErrorObjectOwned}; use opentelemetry::trace::FutureExt; use crate::{ @@ -63,13 +62,13 @@ impl MiddlewareBuilder for CacheMiddleware { } #[async_trait] -impl Middleware> for CacheMiddleware { +impl Middleware for CacheMiddleware { async fn call( &self, request: CallRequest, context: TypeRegistry, - next: NextFn>, - ) -> Result { + next: NextFn, + ) -> CallResult { async move { let bypass_cache = context.get::().map(|v| v.0).unwrap_or(false); if bypass_cache { @@ -101,6 +100,7 @@ impl Middleware> for CacheMiddl #[cfg(test)] mod tests { use futures::FutureExt; + use jsonrpsee::core::JsonValue; use serde_json::json; use std::num::NonZeroUsize; use std::time::Duration; diff --git a/src/middlewares/methods/inject_params.rs b/src/middlewares/methods/inject_params.rs index 94cf207..6992fdc 100644 --- a/src/middlewares/methods/inject_params.rs +++ b/src/middlewares/methods/inject_params.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use jsonrpsee::{core::JsonValue, types::ErrorObjectOwned}; +use jsonrpsee::core::JsonValue; use opentelemetry::trace::FutureExt; use std::sync::Arc; @@ -95,13 +95,13 @@ impl InjectParamsMiddleware { } #[async_trait] -impl Middleware> for InjectParamsMiddleware { +impl Middleware for InjectParamsMiddleware { async fn call( &self, mut request: CallRequest, context: TypeRegistry, - next: NextFn>, - ) -> Result { + next: NextFn, + ) -> CallResult { let idx = self.get_index(); match request.params.len() { len if len == idx + 1 => { diff --git a/src/middlewares/methods/upstream.rs b/src/middlewares/methods/upstream.rs index da819b2..ba7ead4 100644 --- a/src/middlewares/methods/upstream.rs +++ b/src/middlewares/methods/upstream.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use async_trait::async_trait; -use jsonrpsee::{core::JsonValue, types::ErrorObjectOwned}; use opentelemetry::trace::FutureExt; use crate::{ @@ -36,13 +35,13 @@ impl MiddlewareBuilder for UpstreamMiddlewar } #[async_trait] -impl Middleware> for UpstreamMiddleware { +impl Middleware for UpstreamMiddleware { async fn call( &self, request: CallRequest, _context: TypeRegistry, - _next: NextFn>, - ) -> Result { + _next: NextFn, + ) -> CallResult { self.client .request(&request.method, request.params) .with_context(TRACER.context("upstream")) diff --git a/src/middlewares/mod.rs b/src/middlewares/mod.rs index 93e2228..03871b9 100644 --- a/src/middlewares/mod.rs +++ b/src/middlewares/mod.rs @@ -13,7 +13,7 @@ use std::{ use crate::{ config::{RpcMethod, RpcSubscription}, - utils::{telemetry, TypeRegistry, TypeRegistryRef}, + utils::{errors, telemetry, TypeRegistry, TypeRegistryRef}, }; pub mod factory; @@ -154,7 +154,7 @@ impl Middlewares { tracing::error!("middlewares timeout: {req}"); - TRACER.span_error("middlewares timeout"); + TRACER.span_error(&errors::failed("middlewares timeout")); task_handle.abort(); } _ = &mut task_handle => { diff --git a/src/middlewares/subscriptions/merge_subscription.rs b/src/middlewares/subscriptions/merge_subscription.rs index 5fb29fe..cbc20f1 100644 --- a/src/middlewares/subscriptions/merge_subscription.rs +++ b/src/middlewares/subscriptions/merge_subscription.rs @@ -6,10 +6,7 @@ use std::{ use async_trait::async_trait; use blake2::Blake2b512; -use jsonrpsee::{ - core::{JsonValue, StringError}, - SubscriptionMessage, -}; +use jsonrpsee::{core::JsonValue, SubscriptionMessage}; use opentelemetry::trace::FutureExt; use serde::{Deserialize, Serialize}; use tokio::sync::{broadcast, RwLock}; @@ -199,13 +196,13 @@ impl MiddlewareBuilder } #[async_trait] -impl Middleware> for MergeSubscriptionMiddleware { +impl Middleware for MergeSubscriptionMiddleware { async fn call( &self, request: SubscriptionRequest, _context: TypeRegistry, - _next: NextFn>, - ) -> Result<(), StringError> { + _next: NextFn, + ) -> SubscriptionResult { async move { let key = CacheKey::new(&request.subscribe, &request.params); diff --git a/src/server.rs b/src/server.rs index b01c0a1..8660917 100644 --- a/src/server.rs +++ b/src/server.rs @@ -95,7 +95,7 @@ pub async fn build(config: Config) -> anyhow::Result { match result.as_ref() { Ok(_) => tracer.span_ok(), Err(err) => { - tracer.span_error(format!("{}", err)); + tracer.span_error(err); } }; @@ -165,7 +165,7 @@ pub async fn build(config: Config) -> anyhow::Result { tracer.span_ok(); } Err(err) => { - tracer.span_error(format!("{:?}", err)); + tracer.span_error(&errors::failed(format!("{:?}", err))); } }; diff --git a/src/utils/cache.rs b/src/utils/cache.rs index 613fae8..439c602 100644 --- a/src/utils/cache.rs +++ b/src/utils/cache.rs @@ -1,3 +1,4 @@ +use crate::middlewares::CallResult; use blake2::{digest::Output, Digest}; use futures::future::BoxFuture; use jsonrpsee::core::JsonValue; @@ -94,9 +95,9 @@ impl Cache { self.cache.insert(key, CacheValue::Value(value)).await; } - pub async fn get_or_insert_with(&self, key: CacheKey, f: F) -> Result + pub async fn get_or_insert_with(&self, key: CacheKey, f: F) -> CallResult where - F: FnOnce() -> BoxFuture<'static, Result>, + F: FnOnce() -> BoxFuture<'static, CallResult>, { let fetch = || async { let (tx, rx) = watch::channel(None); diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 8d4d6a4..2f408e1 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -39,6 +39,7 @@ pub mod errors { } pub mod telemetry { + use jsonrpsee::{types::error::ErrorCode, types::ErrorObjectOwned}; use opentelemetry::{ global::{self, BoxedSpan}, trace::{get_active_span, Status, TraceContextExt, Tracer as _}, @@ -69,11 +70,12 @@ pub mod telemetry { }); } - pub fn span_error(&self, err: impl Into>) { + pub fn span_error(&self, err: &ErrorObjectOwned) { get_active_span(|span| { - let err_msg = err.into(); - span.set_status(Status::error(format!("{}", err_msg))); - span.set_attribute(KeyValue::new("error.message", err_msg)); + span.set_status(Status::error(format!("{}", err.message()))); + span.set_attribute(KeyValue::new("error.type", format!("{}", ErrorCode::from(err.code())))); + span.set_attribute(KeyValue::new("error.msg", format!("{}", err.message()))); + span.set_attribute(KeyValue::new("error.stack", format!("{}", err))); }); } } From 9006c38d523e0b0fdb7bc57437d1f10be342ba29 Mon Sep 17 00:00:00 2001 From: Ermal Kaleci Date: Fri, 24 Nov 2023 21:55:19 +0100 Subject: [PATCH 2/2] clippy --- src/utils/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 2f408e1..2fe38c8 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -72,9 +72,9 @@ pub mod telemetry { pub fn span_error(&self, err: &ErrorObjectOwned) { get_active_span(|span| { - span.set_status(Status::error(format!("{}", err.message()))); + span.set_status(Status::error(err.message().to_string())); span.set_attribute(KeyValue::new("error.type", format!("{}", ErrorCode::from(err.code())))); - span.set_attribute(KeyValue::new("error.msg", format!("{}", err.message()))); + span.set_attribute(KeyValue::new("error.msg", err.message().to_string())); span.set_attribute(KeyValue::new("error.stack", format!("{}", err))); }); }