diff --git a/Cargo.lock b/Cargo.lock index 2e74311fa1..654521a63c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5232,6 +5232,7 @@ dependencies = [ "sqlx", "subtle-encoding", "thiserror", + "time", "tokio", "tokio-stream", "tokio-tungstenite", diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml index cbdcfc1ad2..683229f3a9 100644 --- a/gateway/Cargo.toml +++ b/gateway/Cargo.toml @@ -41,15 +41,9 @@ rand = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } si-scale = { workspace = true } -sqlx = { workspace = true, features = [ - "runtime-tokio-rustls", - "sqlite", - "macros", - "migrate", - "time", -] } subtle-encoding = { workspace = true, features = ["bech32-preview"] } thiserror = { workspace = true } +time = { workspace = true } tokio = { workspace = true, features = [ "rt-multi-thread", "net", diff --git a/gateway/src/node/client_handling/active_clients.rs b/gateway/src/node/client_handling/active_clients.rs index 962765067e..0b1ca6ddc2 100644 --- a/gateway/src/node/client_handling/active_clients.rs +++ b/gateway/src/node/client_handling/active_clients.rs @@ -5,7 +5,7 @@ use super::websocket::message_receiver::{IsActiveRequestSender, MixMessageSender use crate::node::client_handling::embedded_clients::LocalEmbeddedClientHandle; use dashmap::DashMap; use nym_sphinx::DestinationAddressBytes; -use std::sync::Arc; +use std::{collections::HashSet, sync::Arc}; use tracing::warn; enum ActiveClient { @@ -165,4 +165,8 @@ impl ActiveClientsStore { pub(crate) fn size(&self) -> usize { self.inner.len() } + + pub(crate) fn client_list(&self) -> HashSet { + self.inner.iter().map(|entry| *entry.key()).collect() + } } diff --git a/gateway/src/node/mod.rs b/gateway/src/node/mod.rs index bf5c3f0ab0..91bef81b88 100644 --- a/gateway/src/node/mod.rs +++ b/gateway/src/node/mod.rs @@ -22,12 +22,14 @@ use nym_crypto::asymmetric::{encryption, identity}; use nym_mixnet_client::forwarder::{MixForwardingSender, PacketForwarder}; use nym_network_defaults::NymNetworkDetails; use nym_network_requester::{LocalGateway, NRServiceProviderBuilder, RequestFilter}; +use nym_node_http_api::state::metrics::SharedSessionStats; use nym_task::{TaskClient, TaskHandle, TaskManager}; use nym_types::gateway::GatewayNodeDetailsResponse; use nym_validator_client::nyxd::{Coin, CosmWasmClient}; use nym_validator_client::{nyxd, DirectSigningHttpRpcNyxdClient}; use rand::seq::SliceRandom; use rand::thread_rng; +use statistics::collector::GatewayStatisticsCollector; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; @@ -36,6 +38,7 @@ use tracing::*; pub(crate) mod client_handling; pub(crate) mod helpers; pub(crate) mod mixnet_handling; +pub(crate) mod statistics; pub use nym_gateway_storage::{PersistentStorage, Storage}; @@ -147,6 +150,8 @@ pub struct Gateway { wireguard_data: Option, + session_stats: Option, + run_http_server: bool, task_client: Option, } @@ -168,6 +173,7 @@ impl Gateway { ip_packet_router_opts, authenticator_opts: None, wireguard_data: None, + session_stats: None, run_http_server: true, task_client: None, }) @@ -191,6 +197,7 @@ impl Gateway { sphinx_keypair, storage, wireguard_data: None, + session_stats: None, run_http_server: true, task_client: None, } @@ -204,6 +211,10 @@ impl Gateway { self.task_client = Some(task_client) } + pub fn set_session_stats(&mut self, session_stats: SharedSessionStats) { + self.session_stats = Some(session_stats); + } + pub fn set_wireguard_data(&mut self, wireguard_data: nym_wireguard::WireguardData) { self.wireguard_data = Some(wireguard_data) } @@ -393,6 +404,19 @@ impl Gateway { packet_sender } + fn start_stats_collector( + &self, + active_clients_store: ActiveClientsStore, + shared_session_stats: SharedSessionStats, + shutdown: TaskClient, + ) { + info!("Starting gateway stats collector..."); + + let mut stats_collector = + GatewayStatisticsCollector::new(active_clients_store, shared_session_stats); + tokio::spawn(async move { stats_collector.run(shutdown).await }); + } + // TODO: rethink the logic in this function... async fn start_network_requester( &self, @@ -636,6 +660,13 @@ impl Gateway { shutdown.fork("mixnet_handling::Listener"), ); + let shared_session_stats = self.session_stats.take().unwrap_or_default(); + self.start_stats_collector( + active_clients_store.clone(), + shared_session_stats, + shutdown.fork("statistics::sessionCollector"), + ); + self.start_client_websocket_listener( mix_forwarding_channel.clone(), active_clients_store.clone(), diff --git a/gateway/src/node/statistics/collector.rs b/gateway/src/node/statistics/collector.rs new file mode 100644 index 0000000000..d617c7520d --- /dev/null +++ b/gateway/src/node/statistics/collector.rs @@ -0,0 +1,118 @@ +// Copyright 2022 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use nym_node_http_api::state::metrics::SharedSessionStats; +use nym_sphinx::DestinationAddressBytes; +use nym_task::TaskClient; +use std::{ + collections::{HashMap, HashSet}, + time::Duration, +}; +use time::{Date, OffsetDateTime}; +use tracing::trace; + +use crate::node::client_handling::active_clients::ActiveClientsStore; + +const PROBING_TIME: u64 = 60; +const SESSION_DURATION_INCREMENT: u64 = PROBING_TIME * 1000; //miliseconds +const STATISTICS_GATHERING_TIMER_INTERVAL: Duration = Duration::from_secs(PROBING_TIME); //probing time +const STATISTICS_UPDATE_TIMER_INTERVAL: Duration = Duration::from_secs(3600); //update timer, no need to check everytime + +type SessionDuration = u64; //in miliseconds + +pub(crate) struct GatewayStatisticsCollector { + gathering_interval: Duration, + update_interval: Duration, + last_update_day: Date, + + //settion stats_gathering + active_clients_store: ActiveClientsStore, + shared_session_stats: SharedSessionStats, + active_sessions: HashMap, + unique_users: HashSet, //might be a bloom filter if this takes too much space + sessions_started: u32, + finished_sessions: Vec, +} + +impl GatewayStatisticsCollector { + pub fn new( + active_clients_store: ActiveClientsStore, + shared_session_stats: SharedSessionStats, + ) -> Self { + GatewayStatisticsCollector { + active_clients_store, + shared_session_stats, + gathering_interval: STATISTICS_GATHERING_TIMER_INTERVAL, + update_interval: STATISTICS_UPDATE_TIMER_INTERVAL, + last_update_day: OffsetDateTime::now_utc().date(), + active_sessions: Default::default(), + unique_users: Default::default(), + sessions_started: 0, + finished_sessions: Default::default(), + } + } + + async fn gather_stats(&mut self) { + let current_sessions = self.active_clients_store.client_list(); + let past_sessions = self.active_sessions.keys().copied().collect::>(); + + //active and new sessions + for session in ¤t_sessions { + if let Some(duration) = self.active_sessions.get_mut(session) { + *duration += SESSION_DURATION_INCREMENT; + } else { + self.active_sessions + .insert(*session, SESSION_DURATION_INCREMENT); + self.unique_users.insert(*session); + self.sessions_started += 1; + } + } + + //handling finished sessions + for client in past_sessions.difference(¤t_sessions) { + if let Some(session_duration) = self.active_sessions.remove(client) { + self.finished_sessions.push(session_duration); + } + } + } + //update shared state once a day has passed, with data from the previous day + async fn update_shared_session_stats(&mut self) { + let mut shared_state = self.shared_session_stats.write().await; + shared_state.update_time = self.last_update_day; + shared_state.unique_active_users = self.unique_users.len() as u32; + shared_state.session_started = self.sessions_started; + shared_state.session_durations = self.finished_sessions.clone(); + } + + fn reset_stats(&mut self, reset_day: Date) { + self.last_update_day = reset_day; + self.unique_users = self.active_sessions.keys().copied().collect(); + self.finished_sessions = Default::default(); + self.sessions_started = 0; + } + + pub async fn run(&mut self, mut shutdown: TaskClient) { + let mut gathering_interval = tokio::time::interval(self.gathering_interval); + let mut update_interval = tokio::time::interval(self.update_interval); + while !shutdown.is_shutdown() { + tokio::select! { + biased; + _ = shutdown.recv() => { + trace!("StatisticsCollector: Received shutdown"); + }, + _ = update_interval.tick() => { + let today = OffsetDateTime::now_utc().date(); + if today != self.last_update_day { + self.update_shared_session_stats().await; + self.reset_stats(today); + } + + }, + _ = gathering_interval.tick() => { + self.gather_stats().await; + } + + } + } + } +} diff --git a/gateway/src/node/statistics/mod.rs b/gateway/src/node/statistics/mod.rs new file mode 100644 index 0000000000..eea54a1a85 --- /dev/null +++ b/gateway/src/node/statistics/mod.rs @@ -0,0 +1,4 @@ +// Copyright 2022 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +pub mod collector; diff --git a/nym-node/nym-node-http-api/src/router/api/v1/metrics/mod.rs b/nym-node/nym-node-http-api/src/router/api/v1/metrics/mod.rs index 7f3dc9151e..cabf4aedf5 100644 --- a/nym-node/nym-node-http-api/src/router/api/v1/metrics/mod.rs +++ b/nym-node/nym-node-http-api/src/router/api/v1/metrics/mod.rs @@ -3,6 +3,7 @@ use crate::api::v1::metrics::mixing::mixing_stats; use crate::api::v1::metrics::prometheus::prometheus_metrics; +use crate::api::v1::metrics::sessions::sessions_stats; use crate::api::v1::metrics::verloc::verloc_stats; use crate::state::metrics::MetricsAppState; use axum::extract::FromRef; @@ -12,6 +13,7 @@ use nym_node_requests::routes::api::v1::metrics; pub mod mixing; pub mod prometheus; +pub mod sessions; pub mod verloc; #[derive(Debug, Clone, Default)] @@ -26,6 +28,7 @@ where { Router::new() .route(metrics::MIXING, get(mixing_stats)) + .route(metrics::SESSIONS, get(sessions_stats)) .route(metrics::VERLOC, get(verloc_stats)) .route(metrics::PROMETHEUS, get(prometheus_metrics)) } diff --git a/nym-node/nym-node-http-api/src/router/api/v1/metrics/sessions.rs b/nym-node/nym-node-http-api/src/router/api/v1/metrics/sessions.rs new file mode 100644 index 0000000000..59eceb8f89 --- /dev/null +++ b/nym-node/nym-node-http-api/src/router/api/v1/metrics/sessions.rs @@ -0,0 +1,33 @@ +// Copyright 2024 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use crate::state::metrics::MetricsAppState; +use axum::extract::{Query, State}; +use nym_http_api_common::{FormattedResponse, OutputParams}; +use nym_node_requests::api::v1::metrics::models::SessionStats; + +/// If applicable, returns sessions statistics information of this node. +/// This information is **PURELY** self-reported and in no way validated. +#[utoipa::path( + get, + path = "/sessions", + context_path = "/api/v1/metrics", + tag = "Metrics", + responses( + (status = 200, content( + ("application/json" = SessionStats), + ("application/yaml" = SessionStats) + )) + ), + params(OutputParams), +)] +pub(crate) async fn sessions_stats( + Query(output): Query, + State(metrics_state): State, +) -> SessionStatsResponse { + let output = output.output.unwrap_or_default(); + let response = metrics_state.session_stats.read().await.as_response(); + output.to_response(response) +} + +pub type SessionStatsResponse = FormattedResponse; diff --git a/nym-node/nym-node-http-api/src/state/metrics.rs b/nym-node/nym-node-http-api/src/state/metrics.rs index d4691727d9..75616cf885 100644 --- a/nym-node/nym-node-http-api/src/state/metrics.rs +++ b/nym-node/nym-node-http-api/src/state/metrics.rs @@ -4,11 +4,12 @@ use crate::state::AppState; use axum::extract::FromRef; use nym_node_requests::api::v1::metrics::models::{ - MixingStats, VerlocResult, VerlocResultData, VerlocStats, + MixingStats, SessionStats, VerlocResult, VerlocResultData, VerlocStats, }; use std::collections::HashMap; use std::sync::Arc; -use time::OffsetDateTime; +use time::macros::time; +use time::{Date, OffsetDateTime}; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; pub use nym_node_requests::api::v1::metrics::models::{VerlocMeasurement, VerlocNodeResult}; @@ -132,12 +133,66 @@ impl VerlocStatsState { } } +#[derive(Clone, Debug, Default)] +pub struct SharedSessionStats { + inner: Arc>, +} + +impl SharedSessionStats { + pub fn new() -> SharedSessionStats { + SharedSessionStats { + inner: Arc::new(RwLock::new(Default::default())), + } + } + + pub async fn read(&self) -> RwLockReadGuard<'_, SessionStatsState> { + self.inner.read().await + } + + pub async fn write(&self) -> RwLockWriteGuard<'_, SessionStatsState> { + self.inner.write().await + } +} + +#[derive(Debug, Clone)] +pub struct SessionStatsState { + pub update_time: Date, + pub unique_active_users: u32, + pub session_started: u32, + pub session_durations: Vec, +} + +impl SessionStatsState { + pub fn as_response(&self) -> SessionStats { + SessionStats { + update_time: self.update_time.with_time(time!(0:00)).assume_utc(), + unique_active_users: self.unique_active_users, + session_durations: self.session_durations.clone(), + sessions_started: self.session_started, + sessions_finished: self.session_durations.len() as u32, + } + } +} + +impl Default for SessionStatsState { + fn default() -> Self { + SessionStatsState { + update_time: OffsetDateTime::UNIX_EPOCH.date(), + unique_active_users: 0, + session_started: 0, + session_durations: Default::default(), + } + } +} + #[derive(Debug, Clone, Default)] pub struct MetricsAppState { pub(crate) prometheus_access_token: Option, pub(crate) mixing_stats: SharedMixingStats, + pub(crate) session_stats: SharedSessionStats, + pub(crate) verloc: SharedVerlocStats, } diff --git a/nym-node/nym-node-http-api/src/state/mod.rs b/nym-node/nym-node-http-api/src/state/mod.rs index 077ca782b6..67f16d1bd3 100644 --- a/nym-node/nym-node-http-api/src/state/mod.rs +++ b/nym-node/nym-node-http-api/src/state/mod.rs @@ -1,7 +1,9 @@ // Copyright 2023-2024 - Nym Technologies SA // SPDX-License-Identifier: GPL-3.0-only -use crate::state::metrics::{MetricsAppState, SharedMixingStats, SharedVerlocStats}; +use crate::state::metrics::{ + MetricsAppState, SharedMixingStats, SharedSessionStats, SharedVerlocStats, +}; use tokio::time::Instant; pub mod metrics; @@ -32,6 +34,12 @@ impl AppState { self } + #[must_use] + pub fn with_sessions_stats(mut self, session_stats: SharedSessionStats) -> Self { + self.metrics.session_stats = session_stats; + self + } + #[must_use] pub fn with_verloc_stats(mut self, verloc_stats: SharedVerlocStats) -> Self { self.metrics.verloc = verloc_stats; diff --git a/nym-node/nym-node-requests/src/api/v1/metrics/models.rs b/nym-node/nym-node-requests/src/api/v1/metrics/models.rs index f0051e81e1..502e223c24 100644 --- a/nym-node/nym-node-requests/src/api/v1/metrics/models.rs +++ b/nym-node/nym-node-requests/src/api/v1/metrics/models.rs @@ -35,6 +35,21 @@ pub struct MixingStats { pub dropped_since_last_update: u64, } +#[derive(Serialize, Deserialize, Debug, Clone)] +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] +pub struct SessionStats { + #[serde(with = "time::serde::rfc3339")] + pub update_time: OffsetDateTime, + + pub unique_active_users: u32, + + pub session_durations: Vec, + + pub sessions_started: u32, + + pub sessions_finished: u32, +} + #[derive(Serialize, Deserialize, Default, Debug, Clone)] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] pub struct VerlocStats { diff --git a/nym-node/nym-node-requests/src/lib.rs b/nym-node/nym-node-requests/src/lib.rs index 7b4e0ee9b7..983b1a6aec 100644 --- a/nym-node/nym-node-requests/src/lib.rs +++ b/nym-node/nym-node-requests/src/lib.rs @@ -65,10 +65,12 @@ pub mod routes { use super::*; pub const MIXING: &str = "/mixing"; + pub const SESSIONS: &str = "/sessions"; pub const VERLOC: &str = "/verloc"; pub const PROMETHEUS: &str = "/prometheus"; absolute_route!(mixing_absolute, metrics_absolute(), MIXING); + absolute_route!(sessions_absolute, metrics_absolute(), SESSIONS); absolute_route!(verloc_absolute, metrics_absolute(), VERLOC); absolute_route!(prometheus_absolute, metrics_absolute(), PROMETHEUS); } diff --git a/nym-node/src/node/mod.rs b/nym-node/src/node/mod.rs index d3e756c6e0..5a7bc0c419 100644 --- a/nym-node/src/node/mod.rs +++ b/nym-node/src/node/mod.rs @@ -26,7 +26,7 @@ use nym_node::config::{ use nym_node::error::{EntryGatewayError, ExitGatewayError, MixnodeError, NymNodeError}; use nym_node_http_api::api::api_requests; use nym_node_http_api::api::api_requests::v1::node::models::{AnnouncePorts, NodeDescription}; -use nym_node_http_api::state::metrics::{SharedMixingStats, SharedVerlocStats}; +use nym_node_http_api::state::metrics::{SharedMixingStats, SharedSessionStats, SharedVerlocStats}; use nym_node_http_api::state::AppState; use nym_node_http_api::{NymNodeHTTPServer, NymNodeRouter}; use nym_sphinx_acknowledgements::AckKey; @@ -67,6 +67,7 @@ impl MixnodeData { pub struct EntryGatewayData { mnemonic: Zeroizing, client_storage: nym_gateway::node::PersistentStorage, + sessions_stats: SharedSessionStats, } impl EntryGatewayData { @@ -93,6 +94,7 @@ impl EntryGatewayData { ) .await .map_err(nym_gateway::GatewayError::from)?, + sessions_stats: SharedSessionStats::new(), }) } } @@ -581,6 +583,7 @@ impl NymNode { ); entry_gateway.disable_http_server(); entry_gateway.set_task_client(task_client); + entry_gateway.set_session_stats(self.entry_gateway.sessions_stats.clone()); if self.config.wireguard.enabled { entry_gateway.set_wireguard_data(self.wireguard.into()); } @@ -728,6 +731,7 @@ impl NymNode { let app_state = AppState::new() .with_mixing_stats(self.mixnode.mixing_stats.clone()) + .with_sessions_stats(self.entry_gateway.sessions_stats.clone()) .with_verloc_stats(self.verloc_stats.clone()) .with_metrics_key(self.config.http.access_token.clone());