Skip to content

Commit

Permalink
A0-3853: Prometheus metrics (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
kostekIV authored Jan 12, 2024
1 parent 854c2d0 commit 7218798
Show file tree
Hide file tree
Showing 8 changed files with 940 additions and 218 deletions.
757 changes: 656 additions & 101 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ tracing = "0.1.37"
tracing-serde = "0.1.3"
tracing-subscriber = { version = "0.3.16", features = ["env-filter", "json"] }

prometheus-endpoint = "2.1.2"

jsonrpsee = { path = "./vendor/jsonrpsee/jsonrpsee", features = ["full"] }
governor = { path = "./vendor/governor/governor" }

Expand Down
3 changes: 3 additions & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ extensions:
# use X-Forwarded-For header to get real ip, if available (e.g. behind a load balancer).
# WARNING: Use with caution, as this xff header can be forged.
use_xff: true # default is false
prometheus:
port: 9615
label: "dev"

middlewares:
methods:
Expand Down
2 changes: 2 additions & 0 deletions src/extensions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod cache;
pub mod client;
pub mod event_bus;
pub mod merge_subscription;
pub mod prometheus;
pub mod rate_limit;
pub mod server;
pub mod telemetry;
Expand Down Expand Up @@ -138,4 +139,5 @@ define_all_extensions! {
server: server::SubwayServerBuilder,
event_bus: event_bus::EventBus,
rate_limit: rate_limit::RateLimitBuilder,
prometheus: prometheus::Prometheus,
}
67 changes: 67 additions & 0 deletions src/extensions/prometheus/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use super::{Extension, ExtensionRegistry};
use async_trait::async_trait;
use prometheus_endpoint::init_prometheus;
use prometheus_endpoint::Registry;
use serde::Deserialize;
use std::iter;
use std::net::{Ipv4Addr, SocketAddr};
use tokio::task::JoinHandle;

pub struct Prometheus {
pub registry: Registry,
pub exporter_task: JoinHandle<()>,
}

impl Drop for Prometheus {
fn drop(&mut self) {
self.exporter_task.abort();
}
}

#[derive(Deserialize, Debug, Clone)]
pub struct PrometheusConfig {
pub port: u16,
pub prefix: Option<String>,
pub chain_label: Option<String>,
}

#[async_trait]
impl Extension for Prometheus {
type Config = PrometheusConfig;

async fn from_config(config: &Self::Config, _registry: &ExtensionRegistry) -> Result<Self, anyhow::Error> {
Ok(Self::new(config.clone()))
}
}

impl Prometheus {
pub fn new(config: PrometheusConfig) -> Self {
let labels = config
.chain_label
.clone()
.map(|l| iter::once(("chain".to_string(), l.clone())).collect());
let prefix = match config.prefix {
Some(p) if p.is_empty() => None,
p => p,
};
let registry = Registry::new_custom(prefix, labels).expect("Can't happen");

let exporter_task = start_prometheus_exporter(registry.clone(), config.port);
Self {
registry,
exporter_task,
}
}

pub fn registry(&self) -> &Registry {
&self.registry
}
}

fn start_prometheus_exporter(registry: Registry, port: u16) -> JoinHandle<()> {
let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), port);

tokio::spawn(async move {
init_prometheus(addr, registry).await.unwrap();
})
}
11 changes: 11 additions & 0 deletions src/extensions/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,23 @@ use jsonrpsee::server::{
middleware::rpc::RpcServiceBuilder, stop_channel, RandomStringIdProvider, RpcModule, ServerBuilder, ServerHandle,
};
use jsonrpsee::Methods;
use prometheus_endpoint::Registry;
use serde::ser::StdError;
use serde::Deserialize;
use std::str::FromStr;
use std::sync::Arc;
use std::{future::Future, net::SocketAddr};
use tower::layer::layer_fn;
use tower::ServiceBuilder;
use tower_http::cors::{AllowOrigin, CorsLayer};

use super::{Extension, ExtensionRegistry};
use crate::extensions::rate_limit::{MethodWeights, RateLimitBuilder, XFF};

mod prometheus;
mod proxy_get_request;
mod ready_get_request;
use crate::extensions::server::prometheus::PrometheusService;
use proxy_get_request::{ProxyGetRequestLayer, ProxyGetRequestMethod};
use ready_get_request::ReadyProxyLayer;

Expand Down Expand Up @@ -100,6 +104,7 @@ impl SubwayServerBuilder {
&self,
rate_limit_builder: Option<Arc<RateLimitBuilder>>,
rpc_method_weights: MethodWeights,
prometheus_registry: Option<Registry>,
rpc_module_builder: impl FnOnce() -> Fut,
) -> anyhow::Result<(SocketAddr, ServerHandle)> {
let config = self.config.clone();
Expand Down Expand Up @@ -133,6 +138,7 @@ impl SubwayServerBuilder {
let stop_handle = stop_handle.clone();
let rate_limit_builder = rate_limit_builder.clone();
let rpc_method_weights = rpc_method_weights.clone();
let prometheus_registry = prometheus_registry.clone();

async move {
// service_fn handle each request
Expand All @@ -156,6 +162,11 @@ impl SubwayServerBuilder {
rate_limit_builder
.as_ref()
.and_then(|r| r.connection_limit(rpc_method_weights.clone())),
)
.option_layer(
prometheus_registry
.as_ref()
.map(|r| layer_fn(|s| PrometheusService::new(s, r))),
);

let service_builder = ServerBuilder::default()
Expand Down
71 changes: 71 additions & 0 deletions src/extensions/server/prometheus.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use futures::{future::BoxFuture, FutureExt};
use jsonrpsee::server::middleware::rpc::RpcServiceT;
use jsonrpsee::types::Request;
use jsonrpsee::MethodResponse;
use prometheus_endpoint::{register, Counter, Histogram, HistogramOpts, Registry, U64};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type MetricPair = (Counter<U64>, Histogram);

#[derive(Clone)]
pub struct PrometheusService<S> {
inner: S,
registry: Registry,
call_metrics: Arc<Mutex<HashMap<String, MetricPair>>>,
}

impl<S> PrometheusService<S> {
pub fn new(inner: S, registry: &Registry) -> Self {
Self {
inner,
registry: registry.clone(),
call_metrics: Arc::new(Mutex::new(HashMap::new())),
}
}

fn register_metrics_for(&self, method: String) -> MetricPair {
let counter_name = format!("{}_count", method);
let histogram_name = format!("{}_histogram", method);

let counter = Counter::new(counter_name, "No help").unwrap();
let histogram = Histogram::with_opts(HistogramOpts::new(histogram_name, "No help")).unwrap();

let counter = register(counter, &self.registry).unwrap();
let histogram = register(histogram, &self.registry).unwrap();

(counter, histogram)
}

fn metrics_for(&self, method: String) -> MetricPair {
let mut metrics = self.call_metrics.lock().unwrap();
let (counter, histogram) = metrics
.entry(method.clone())
.or_insert_with(|| self.register_metrics_for(method));

(counter.clone(), histogram.clone())
}
}

impl<'a, S> RpcServiceT<'a> for PrometheusService<S>
where
S: RpcServiceT<'a> + Send + Sync + Clone + 'static,
{
type Future = BoxFuture<'a, MethodResponse>;

fn call(&self, req: Request<'a>) -> Self::Future {
let (counter, histogram) = self.metrics_for(req.method.to_string());

let service = self.inner.clone();
async move {
counter.inc();

let timer = histogram.start_timer();
let res = service.call(req).await;
timer.stop_and_record();

res
}
.boxed()
}
}
Loading

0 comments on commit 7218798

Please sign in to comment.