Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update span error #144

Merged
merged 2 commits into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 5 additions & 27 deletions src/extensions/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use jsonrpsee::{
client::{ClientT, Subscription, SubscriptionClientT},
Error, JsonValue,
},
types::ErrorObjectOwned,
ws_client::{WsClient, WsClientBuilder},
};
use opentelemetry::trace::FutureExt;
Expand All @@ -25,6 +24,7 @@ use tokio::sync::Notify;
use super::ExtensionRegistry;
use crate::{
extensions::Extension,
middlewares::CallResult,
utils::{self, errors},
};

Expand Down Expand Up @@ -378,7 +378,7 @@ impl Client {
Self::new(endpoints, None, None, None)
}

pub async fn request(&self, method: &str, params: Vec<JsonValue>) -> Result<JsonValue, ErrorObjectOwned> {
pub async fn request(&self, method: &str, params: Vec<JsonValue>) -> CallResult {
async move {
let (tx, rx) = tokio::sync::oneshot::channel();
self.sender
Expand All @@ -391,18 +391,7 @@ impl Client {
.await
.map_err(errors::internal_error)?;

let result = rx.await.map_err(errors::internal_error)?.map_err(errors::map_error);

match result.as_ref() {
Ok(_) => {
TRACER.span_ok();
}
Err(err) => {
TRACER.span_error(format!("{}", err));
}
}

result
rx.await.map_err(errors::internal_error)?.map_err(errors::map_error)
}
.with_context(TRACER.context(method.to_string()))
.await
Expand All @@ -425,20 +414,9 @@ impl Client {
retries: self.retries,
})
.await
.map_err(errors::failed)?;

let result = rx.await.map_err(errors::failed)?;

match result.as_ref() {
Ok(_) => {
TRACER.span_ok();
}
Err(err) => {
TRACER.span_error(format!("{}", err));
}
};
.map_err(errors::internal_error)?;

result
rx.await.map_err(errors::internal_error)?
}
.with_context(TRACER.context(subscribe.to_string()))
.await
Expand Down
9 changes: 4 additions & 5 deletions src/middlewares/methods/block_tag.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::sync::Arc;

use async_trait::async_trait;
use jsonrpsee::{core::JsonValue, types::ErrorObjectOwned};
use opentelemetry::trace::FutureExt;

use crate::{
Expand Down Expand Up @@ -91,13 +90,13 @@ impl BlockTagMiddleware {
}

#[async_trait]
impl Middleware<CallRequest, Result<JsonValue, ErrorObjectOwned>> for BlockTagMiddleware {
impl Middleware<CallRequest, CallResult> for BlockTagMiddleware {
async fn call(
&self,
request: CallRequest,
context: TypeRegistry,
next: NextFn<CallRequest, Result<JsonValue, ErrorObjectOwned>>,
) -> Result<JsonValue, ErrorObjectOwned> {
next: NextFn<CallRequest, CallResult>,
) -> CallResult {
async move {
let (request, context) = self.replace(request, context).await;
next(request, context).await
Expand All @@ -119,7 +118,7 @@ mod tests {
Client,
};
use futures::FutureExt;
use jsonrpsee::server::ServerHandle;
use jsonrpsee::{core::JsonValue, server::ServerHandle};
use serde_json::json;
use std::time::Duration;
use tokio::sync::mpsc;
Expand Down
8 changes: 4 additions & 4 deletions src/middlewares/methods/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::num::NonZeroUsize;
use async_trait::async_trait;
use blake2::Blake2b512;
use futures::FutureExt as _;
use jsonrpsee::{core::JsonValue, types::ErrorObjectOwned};
use opentelemetry::trace::FutureExt;

use crate::{
Expand Down Expand Up @@ -63,13 +62,13 @@ impl MiddlewareBuilder<RpcMethod, CallRequest, CallResult> for CacheMiddleware {
}

#[async_trait]
impl Middleware<CallRequest, Result<JsonValue, ErrorObjectOwned>> for CacheMiddleware {
impl Middleware<CallRequest, CallResult> for CacheMiddleware {
async fn call(
&self,
request: CallRequest,
context: TypeRegistry,
next: NextFn<CallRequest, Result<JsonValue, ErrorObjectOwned>>,
) -> Result<JsonValue, ErrorObjectOwned> {
next: NextFn<CallRequest, CallResult>,
) -> CallResult {
async move {
let bypass_cache = context.get::<BypassCache>().map(|v| v.0).unwrap_or(false);
if bypass_cache {
Expand Down Expand Up @@ -101,6 +100,7 @@ impl Middleware<CallRequest, Result<JsonValue, ErrorObjectOwned>> for CacheMiddl
#[cfg(test)]
mod tests {
use futures::FutureExt;
use jsonrpsee::core::JsonValue;
use serde_json::json;
use std::num::NonZeroUsize;
use std::time::Duration;
Expand Down
8 changes: 4 additions & 4 deletions src/middlewares/methods/inject_params.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use async_trait::async_trait;
use jsonrpsee::{core::JsonValue, types::ErrorObjectOwned};
use jsonrpsee::core::JsonValue;
use opentelemetry::trace::FutureExt;
use std::sync::Arc;

Expand Down Expand Up @@ -95,13 +95,13 @@ impl InjectParamsMiddleware {
}

#[async_trait]
impl Middleware<CallRequest, Result<JsonValue, ErrorObjectOwned>> for InjectParamsMiddleware {
impl Middleware<CallRequest, CallResult> for InjectParamsMiddleware {
async fn call(
&self,
mut request: CallRequest,
context: TypeRegistry,
next: NextFn<CallRequest, Result<JsonValue, ErrorObjectOwned>>,
) -> Result<JsonValue, ErrorObjectOwned> {
next: NextFn<CallRequest, CallResult>,
) -> CallResult {
let idx = self.get_index();
match request.params.len() {
len if len == idx + 1 => {
Expand Down
7 changes: 3 additions & 4 deletions src/middlewares/methods/upstream.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::sync::Arc;

use async_trait::async_trait;
use jsonrpsee::{core::JsonValue, types::ErrorObjectOwned};
use opentelemetry::trace::FutureExt;

use crate::{
Expand Down Expand Up @@ -36,13 +35,13 @@ impl MiddlewareBuilder<RpcMethod, CallRequest, CallResult> for UpstreamMiddlewar
}

#[async_trait]
impl Middleware<CallRequest, Result<JsonValue, ErrorObjectOwned>> for UpstreamMiddleware {
impl Middleware<CallRequest, CallResult> for UpstreamMiddleware {
async fn call(
&self,
request: CallRequest,
_context: TypeRegistry,
_next: NextFn<CallRequest, Result<JsonValue, ErrorObjectOwned>>,
) -> Result<JsonValue, ErrorObjectOwned> {
_next: NextFn<CallRequest, CallResult>,
) -> CallResult {
self.client
.request(&request.method, request.params)
.with_context(TRACER.context("upstream"))
Expand Down
4 changes: 2 additions & 2 deletions src/middlewares/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::{

use crate::{
config::{RpcMethod, RpcSubscription},
utils::{telemetry, TypeRegistry, TypeRegistryRef},
utils::{errors, telemetry, TypeRegistry, TypeRegistryRef},
};

pub mod factory;
Expand Down Expand Up @@ -154,7 +154,7 @@ impl<Request: Debug + Send + 'static, Result: Send + 'static> Middlewares<Reques
tokio::select! {
_ = sleep => {
tracing::error!("middlewares timeout: {req}");
TRACER.span_error("middlewares timeout");
TRACER.span_error(&errors::failed("middlewares timeout"));
task_handle.abort();
}
_ = &mut task_handle => {
Expand Down
11 changes: 4 additions & 7 deletions src/middlewares/subscriptions/merge_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@ use std::{

use async_trait::async_trait;
use blake2::Blake2b512;
use jsonrpsee::{
core::{JsonValue, StringError},
SubscriptionMessage,
};
use jsonrpsee::{core::JsonValue, SubscriptionMessage};
use opentelemetry::trace::FutureExt;
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, RwLock};
Expand Down Expand Up @@ -199,13 +196,13 @@ impl MiddlewareBuilder<RpcSubscription, SubscriptionRequest, SubscriptionResult>
}

#[async_trait]
impl Middleware<SubscriptionRequest, Result<(), StringError>> for MergeSubscriptionMiddleware {
impl Middleware<SubscriptionRequest, SubscriptionResult> for MergeSubscriptionMiddleware {
async fn call(
&self,
request: SubscriptionRequest,
_context: TypeRegistry,
_next: NextFn<SubscriptionRequest, Result<(), StringError>>,
) -> Result<(), StringError> {
_next: NextFn<SubscriptionRequest, SubscriptionResult>,
) -> SubscriptionResult {
async move {
let key = CacheKey::new(&request.subscribe, &request.params);

Expand Down
4 changes: 2 additions & 2 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub async fn build(config: Config) -> anyhow::Result<SubwayServerHandle> {
match result.as_ref() {
Ok(_) => tracer.span_ok(),
Err(err) => {
tracer.span_error(format!("{}", err));
tracer.span_error(err);
}
};

Expand Down Expand Up @@ -165,7 +165,7 @@ pub async fn build(config: Config) -> anyhow::Result<SubwayServerHandle> {
tracer.span_ok();
}
Err(err) => {
tracer.span_error(format!("{:?}", err));
tracer.span_error(&errors::failed(format!("{:?}", err)));
}
};

Expand Down
5 changes: 3 additions & 2 deletions src/utils/cache.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::middlewares::CallResult;
use blake2::{digest::Output, Digest};
use futures::future::BoxFuture;
use jsonrpsee::core::JsonValue;
Expand Down Expand Up @@ -94,9 +95,9 @@ impl<D: Digest + 'static> Cache<D> {
self.cache.insert(key, CacheValue::Value(value)).await;
}

pub async fn get_or_insert_with<F>(&self, key: CacheKey<D>, f: F) -> Result<JsonValue, ErrorObjectOwned>
pub async fn get_or_insert_with<F>(&self, key: CacheKey<D>, f: F) -> CallResult
where
F: FnOnce() -> BoxFuture<'static, Result<JsonValue, ErrorObjectOwned>>,
F: FnOnce() -> BoxFuture<'static, CallResult>,
{
let fetch = || async {
let (tx, rx) = watch::channel(None);
Expand Down
10 changes: 6 additions & 4 deletions src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub mod errors {
}

pub mod telemetry {
use jsonrpsee::{types::error::ErrorCode, types::ErrorObjectOwned};
use opentelemetry::{
global::{self, BoxedSpan},
trace::{get_active_span, Status, TraceContextExt, Tracer as _},
Expand Down Expand Up @@ -69,11 +70,12 @@ pub mod telemetry {
});
}

pub fn span_error(&self, err: impl Into<Cow<'static, str>>) {
pub fn span_error(&self, err: &ErrorObjectOwned) {
get_active_span(|span| {
let err_msg = err.into();
span.set_status(Status::error(format!("{}", err_msg)));
span.set_attribute(KeyValue::new("error.message", err_msg));
span.set_status(Status::error(err.message().to_string()));
span.set_attribute(KeyValue::new("error.type", format!("{}", ErrorCode::from(err.code()))));
span.set_attribute(KeyValue::new("error.msg", err.message().to_string()));
span.set_attribute(KeyValue::new("error.stack", format!("{}", err)));
});
}
}
Expand Down