diff --git a/src/extensions/prometheus/rpc_metrics.rs b/src/extensions/prometheus/rpc_metrics.rs index 9a09869..37aadaa 100644 --- a/src/extensions/prometheus/rpc_metrics.rs +++ b/src/extensions/prometheus/rpc_metrics.rs @@ -1,4 +1,4 @@ -use prometheus_endpoint::{register, Counter, CounterVec, Opts, Registry, U64}; +use prometheus_endpoint::{register, Counter, CounterVec, HistogramOpts, HistogramVec, Opts, Registry, U64}; #[derive(Clone)] pub enum RpcMetrics { @@ -37,6 +37,18 @@ impl RpcMetrics { inner.cache_miss(method); } } + + pub fn call_metrics(&self) -> Option<(HistogramVec, CounterVec, CounterVec)> { + if let Self::Prometheus(inner) = self { + return Some(( + inner.call_times.clone(), + inner.calls_started.clone(), + inner.calls_finished.clone(), + )); + } + + None + } } #[derive(Clone)] @@ -45,6 +57,9 @@ pub struct InnerMetrics { closed_session_count: Counter, cache_query_counter: CounterVec, cache_miss_counter: CounterVec, + call_times: HistogramVec, + calls_started: CounterVec, + calls_finished: CounterVec, } impl InnerMetrics { @@ -53,17 +68,33 @@ impl InnerMetrics { let closed_counter = Counter::new("closed_ws_counter", "No help").unwrap(); let cache_miss_counter = CounterVec::new(Opts::new("cache_miss_counter", "No help"), &["method"]).unwrap(); let cache_query_counter = CounterVec::new(Opts::new("cache_query_counter", "No help"), &["method"]).unwrap(); + let call_times = + HistogramVec::new(HistogramOpts::new("rpc_calls_time", "No help"), &["protocol", "method"]).unwrap(); + let calls_started_counter = + CounterVec::new(Opts::new("rpc_calls_started", "No help"), &["protocol", "method"]).unwrap(); + let calls_finished_counter = CounterVec::new( + Opts::new("rpc_calls_finished", "No help"), + &["protocol", "method", "is_error"], + ) + .unwrap(); let open_session_count = register(open_counter, registry).unwrap(); let closed_session_count = register(closed_counter, registry).unwrap(); let cache_query_counter = register(cache_query_counter, registry).unwrap(); let cache_miss_counter = register(cache_miss_counter, registry).unwrap(); + let call_times = register(call_times, registry).unwrap(); + let calls_started = register(calls_started_counter, registry).unwrap(); + let calls_finished = register(calls_finished_counter, registry).unwrap(); + Self { cache_miss_counter, cache_query_counter, open_session_count, closed_session_count, + calls_started, + calls_finished, + call_times, } } fn ws_open(&self) { diff --git a/src/extensions/server/mod.rs b/src/extensions/server/mod.rs index 390a5a3..a8a8cbd 100644 --- a/src/extensions/server/mod.rs +++ b/src/extensions/server/mod.rs @@ -8,7 +8,7 @@ use jsonrpsee::server::{ ServerHandle, }; use jsonrpsee::Methods; -use prometheus_endpoint::Registry; + use serde::ser::StdError; use serde::Deserialize; @@ -21,12 +21,13 @@ use tower_http::cors::{AllowOrigin, CorsLayer}; use super::{Extension, ExtensionRegistry}; use crate::extensions::rate_limit::{MethodWeights, RateLimitBuilder, XFF}; +pub use prometheus::Protocol; mod prometheus; mod proxy_get_request; mod ready_get_request; use crate::extensions::prometheus::RpcMetrics; -use crate::extensions::server::prometheus::{PrometheusService, Protocol}; +use crate::extensions::server::prometheus::PrometheusService; use proxy_get_request::{ProxyGetRequestLayer, ProxyGetRequestMethod}; use ready_get_request::ReadyProxyLayer; @@ -107,7 +108,6 @@ impl SubwayServerBuilder { &self, rate_limit_builder: Option>, rpc_method_weights: MethodWeights, - prometheus_registry: Option, rpc_metrics: RpcMetrics, rpc_module_builder: impl FnOnce() -> Fut, ) -> anyhow::Result<(SocketAddr, ServerHandle)> { @@ -142,7 +142,6 @@ impl SubwayServerBuilder { let stop_handle = stop_handle.clone(); let rate_limit_builder = rate_limit_builder.clone(); let rpc_method_weights = rpc_method_weights.clone(); - let prometheus_registry = prometheus_registry.clone(); let rpc_metrics = rpc_metrics.clone(); async move { @@ -156,6 +155,7 @@ impl SubwayServerBuilder { let stop_handle = stop_handle.clone(); let http_middleware = http_middleware.clone(); let rpc_metrics = rpc_metrics.clone(); + let call_metrics = rpc_metrics.call_metrics(); if let Some(true) = rate_limit_builder.as_ref().map(|r| r.use_xff()) { socket_ip = req.xxf_ip().unwrap_or(socket_ip); @@ -173,9 +173,9 @@ impl SubwayServerBuilder { .and_then(|r| r.connection_limit(rpc_method_weights.clone())), ) .option_layer( - prometheus_registry + call_metrics .as_ref() - .map(|r| layer_fn(|s| PrometheusService::new(s, r, protocol))), + .map(|(a, b, c)| layer_fn(|s| PrometheusService::new(s, protocol, a, b, c))), ); let service_builder = ServerBuilder::default() diff --git a/src/extensions/server/prometheus.rs b/src/extensions/server/prometheus.rs index aed9e0d..940fcc7 100644 --- a/src/extensions/server/prometheus.rs +++ b/src/extensions/server/prometheus.rs @@ -2,7 +2,7 @@ use futures::{future::BoxFuture, FutureExt}; use jsonrpsee::server::middleware::rpc::RpcServiceT; use jsonrpsee::types::Request; use jsonrpsee::MethodResponse; -use prometheus_endpoint::{register, CounterVec, HistogramOpts, HistogramVec, Opts, Registry, U64}; +use prometheus_endpoint::{CounterVec, HistogramVec, U64}; use std::fmt::Display; @@ -32,27 +32,19 @@ pub struct PrometheusService { } impl PrometheusService { - pub fn new(inner: S, registry: &Registry, protocol: Protocol) -> Self { - let call_times = - HistogramVec::new(HistogramOpts::new("rpc_calls_time", "No help"), &["protocol", "method"]).unwrap(); - let calls_started_counter = - CounterVec::new(Opts::new("rpc_calls_started", "No help"), &["protocol", "method"]).unwrap(); - let calls_finished_counter = CounterVec::new( - Opts::new("rpc_calls_finished", "No help"), - &["protocol", "method", "is_error"], - ) - .unwrap(); - - let call_times = register(call_times, registry).unwrap(); - let calls_started = register(calls_started_counter, registry).unwrap(); - let calls_finished = register(calls_finished_counter, registry).unwrap(); - + pub fn new( + inner: S, + protocol: Protocol, + call_times: &HistogramVec, + calls_started: &CounterVec, + calls_finished: &CounterVec, + ) -> Self { Self { inner, protocol, - calls_started, - calls_finished, - call_times, + calls_started: calls_started.clone(), + calls_finished: calls_finished.clone(), + call_times: call_times.clone(), } } } diff --git a/src/server.rs b/src/server.rs index 333abc3..49250ce 100644 --- a/src/server.rs +++ b/src/server.rs @@ -52,47 +52,95 @@ pub async fn build(config: Config) -> anyhow::Result { let request_timeout_seconds = server_builder.config.request_timeout_seconds; - let prometheus = extensions_registry - .read() - .await - .get::(); - let prometheus_registry = prometheus.map(|p| p.registry().clone()); - let metrics = get_rpc_metrics(&extensions_registry).await; let registry = extensions_registry.clone(); let (addr, handle) = server_builder - .build( - rate_limit_builder, - rpc_method_weights, - prometheus_registry, - metrics, - move || async move { - let mut module = RpcModule::new(()); - - let tracer = telemetry::Tracer::new("server"); - - // register methods from config - for method in config.rpcs.methods { - let mut method_middlewares: Vec> = vec![]; - - for middleware_name in &config.middlewares.methods { - if let Some(middleware) = - factory::create_method_middleware(middleware_name, &method, ®istry).await - { - method_middlewares.push(middleware.into()); - } + .build(rate_limit_builder, rpc_method_weights, metrics, move || async move { + let mut module = RpcModule::new(()); + + let tracer = telemetry::Tracer::new("server"); + + // register methods from config + for method in config.rpcs.methods { + let mut method_middlewares: Vec> = vec![]; + + for middleware_name in &config.middlewares.methods { + if let Some(middleware) = + factory::create_method_middleware(middleware_name, &method, ®istry).await + { + method_middlewares.push(middleware.into()); } + } - let method_middlewares = Middlewares::new( - method_middlewares, - Arc::new(|_, _| async { Err(errors::failed("Bad configuration")) }.boxed()), - ); + let method_middlewares = Middlewares::new( + method_middlewares, + Arc::new(|_, _| async { Err(errors::failed("Bad configuration")) }.boxed()), + ); + + let method_name = string_to_static_str(method.method.clone()); + + module.register_async_method(method_name, move |params, _| { + let method_middlewares = method_middlewares.clone(); + async move { + let parsed = params.parse::()?; + let params = if parsed == JsonValue::Null { + vec![] + } else { + parsed.as_array().ok_or_else(|| errors::invalid_params(""))?.to_owned() + }; + + let (result_tx, result_rx) = tokio::sync::oneshot::channel(); + let timeout = tokio::time::Duration::from_secs(request_timeout_seconds); + + method_middlewares + .call(CallRequest::new(method_name, params), result_tx, timeout) + .await; + + let result = result_rx + .await + .map_err(|_| errors::map_error(jsonrpsee::core::client::Error::RequestTimeout))?; + + match result.as_ref() { + Ok(_) => tracer.span_ok(), + Err(err) => { + tracer.span_error(err); + } + }; + + result + } + .with_context(tracer.context(method_name)) + })?; + } - let method_name = string_to_static_str(method.method.clone()); + // register subscriptions from config + for subscription in config.rpcs.subscriptions { + let subscribe_name = string_to_static_str(subscription.subscribe.clone()); + let unsubscribe_name = string_to_static_str(subscription.unsubscribe.clone()); + let name = string_to_static_str(subscription.name.clone()); + + let mut subscription_middlewares: Vec> = vec![]; + + for middleware_name in &config.middlewares.subscriptions { + if let Some(middleware) = + factory::create_subscription_middleware(middleware_name, &subscription, ®istry).await + { + subscription_middlewares.push(middleware.into()); + } + } - module.register_async_method(method_name, move |params, _| { - let method_middlewares = method_middlewares.clone(); + let subscription_middlewares = Middlewares::new( + subscription_middlewares, + Arc::new(|_, _| async { Err("Bad configuration".into()) }.boxed()), + ); + + module.register_subscription( + subscribe_name, + name, + unsubscribe_name, + move |params, pending_sink, _| { + let subscription_middlewares = subscription_middlewares.clone(); async move { let parsed = params.parse::()?; let params = if parsed == JsonValue::Null { @@ -104,8 +152,17 @@ pub async fn build(config: Config) -> anyhow::Result { let (result_tx, result_rx) = tokio::sync::oneshot::channel(); let timeout = tokio::time::Duration::from_secs(request_timeout_seconds); - method_middlewares - .call(CallRequest::new(method_name, params), result_tx, timeout) + subscription_middlewares + .call( + SubscriptionRequest { + subscribe: subscribe_name.into(), + params, + unsubscribe: unsubscribe_name.into(), + pending_sink, + }, + result_tx, + timeout, + ) .await; let result = result_rx @@ -113,112 +170,43 @@ pub async fn build(config: Config) -> anyhow::Result { .map_err(|_| errors::map_error(jsonrpsee::core::client::Error::RequestTimeout))?; match result.as_ref() { - Ok(_) => tracer.span_ok(), + Ok(_) => { + tracer.span_ok(); + } Err(err) => { - tracer.span_error(err); + tracer.span_error(&errors::failed(format!("{:?}", err))); } }; result } - .with_context(tracer.context(method_name)) - })?; - } - - // register subscriptions from config - for subscription in config.rpcs.subscriptions { - let subscribe_name = string_to_static_str(subscription.subscribe.clone()); - let unsubscribe_name = string_to_static_str(subscription.unsubscribe.clone()); - let name = string_to_static_str(subscription.name.clone()); - - let mut subscription_middlewares: Vec> = vec![]; - - for middleware_name in &config.middlewares.subscriptions { - if let Some(middleware) = - factory::create_subscription_middleware(middleware_name, &subscription, ®istry).await - { - subscription_middlewares.push(middleware.into()); - } - } + .with_context(tracer.context(name)) + }, + )?; + } - let subscription_middlewares = Middlewares::new( - subscription_middlewares, - Arc::new(|_, _| async { Err("Bad configuration".into()) }.boxed()), - ); - - module.register_subscription( - subscribe_name, - name, - unsubscribe_name, - move |params, pending_sink, _| { - let subscription_middlewares = subscription_middlewares.clone(); - async move { - let parsed = params.parse::()?; - let params = if parsed == JsonValue::Null { - vec![] - } else { - parsed.as_array().ok_or_else(|| errors::invalid_params(""))?.to_owned() - }; - - let (result_tx, result_rx) = tokio::sync::oneshot::channel(); - let timeout = tokio::time::Duration::from_secs(request_timeout_seconds); - - subscription_middlewares - .call( - SubscriptionRequest { - subscribe: subscribe_name.into(), - params, - unsubscribe: unsubscribe_name.into(), - pending_sink, - }, - result_tx, - timeout, - ) - .await; - - let result = result_rx - .await - .map_err(|_| errors::map_error(jsonrpsee::core::client::Error::RequestTimeout))?; - - match result.as_ref() { - Ok(_) => { - tracer.span_ok(); - } - Err(err) => { - tracer.span_error(&errors::failed(format!("{:?}", err))); - } - }; - - result - } - .with_context(tracer.context(name)) - }, - )?; - } + // register aliases from config + for (alias_old, alias_new) in config.rpcs.aliases { + let alias_old = string_to_static_str(alias_old); + let alias_new = string_to_static_str(alias_new); + module.register_alias(alias_new, alias_old)?; + } - // register aliases from config - for (alias_old, alias_new) in config.rpcs.aliases { - let alias_old = string_to_static_str(alias_old); - let alias_new = string_to_static_str(alias_new); - module.register_alias(alias_new, alias_old)?; - } + module.register_method("health", |_, _| Ok::<_, ErrorObjectOwned>(()))?; - module.register_method("health", |_, _| Ok::<_, ErrorObjectOwned>(()))?; + let mut rpc_methods = module.method_names().map(|x| x.to_owned()).collect::>(); - let mut rpc_methods = module.method_names().map(|x| x.to_owned()).collect::>(); + rpc_methods.sort(); - rpc_methods.sort(); + module.register_method("rpc_methods", move |_, _| { + Ok::(json!({ + "version": 1, + "methods": rpc_methods + })) + })?; - module.register_method("rpc_methods", move |_, _| { - Ok::(json!({ - "version": 1, - "methods": rpc_methods - })) - })?; - - Ok(module) - }, - ) + Ok(module) + }) .await?; Ok(SubwayServerHandle {