diff --git a/src/extensions/prometheus/mod.rs b/src/extensions/prometheus/mod.rs index 155fcc0..1b54935 100644 --- a/src/extensions/prometheus/mod.rs +++ b/src/extensions/prometheus/mod.rs @@ -1,3 +1,5 @@ +mod rpc_metrics; + use super::{Extension, ExtensionRegistry}; use async_trait::async_trait; use prometheus_endpoint::init_prometheus; @@ -7,8 +9,21 @@ use std::iter; use std::net::SocketAddr; use tokio::task::JoinHandle; +use crate::utils::TypeRegistryRef; +pub use rpc_metrics::RpcMetrics; + +pub async fn get_rpc_metrics(registry: &TypeRegistryRef) -> RpcMetrics { + let prometheus = registry.read().await.get::(); + + match prometheus { + None => RpcMetrics::noop(), + Some(prom) => prom.rpc_metrics(), + } +} + pub struct Prometheus { pub registry: Registry, + rpc_metrics: RpcMetrics, pub exporter_task: JoinHandle<()>, } @@ -46,17 +61,23 @@ impl Prometheus { p => p, }; let registry = Registry::new_custom(prefix, labels).expect("Can't happen"); + let rpc_metrics = RpcMetrics::new(®istry); let exporter_task = start_prometheus_exporter(registry.clone(), config.port, config.listen_address); Self { registry, exporter_task, + rpc_metrics, } } pub fn registry(&self) -> &Registry { &self.registry } + + pub fn rpc_metrics(&self) -> RpcMetrics { + self.rpc_metrics.clone() + } } fn start_prometheus_exporter(registry: Registry, port: u16, listen_address: String) -> JoinHandle<()> { diff --git a/src/extensions/prometheus/rpc_metrics.rs b/src/extensions/prometheus/rpc_metrics.rs new file mode 100644 index 0000000..9a09869 --- /dev/null +++ b/src/extensions/prometheus/rpc_metrics.rs @@ -0,0 +1,84 @@ +use prometheus_endpoint::{register, Counter, CounterVec, Opts, Registry, U64}; + +#[derive(Clone)] +pub enum RpcMetrics { + Prometheus(InnerMetrics), + Noop, +} + +impl RpcMetrics { + pub fn new(registry: &Registry) -> Self { + Self::Prometheus(InnerMetrics::new(registry)) + } + + pub fn noop() -> Self { + Self::Noop + } + + pub fn ws_open(&self) { + if let Self::Prometheus(inner) = self { + inner.ws_open(); + } + } + + pub fn ws_closed(&self) { + if let Self::Prometheus(inner) = self { + inner.ws_closed(); + } + } + + pub fn cache_query(&self, method: &str) { + if let Self::Prometheus(inner) = self { + inner.cache_query(method); + } + } + pub fn cache_miss(&self, method: &str) { + if let Self::Prometheus(inner) = self { + inner.cache_miss(method); + } + } +} + +#[derive(Clone)] +pub struct InnerMetrics { + open_session_count: Counter, + closed_session_count: Counter, + cache_query_counter: CounterVec, + cache_miss_counter: CounterVec, +} + +impl InnerMetrics { + fn new(registry: &Registry) -> Self { + let open_counter = Counter::new("open_ws_counter", "No help").unwrap(); + let closed_counter = Counter::new("closed_ws_counter", "No help").unwrap(); + let cache_miss_counter = CounterVec::new(Opts::new("cache_miss_counter", "No help"), &["method"]).unwrap(); + let cache_query_counter = CounterVec::new(Opts::new("cache_query_counter", "No help"), &["method"]).unwrap(); + + let open_session_count = register(open_counter, registry).unwrap(); + let closed_session_count = register(closed_counter, registry).unwrap(); + let cache_query_counter = register(cache_query_counter, registry).unwrap(); + let cache_miss_counter = register(cache_miss_counter, registry).unwrap(); + + Self { + cache_miss_counter, + cache_query_counter, + open_session_count, + closed_session_count, + } + } + fn ws_open(&self) { + self.open_session_count.inc(); + } + + fn ws_closed(&self) { + self.closed_session_count.inc(); + } + + fn cache_query(&self, method: &str) { + self.cache_query_counter.with_label_values(&[method]).inc(); + } + + fn cache_miss(&self, method: &str) { + self.cache_miss_counter.with_label_values(&[method]).inc(); + } +} diff --git a/src/extensions/server/mod.rs b/src/extensions/server/mod.rs index 6ee50c5..390a5a3 100644 --- a/src/extensions/server/mod.rs +++ b/src/extensions/server/mod.rs @@ -25,7 +25,8 @@ use crate::extensions::rate_limit::{MethodWeights, RateLimitBuilder, XFF}; mod prometheus; mod proxy_get_request; mod ready_get_request; -use crate::extensions::server::prometheus::{PrometheusService, Protocol, WsMetrics}; +use crate::extensions::prometheus::RpcMetrics; +use crate::extensions::server::prometheus::{PrometheusService, Protocol}; use proxy_get_request::{ProxyGetRequestLayer, ProxyGetRequestMethod}; use ready_get_request::ReadyProxyLayer; @@ -107,6 +108,7 @@ impl SubwayServerBuilder { rate_limit_builder: Option>, rpc_method_weights: MethodWeights, prometheus_registry: Option, + rpc_metrics: RpcMetrics, rpc_module_builder: impl FnOnce() -> Fut, ) -> anyhow::Result<(SocketAddr, ServerHandle)> { let config = self.config.clone(); @@ -114,7 +116,6 @@ impl SubwayServerBuilder { let (stop_handle, server_handle) = stop_channel(); let handle = stop_handle.clone(); let rpc_module = rpc_module_builder().await?; - let ws_metrics = WsMetrics::new(prometheus_registry.as_ref()); // make_service handle each connection let make_service = make_service_fn(move |socket: &AddrStream| { @@ -142,7 +143,7 @@ impl SubwayServerBuilder { let rate_limit_builder = rate_limit_builder.clone(); let rpc_method_weights = rpc_method_weights.clone(); let prometheus_registry = prometheus_registry.clone(); - let ws_metrics = ws_metrics.clone(); + let rpc_metrics = rpc_metrics.clone(); async move { // service_fn handle each request @@ -154,7 +155,7 @@ impl SubwayServerBuilder { let methods: Methods = rpc_module.clone().into(); let stop_handle = stop_handle.clone(); let http_middleware = http_middleware.clone(); - let ws_metrics = ws_metrics.clone(); + let rpc_metrics = rpc_metrics.clone(); if let Some(true) = rate_limit_builder.as_ref().map(|r| r.use_xff()) { socket_ip = req.xxf_ip().unwrap_or(socket_ip); @@ -188,10 +189,10 @@ impl SubwayServerBuilder { if is_websocket { let on_ws_close = service.on_session_closed(); - ws_metrics.ws_open(); + rpc_metrics.ws_open(); tokio::spawn(async move { on_ws_close.await; - ws_metrics.ws_closed(); + rpc_metrics.ws_closed(); }); } service.call(req) diff --git a/src/extensions/server/prometheus.rs b/src/extensions/server/prometheus.rs index 77c5f46..aed9e0d 100644 --- a/src/extensions/server/prometheus.rs +++ b/src/extensions/server/prometheus.rs @@ -2,64 +2,10 @@ use futures::{future::BoxFuture, FutureExt}; use jsonrpsee::server::middleware::rpc::RpcServiceT; use jsonrpsee::types::Request; use jsonrpsee::MethodResponse; -use prometheus_endpoint::{register, Counter, CounterVec, HistogramOpts, HistogramVec, Opts, Registry, U64}; +use prometheus_endpoint::{register, CounterVec, HistogramOpts, HistogramVec, Opts, Registry, U64}; use std::fmt::Display; -#[derive(Clone)] -pub enum WsMetrics { - Prometheus(InnerMetrics), - Noop, -} - -impl WsMetrics { - pub fn new(registry: Option<&Registry>) -> Self { - match registry { - None => Self::Noop, - Some(r) => Self::Prometheus(InnerMetrics::new(r)), - } - } - - pub fn ws_open(&self) { - if let Self::Prometheus(inner) = self { - inner.ws_open(); - } - } - - pub fn ws_closed(&self) { - if let Self::Prometheus(inner) = self { - inner.ws_closed(); - } - } -} - -#[derive(Clone)] -pub struct InnerMetrics { - open_session_count: Counter, - closed_session_count: Counter, -} - -impl InnerMetrics { - fn new(registry: &Registry) -> Self { - let open_counter = Counter::new("open_ws_counter", "No help").unwrap(); - let closed_counter = Counter::new("closed_ws_counter", "No help").unwrap(); - - let open_session_count = register(open_counter, registry).unwrap(); - let closed_session_count = register(closed_counter, registry).unwrap(); - Self { - open_session_count, - closed_session_count, - } - } - fn ws_open(&self) { - self.open_session_count.inc(); - } - - fn ws_closed(&self) { - self.closed_session_count.inc(); - } -} - #[derive(Clone, Copy)] pub enum Protocol { Ws, diff --git a/src/middlewares/methods/cache.rs b/src/middlewares/methods/cache.rs index 4a3eba1..85c0e63 100644 --- a/src/middlewares/methods/cache.rs +++ b/src/middlewares/methods/cache.rs @@ -5,6 +5,7 @@ use blake2::Blake2b512; use futures::FutureExt as _; use opentelemetry::trace::FutureExt; +use crate::extensions::prometheus::{get_rpc_metrics, RpcMetrics}; use crate::{ config::CacheParams, extensions::cache::Cache as CacheExtension, @@ -16,11 +17,12 @@ pub struct BypassCache(pub bool); pub struct CacheMiddleware { cache: Cache, + metrics: RpcMetrics, } impl CacheMiddleware { - pub fn new(cache: Cache) -> Self { - Self { cache } + pub fn new(cache: Cache, metrics: RpcMetrics) -> Self { + Self { cache, metrics } } } @@ -36,6 +38,8 @@ impl MiddlewareBuilder for CacheMiddleware { .get::() .expect("Cache extension not found"); + let metrics = get_rpc_metrics(extensions).await; + // do not cache if size is 0, otherwise use default size let size = match method.cache { Some(CacheParams { size: Some(0), .. }) => return None, @@ -57,7 +61,7 @@ impl MiddlewareBuilder for CacheMiddleware { ttl_seconds.map(std::time::Duration::from_secs), ); - Some(Box::new(Self::new(cache))) + Some(Box::new(Self::new(cache, metrics))) } } @@ -75,11 +79,21 @@ impl Middleware for CacheMiddleware { return next(request, context).await; } + let metrics = self.metrics.clone(); let key = CacheKey::::new(&request.method, &request.params); + let method = request.method.to_string(); + metrics.cache_query(&method); + let result = self .cache - .get_or_insert_with(key.clone(), || next(request, context).boxed()) + .get_or_insert_with(key.clone(), || { + async move { + metrics.cache_miss(&method); + next(request, context).await + } + .boxed() + }) .await; if let Ok(ref value) = result { @@ -110,7 +124,7 @@ mod tests { #[tokio::test] async fn handle_ok_resp() { let cache = Cache::new(NonZeroUsize::try_from(1).unwrap(), None); - let middleware = CacheMiddleware::new(cache.clone()); + let middleware = CacheMiddleware::new(cache.clone(), RpcMetrics::noop()); let res = middleware .call( @@ -187,7 +201,7 @@ mod tests { #[tokio::test] async fn should_not_cache_null() { - let middleware = CacheMiddleware::new(Cache::new(NonZeroUsize::try_from(3).unwrap(), None)); + let middleware = CacheMiddleware::new(Cache::new(NonZeroUsize::try_from(3).unwrap(), None), RpcMetrics::noop()); let res = middleware .call( @@ -214,10 +228,10 @@ mod tests { #[tokio::test] async fn cache_ttl_works() { - let middleware = CacheMiddleware::new(Cache::new( - NonZeroUsize::new(1).unwrap(), - Some(Duration::from_millis(10)), - )); + let middleware = CacheMiddleware::new( + Cache::new(NonZeroUsize::new(1).unwrap(), Some(Duration::from_millis(10))), + RpcMetrics::noop(), + ); let res = middleware .call( @@ -257,7 +271,7 @@ mod tests { #[tokio::test] async fn bypass_cache() { - let middleware = CacheMiddleware::new(Cache::new(NonZeroUsize::try_from(3).unwrap(), None)); + let middleware = CacheMiddleware::new(Cache::new(NonZeroUsize::try_from(3).unwrap(), None), RpcMetrics::noop()); let res = middleware .call( @@ -300,7 +314,7 @@ mod tests { #[tokio::test] async fn avoid_repeated_requests() { - let middleware = CacheMiddleware::new(Cache::new(NonZeroUsize::try_from(3).unwrap(), None)); + let middleware = CacheMiddleware::new(Cache::new(NonZeroUsize::try_from(3).unwrap(), None), RpcMetrics::noop()); let (tx, mut rx) = tokio::sync::mpsc::channel(1); let res = middleware.call( diff --git a/src/server.rs b/src/server.rs index 67d8ed2..333abc3 100644 --- a/src/server.rs +++ b/src/server.rs @@ -9,6 +9,7 @@ use jsonrpsee::{ use opentelemetry::trace::FutureExt as _; use serde_json::json; +use crate::extensions::prometheus::get_rpc_metrics; use crate::{ config::Config, extensions::{ @@ -57,12 +58,15 @@ pub async fn build(config: Config) -> anyhow::Result { .get::(); let prometheus_registry = prometheus.map(|p| p.registry().clone()); + let metrics = get_rpc_metrics(&extensions_registry).await; + let registry = extensions_registry.clone(); let (addr, handle) = server_builder .build( rate_limit_builder, rpc_method_weights, prometheus_registry, + metrics, move || async move { let mut module = RpcModule::new(());