Skip to content

Commit

Permalink
Add cache-metrics (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
kostekIV authored Feb 29, 2024
1 parent c983ce9 commit 6792092
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 73 deletions.
21 changes: 21 additions & 0 deletions src/extensions/prometheus/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod rpc_metrics;

use super::{Extension, ExtensionRegistry};
use async_trait::async_trait;
use prometheus_endpoint::init_prometheus;
Expand All @@ -7,8 +9,21 @@ use std::iter;
use std::net::SocketAddr;
use tokio::task::JoinHandle;

use crate::utils::TypeRegistryRef;
pub use rpc_metrics::RpcMetrics;

pub async fn get_rpc_metrics(registry: &TypeRegistryRef) -> RpcMetrics {
let prometheus = registry.read().await.get::<Prometheus>();

match prometheus {
None => RpcMetrics::noop(),
Some(prom) => prom.rpc_metrics(),
}
}

pub struct Prometheus {
pub registry: Registry,
rpc_metrics: RpcMetrics,
pub exporter_task: JoinHandle<()>,
}

Expand Down Expand Up @@ -46,17 +61,23 @@ impl Prometheus {
p => p,
};
let registry = Registry::new_custom(prefix, labels).expect("Can't happen");
let rpc_metrics = RpcMetrics::new(&registry);

let exporter_task = start_prometheus_exporter(registry.clone(), config.port, config.listen_address);
Self {
registry,
exporter_task,
rpc_metrics,
}
}

pub fn registry(&self) -> &Registry {
&self.registry
}

pub fn rpc_metrics(&self) -> RpcMetrics {
self.rpc_metrics.clone()
}
}

fn start_prometheus_exporter(registry: Registry, port: u16, listen_address: String) -> JoinHandle<()> {
Expand Down
84 changes: 84 additions & 0 deletions src/extensions/prometheus/rpc_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use prometheus_endpoint::{register, Counter, CounterVec, Opts, Registry, U64};

#[derive(Clone)]
pub enum RpcMetrics {
Prometheus(InnerMetrics),
Noop,
}

impl RpcMetrics {
pub fn new(registry: &Registry) -> Self {
Self::Prometheus(InnerMetrics::new(registry))
}

pub fn noop() -> Self {
Self::Noop
}

pub fn ws_open(&self) {
if let Self::Prometheus(inner) = self {
inner.ws_open();
}
}

pub fn ws_closed(&self) {
if let Self::Prometheus(inner) = self {
inner.ws_closed();
}
}

pub fn cache_query(&self, method: &str) {
if let Self::Prometheus(inner) = self {
inner.cache_query(method);
}
}
pub fn cache_miss(&self, method: &str) {
if let Self::Prometheus(inner) = self {
inner.cache_miss(method);
}
}
}

#[derive(Clone)]
pub struct InnerMetrics {
open_session_count: Counter<U64>,
closed_session_count: Counter<U64>,
cache_query_counter: CounterVec<U64>,
cache_miss_counter: CounterVec<U64>,
}

impl InnerMetrics {
fn new(registry: &Registry) -> Self {
let open_counter = Counter::new("open_ws_counter", "No help").unwrap();
let closed_counter = Counter::new("closed_ws_counter", "No help").unwrap();
let cache_miss_counter = CounterVec::new(Opts::new("cache_miss_counter", "No help"), &["method"]).unwrap();
let cache_query_counter = CounterVec::new(Opts::new("cache_query_counter", "No help"), &["method"]).unwrap();

let open_session_count = register(open_counter, registry).unwrap();
let closed_session_count = register(closed_counter, registry).unwrap();
let cache_query_counter = register(cache_query_counter, registry).unwrap();
let cache_miss_counter = register(cache_miss_counter, registry).unwrap();

Self {
cache_miss_counter,
cache_query_counter,
open_session_count,
closed_session_count,
}
}
fn ws_open(&self) {
self.open_session_count.inc();
}

fn ws_closed(&self) {
self.closed_session_count.inc();
}

fn cache_query(&self, method: &str) {
self.cache_query_counter.with_label_values(&[method]).inc();
}

fn cache_miss(&self, method: &str) {
self.cache_miss_counter.with_label_values(&[method]).inc();
}
}
13 changes: 7 additions & 6 deletions src/extensions/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use crate::extensions::rate_limit::{MethodWeights, RateLimitBuilder, XFF};
mod prometheus;
mod proxy_get_request;
mod ready_get_request;
use crate::extensions::server::prometheus::{PrometheusService, Protocol, WsMetrics};
use crate::extensions::prometheus::RpcMetrics;
use crate::extensions::server::prometheus::{PrometheusService, Protocol};
use proxy_get_request::{ProxyGetRequestLayer, ProxyGetRequestMethod};
use ready_get_request::ReadyProxyLayer;

Expand Down Expand Up @@ -107,14 +108,14 @@ impl SubwayServerBuilder {
rate_limit_builder: Option<Arc<RateLimitBuilder>>,
rpc_method_weights: MethodWeights,
prometheus_registry: Option<Registry>,
rpc_metrics: RpcMetrics,
rpc_module_builder: impl FnOnce() -> Fut,
) -> anyhow::Result<(SocketAddr, ServerHandle)> {
let config = self.config.clone();

let (stop_handle, server_handle) = stop_channel();
let handle = stop_handle.clone();
let rpc_module = rpc_module_builder().await?;
let ws_metrics = WsMetrics::new(prometheus_registry.as_ref());

// make_service handle each connection
let make_service = make_service_fn(move |socket: &AddrStream| {
Expand Down Expand Up @@ -142,7 +143,7 @@ impl SubwayServerBuilder {
let rate_limit_builder = rate_limit_builder.clone();
let rpc_method_weights = rpc_method_weights.clone();
let prometheus_registry = prometheus_registry.clone();
let ws_metrics = ws_metrics.clone();
let rpc_metrics = rpc_metrics.clone();

async move {
// service_fn handle each request
Expand All @@ -154,7 +155,7 @@ impl SubwayServerBuilder {
let methods: Methods = rpc_module.clone().into();
let stop_handle = stop_handle.clone();
let http_middleware = http_middleware.clone();
let ws_metrics = ws_metrics.clone();
let rpc_metrics = rpc_metrics.clone();

if let Some(true) = rate_limit_builder.as_ref().map(|r| r.use_xff()) {
socket_ip = req.xxf_ip().unwrap_or(socket_ip);
Expand Down Expand Up @@ -188,10 +189,10 @@ impl SubwayServerBuilder {

if is_websocket {
let on_ws_close = service.on_session_closed();
ws_metrics.ws_open();
rpc_metrics.ws_open();
tokio::spawn(async move {
on_ws_close.await;
ws_metrics.ws_closed();
rpc_metrics.ws_closed();
});
}
service.call(req)
Expand Down
56 changes: 1 addition & 55 deletions src/extensions/server/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,64 +2,10 @@ use futures::{future::BoxFuture, FutureExt};
use jsonrpsee::server::middleware::rpc::RpcServiceT;
use jsonrpsee::types::Request;
use jsonrpsee::MethodResponse;
use prometheus_endpoint::{register, Counter, CounterVec, HistogramOpts, HistogramVec, Opts, Registry, U64};
use prometheus_endpoint::{register, CounterVec, HistogramOpts, HistogramVec, Opts, Registry, U64};

use std::fmt::Display;

#[derive(Clone)]
pub enum WsMetrics {
Prometheus(InnerMetrics),
Noop,
}

impl WsMetrics {
pub fn new(registry: Option<&Registry>) -> Self {
match registry {
None => Self::Noop,
Some(r) => Self::Prometheus(InnerMetrics::new(r)),
}
}

pub fn ws_open(&self) {
if let Self::Prometheus(inner) = self {
inner.ws_open();
}
}

pub fn ws_closed(&self) {
if let Self::Prometheus(inner) = self {
inner.ws_closed();
}
}
}

#[derive(Clone)]
pub struct InnerMetrics {
open_session_count: Counter<U64>,
closed_session_count: Counter<U64>,
}

impl InnerMetrics {
fn new(registry: &Registry) -> Self {
let open_counter = Counter::new("open_ws_counter", "No help").unwrap();
let closed_counter = Counter::new("closed_ws_counter", "No help").unwrap();

let open_session_count = register(open_counter, registry).unwrap();
let closed_session_count = register(closed_counter, registry).unwrap();
Self {
open_session_count,
closed_session_count,
}
}
fn ws_open(&self) {
self.open_session_count.inc();
}

fn ws_closed(&self) {
self.closed_session_count.inc();
}
}

#[derive(Clone, Copy)]
pub enum Protocol {
Ws,
Expand Down
38 changes: 26 additions & 12 deletions src/middlewares/methods/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use blake2::Blake2b512;
use futures::FutureExt as _;
use opentelemetry::trace::FutureExt;

use crate::extensions::prometheus::{get_rpc_metrics, RpcMetrics};
use crate::{
config::CacheParams,
extensions::cache::Cache as CacheExtension,
Expand All @@ -16,11 +17,12 @@ pub struct BypassCache(pub bool);

pub struct CacheMiddleware {
cache: Cache<Blake2b512>,
metrics: RpcMetrics,
}

impl CacheMiddleware {
pub fn new(cache: Cache<Blake2b512>) -> Self {
Self { cache }
pub fn new(cache: Cache<Blake2b512>, metrics: RpcMetrics) -> Self {
Self { cache, metrics }
}
}

Expand All @@ -36,6 +38,8 @@ impl MiddlewareBuilder<RpcMethod, CallRequest, CallResult> for CacheMiddleware {
.get::<CacheExtension>()
.expect("Cache extension not found");

let metrics = get_rpc_metrics(extensions).await;

// do not cache if size is 0, otherwise use default size
let size = match method.cache {
Some(CacheParams { size: Some(0), .. }) => return None,
Expand All @@ -57,7 +61,7 @@ impl MiddlewareBuilder<RpcMethod, CallRequest, CallResult> for CacheMiddleware {
ttl_seconds.map(std::time::Duration::from_secs),
);

Some(Box::new(Self::new(cache)))
Some(Box::new(Self::new(cache, metrics)))
}
}

Expand All @@ -75,11 +79,21 @@ impl Middleware<CallRequest, CallResult> for CacheMiddleware {
return next(request, context).await;
}

let metrics = self.metrics.clone();
let key = CacheKey::<Blake2b512>::new(&request.method, &request.params);

let method = request.method.to_string();
metrics.cache_query(&method);

let result = self
.cache
.get_or_insert_with(key.clone(), || next(request, context).boxed())
.get_or_insert_with(key.clone(), || {
async move {
metrics.cache_miss(&method);
next(request, context).await
}
.boxed()
})
.await;

if let Ok(ref value) = result {
Expand Down Expand Up @@ -110,7 +124,7 @@ mod tests {
#[tokio::test]
async fn handle_ok_resp() {
let cache = Cache::new(NonZeroUsize::try_from(1).unwrap(), None);
let middleware = CacheMiddleware::new(cache.clone());
let middleware = CacheMiddleware::new(cache.clone(), RpcMetrics::noop());

let res = middleware
.call(
Expand Down Expand Up @@ -187,7 +201,7 @@ mod tests {

#[tokio::test]
async fn should_not_cache_null() {
let middleware = CacheMiddleware::new(Cache::new(NonZeroUsize::try_from(3).unwrap(), None));
let middleware = CacheMiddleware::new(Cache::new(NonZeroUsize::try_from(3).unwrap(), None), RpcMetrics::noop());

let res = middleware
.call(
Expand All @@ -214,10 +228,10 @@ mod tests {

#[tokio::test]
async fn cache_ttl_works() {
let middleware = CacheMiddleware::new(Cache::new(
NonZeroUsize::new(1).unwrap(),
Some(Duration::from_millis(10)),
));
let middleware = CacheMiddleware::new(
Cache::new(NonZeroUsize::new(1).unwrap(), Some(Duration::from_millis(10))),
RpcMetrics::noop(),
);

let res = middleware
.call(
Expand Down Expand Up @@ -257,7 +271,7 @@ mod tests {

#[tokio::test]
async fn bypass_cache() {
let middleware = CacheMiddleware::new(Cache::new(NonZeroUsize::try_from(3).unwrap(), None));
let middleware = CacheMiddleware::new(Cache::new(NonZeroUsize::try_from(3).unwrap(), None), RpcMetrics::noop());

let res = middleware
.call(
Expand Down Expand Up @@ -300,7 +314,7 @@ mod tests {

#[tokio::test]
async fn avoid_repeated_requests() {
let middleware = CacheMiddleware::new(Cache::new(NonZeroUsize::try_from(3).unwrap(), None));
let middleware = CacheMiddleware::new(Cache::new(NonZeroUsize::try_from(3).unwrap(), None), RpcMetrics::noop());

let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let res = middleware.call(
Expand Down
4 changes: 4 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use jsonrpsee::{
use opentelemetry::trace::FutureExt as _;
use serde_json::json;

use crate::extensions::prometheus::get_rpc_metrics;
use crate::{
config::Config,
extensions::{
Expand Down Expand Up @@ -57,12 +58,15 @@ pub async fn build(config: Config) -> anyhow::Result<SubwayServerHandle> {
.get::<crate::extensions::prometheus::Prometheus>();
let prometheus_registry = prometheus.map(|p| p.registry().clone());

let metrics = get_rpc_metrics(&extensions_registry).await;

let registry = extensions_registry.clone();
let (addr, handle) = server_builder
.build(
rate_limit_builder,
rpc_method_weights,
prometheus_registry,
metrics,
move || async move {
let mut module = RpcModule::new(());

Expand Down

0 comments on commit 6792092

Please sign in to comment.