Skip to content

Commit

Permalink
Fix double registering (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
kostekIV authored Mar 4, 2024
1 parent 6792092 commit 15565c8
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 155 deletions.
33 changes: 32 additions & 1 deletion src/extensions/prometheus/rpc_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use prometheus_endpoint::{register, Counter, CounterVec, Opts, Registry, U64};
use prometheus_endpoint::{register, Counter, CounterVec, HistogramOpts, HistogramVec, Opts, Registry, U64};

#[derive(Clone)]
pub enum RpcMetrics {
Expand Down Expand Up @@ -37,6 +37,18 @@ impl RpcMetrics {
inner.cache_miss(method);
}
}

pub fn call_metrics(&self) -> Option<(HistogramVec, CounterVec<U64>, CounterVec<U64>)> {
if let Self::Prometheus(inner) = self {
return Some((
inner.call_times.clone(),
inner.calls_started.clone(),
inner.calls_finished.clone(),
));
}

None
}
}

#[derive(Clone)]
Expand All @@ -45,6 +57,9 @@ pub struct InnerMetrics {
closed_session_count: Counter<U64>,
cache_query_counter: CounterVec<U64>,
cache_miss_counter: CounterVec<U64>,
call_times: HistogramVec,
calls_started: CounterVec<U64>,
calls_finished: CounterVec<U64>,
}

impl InnerMetrics {
Expand All @@ -53,17 +68,33 @@ impl InnerMetrics {
let closed_counter = Counter::new("closed_ws_counter", "No help").unwrap();
let cache_miss_counter = CounterVec::new(Opts::new("cache_miss_counter", "No help"), &["method"]).unwrap();
let cache_query_counter = CounterVec::new(Opts::new("cache_query_counter", "No help"), &["method"]).unwrap();
let call_times =
HistogramVec::new(HistogramOpts::new("rpc_calls_time", "No help"), &["protocol", "method"]).unwrap();
let calls_started_counter =
CounterVec::new(Opts::new("rpc_calls_started", "No help"), &["protocol", "method"]).unwrap();
let calls_finished_counter = CounterVec::new(
Opts::new("rpc_calls_finished", "No help"),
&["protocol", "method", "is_error"],
)
.unwrap();

let open_session_count = register(open_counter, registry).unwrap();
let closed_session_count = register(closed_counter, registry).unwrap();
let cache_query_counter = register(cache_query_counter, registry).unwrap();
let cache_miss_counter = register(cache_miss_counter, registry).unwrap();

let call_times = register(call_times, registry).unwrap();
let calls_started = register(calls_started_counter, registry).unwrap();
let calls_finished = register(calls_finished_counter, registry).unwrap();

Self {
cache_miss_counter,
cache_query_counter,
open_session_count,
closed_session_count,
calls_started,
calls_finished,
call_times,
}
}
fn ws_open(&self) {
Expand Down
12 changes: 6 additions & 6 deletions src/extensions/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use jsonrpsee::server::{
ServerHandle,
};
use jsonrpsee::Methods;
use prometheus_endpoint::Registry;

use serde::ser::StdError;
use serde::Deserialize;

Expand All @@ -21,12 +21,13 @@ use tower_http::cors::{AllowOrigin, CorsLayer};

use super::{Extension, ExtensionRegistry};
use crate::extensions::rate_limit::{MethodWeights, RateLimitBuilder, XFF};
pub use prometheus::Protocol;

mod prometheus;
mod proxy_get_request;
mod ready_get_request;
use crate::extensions::prometheus::RpcMetrics;
use crate::extensions::server::prometheus::{PrometheusService, Protocol};
use crate::extensions::server::prometheus::PrometheusService;
use proxy_get_request::{ProxyGetRequestLayer, ProxyGetRequestMethod};
use ready_get_request::ReadyProxyLayer;

Expand Down Expand Up @@ -107,7 +108,6 @@ impl SubwayServerBuilder {
&self,
rate_limit_builder: Option<Arc<RateLimitBuilder>>,
rpc_method_weights: MethodWeights,
prometheus_registry: Option<Registry>,
rpc_metrics: RpcMetrics,
rpc_module_builder: impl FnOnce() -> Fut,
) -> anyhow::Result<(SocketAddr, ServerHandle)> {
Expand Down Expand Up @@ -142,7 +142,6 @@ impl SubwayServerBuilder {
let stop_handle = stop_handle.clone();
let rate_limit_builder = rate_limit_builder.clone();
let rpc_method_weights = rpc_method_weights.clone();
let prometheus_registry = prometheus_registry.clone();
let rpc_metrics = rpc_metrics.clone();

async move {
Expand All @@ -156,6 +155,7 @@ impl SubwayServerBuilder {
let stop_handle = stop_handle.clone();
let http_middleware = http_middleware.clone();
let rpc_metrics = rpc_metrics.clone();
let call_metrics = rpc_metrics.call_metrics();

if let Some(true) = rate_limit_builder.as_ref().map(|r| r.use_xff()) {
socket_ip = req.xxf_ip().unwrap_or(socket_ip);
Expand All @@ -173,9 +173,9 @@ impl SubwayServerBuilder {
.and_then(|r| r.connection_limit(rpc_method_weights.clone())),
)
.option_layer(
prometheus_registry
call_metrics
.as_ref()
.map(|r| layer_fn(|s| PrometheusService::new(s, r, protocol))),
.map(|(a, b, c)| layer_fn(|s| PrometheusService::new(s, protocol, a, b, c))),
);

let service_builder = ServerBuilder::default()
Expand Down
30 changes: 11 additions & 19 deletions src/extensions/server/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use futures::{future::BoxFuture, FutureExt};
use jsonrpsee::server::middleware::rpc::RpcServiceT;
use jsonrpsee::types::Request;
use jsonrpsee::MethodResponse;
use prometheus_endpoint::{register, CounterVec, HistogramOpts, HistogramVec, Opts, Registry, U64};
use prometheus_endpoint::{CounterVec, HistogramVec, U64};

use std::fmt::Display;

Expand Down Expand Up @@ -32,27 +32,19 @@ pub struct PrometheusService<S> {
}

impl<S> PrometheusService<S> {
pub fn new(inner: S, registry: &Registry, protocol: Protocol) -> Self {
let call_times =
HistogramVec::new(HistogramOpts::new("rpc_calls_time", "No help"), &["protocol", "method"]).unwrap();
let calls_started_counter =
CounterVec::new(Opts::new("rpc_calls_started", "No help"), &["protocol", "method"]).unwrap();
let calls_finished_counter = CounterVec::new(
Opts::new("rpc_calls_finished", "No help"),
&["protocol", "method", "is_error"],
)
.unwrap();

let call_times = register(call_times, registry).unwrap();
let calls_started = register(calls_started_counter, registry).unwrap();
let calls_finished = register(calls_finished_counter, registry).unwrap();

pub fn new(
inner: S,
protocol: Protocol,
call_times: &HistogramVec,
calls_started: &CounterVec<U64>,
calls_finished: &CounterVec<U64>,
) -> Self {
Self {
inner,
protocol,
calls_started,
calls_finished,
call_times,
calls_started: calls_started.clone(),
calls_finished: calls_finished.clone(),
call_times: call_times.clone(),
}
}
}
Expand Down
Loading

0 comments on commit 15565c8

Please sign in to comment.