Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Product Data] First step in gateway usage data collection #4963

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 1 addition & 7 deletions gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,9 @@ rand = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
si-scale = { workspace = true }
sqlx = { workspace = true, features = [
"runtime-tokio-rustls",
"sqlite",
"macros",
"migrate",
"time",
] }
subtle-encoding = { workspace = true, features = ["bech32-preview"] }
thiserror = { workspace = true }
time = { workspace = true }
tokio = { workspace = true, features = [
"rt-multi-thread",
"net",
Expand Down
6 changes: 5 additions & 1 deletion gateway/src/node/client_handling/active_clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use super::websocket::message_receiver::{IsActiveRequestSender, MixMessageSender
use crate::node::client_handling::embedded_clients::LocalEmbeddedClientHandle;
use dashmap::DashMap;
use nym_sphinx::DestinationAddressBytes;
use std::sync::Arc;
use std::{collections::HashSet, sync::Arc};
use tracing::warn;

enum ActiveClient {
Expand Down Expand Up @@ -165,4 +165,8 @@ impl ActiveClientsStore {
pub(crate) fn size(&self) -> usize {
self.inner.len()
}

pub(crate) fn client_list(&self) -> HashSet<DestinationAddressBytes> {
self.inner.iter().map(|entry| *entry.key()).collect()
}
}
31 changes: 31 additions & 0 deletions gateway/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ use nym_crypto::asymmetric::{encryption, identity};
use nym_mixnet_client::forwarder::{MixForwardingSender, PacketForwarder};
use nym_network_defaults::NymNetworkDetails;
use nym_network_requester::{LocalGateway, NRServiceProviderBuilder, RequestFilter};
use nym_node_http_api::state::metrics::SharedSessionStats;
use nym_task::{TaskClient, TaskHandle, TaskManager};
use nym_types::gateway::GatewayNodeDetailsResponse;
use nym_validator_client::nyxd::{Coin, CosmWasmClient};
use nym_validator_client::{nyxd, DirectSigningHttpRpcNyxdClient};
use rand::seq::SliceRandom;
use rand::thread_rng;
use statistics::collector::GatewayStatisticsCollector;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
Expand All @@ -36,6 +38,7 @@ use tracing::*;
pub(crate) mod client_handling;
pub(crate) mod helpers;
pub(crate) mod mixnet_handling;
pub(crate) mod statistics;

pub use nym_gateway_storage::{PersistentStorage, Storage};

Expand Down Expand Up @@ -147,6 +150,8 @@ pub struct Gateway<St = PersistentStorage> {

wireguard_data: Option<nym_wireguard::WireguardData>,

session_stats: Option<SharedSessionStats>,

run_http_server: bool,
task_client: Option<TaskClient>,
}
Expand All @@ -168,6 +173,7 @@ impl<St> Gateway<St> {
ip_packet_router_opts,
authenticator_opts: None,
wireguard_data: None,
session_stats: None,
run_http_server: true,
task_client: None,
})
Expand All @@ -191,6 +197,7 @@ impl<St> Gateway<St> {
sphinx_keypair,
storage,
wireguard_data: None,
session_stats: None,
run_http_server: true,
task_client: None,
}
Expand All @@ -204,6 +211,10 @@ impl<St> Gateway<St> {
self.task_client = Some(task_client)
}

pub fn set_session_stats(&mut self, session_stats: SharedSessionStats) {
self.session_stats = Some(session_stats);
}

pub fn set_wireguard_data(&mut self, wireguard_data: nym_wireguard::WireguardData) {
self.wireguard_data = Some(wireguard_data)
}
Expand Down Expand Up @@ -393,6 +404,19 @@ impl<St> Gateway<St> {
packet_sender
}

fn start_stats_collector(
&self,
active_clients_store: ActiveClientsStore,
shared_session_stats: SharedSessionStats,
shutdown: TaskClient,
) {
info!("Starting gateway stats collector...");

let mut stats_collector =
GatewayStatisticsCollector::new(active_clients_store, shared_session_stats);
tokio::spawn(async move { stats_collector.run(shutdown).await });
}

// TODO: rethink the logic in this function...
async fn start_network_requester(
&self,
Expand Down Expand Up @@ -636,6 +660,13 @@ impl<St> Gateway<St> {
shutdown.fork("mixnet_handling::Listener"),
);

let shared_session_stats = self.session_stats.take().unwrap_or_default();
self.start_stats_collector(
active_clients_store.clone(),
shared_session_stats,
shutdown.fork("statistics::sessionCollector"),
);

self.start_client_websocket_listener(
mix_forwarding_channel.clone(),
active_clients_store.clone(),
Expand Down
118 changes: 118 additions & 0 deletions gateway/src/node/statistics/collector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2022 - Nym Technologies SA <[email protected]>
// SPDX-License-Identifier: GPL-3.0-only

use nym_node_http_api::state::metrics::SharedSessionStats;
use nym_sphinx::DestinationAddressBytes;
use nym_task::TaskClient;
use std::{
collections::{HashMap, HashSet},
time::Duration,
};
use time::{Date, OffsetDateTime};
use tracing::trace;

use crate::node::client_handling::active_clients::ActiveClientsStore;

const PROBING_TIME: u64 = 60;
const SESSION_DURATION_INCREMENT: u64 = PROBING_TIME * 1000; //miliseconds
const STATISTICS_GATHERING_TIMER_INTERVAL: Duration = Duration::from_secs(PROBING_TIME); //probing time
const STATISTICS_UPDATE_TIMER_INTERVAL: Duration = Duration::from_secs(3600); //update timer, no need to check everytime

type SessionDuration = u64; //in miliseconds

pub(crate) struct GatewayStatisticsCollector {
gathering_interval: Duration,
update_interval: Duration,
last_update_day: Date,

//settion stats_gathering
active_clients_store: ActiveClientsStore,
shared_session_stats: SharedSessionStats,
active_sessions: HashMap<DestinationAddressBytes, SessionDuration>,
unique_users: HashSet<DestinationAddressBytes>, //might be a bloom filter if this takes too much space
sessions_started: u32,
finished_sessions: Vec<SessionDuration>,
}

impl GatewayStatisticsCollector {
pub fn new(
active_clients_store: ActiveClientsStore,
shared_session_stats: SharedSessionStats,
) -> Self {
GatewayStatisticsCollector {
active_clients_store,
shared_session_stats,
gathering_interval: STATISTICS_GATHERING_TIMER_INTERVAL,
update_interval: STATISTICS_UPDATE_TIMER_INTERVAL,
last_update_day: OffsetDateTime::now_utc().date(),
active_sessions: Default::default(),
unique_users: Default::default(),
sessions_started: 0,
finished_sessions: Default::default(),
}
}

async fn gather_stats(&mut self) {
let current_sessions = self.active_clients_store.client_list();
let past_sessions = self.active_sessions.keys().copied().collect::<HashSet<_>>();

//active and new sessions
for session in &current_sessions {
if let Some(duration) = self.active_sessions.get_mut(session) {
*duration += SESSION_DURATION_INCREMENT;
} else {
self.active_sessions
.insert(*session, SESSION_DURATION_INCREMENT);
self.unique_users.insert(*session);
self.sessions_started += 1;
}
}

//handling finished sessions
for client in past_sessions.difference(&current_sessions) {
if let Some(session_duration) = self.active_sessions.remove(client) {
self.finished_sessions.push(session_duration);
}
}
}
//update shared state once a day has passed, with data from the previous day
async fn update_shared_session_stats(&mut self) {
let mut shared_state = self.shared_session_stats.write().await;
shared_state.update_time = self.last_update_day;
shared_state.unique_active_users = self.unique_users.len() as u32;
shared_state.session_started = self.sessions_started;
shared_state.session_durations = self.finished_sessions.clone();
}

fn reset_stats(&mut self, reset_day: Date) {
self.last_update_day = reset_day;
self.unique_users = self.active_sessions.keys().copied().collect();
self.finished_sessions = Default::default();
self.sessions_started = 0;
}

pub async fn run(&mut self, mut shutdown: TaskClient) {
let mut gathering_interval = tokio::time::interval(self.gathering_interval);
let mut update_interval = tokio::time::interval(self.update_interval);
while !shutdown.is_shutdown() {
tokio::select! {
biased;
_ = shutdown.recv() => {
trace!("StatisticsCollector: Received shutdown");
},
_ = update_interval.tick() => {
let today = OffsetDateTime::now_utc().date();
if today != self.last_update_day {
self.update_shared_session_stats().await;
self.reset_stats(today);
}

},
_ = gathering_interval.tick() => {
self.gather_stats().await;
}

}
}
}
}
4 changes: 4 additions & 0 deletions gateway/src/node/statistics/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Copyright 2022 - Nym Technologies SA <[email protected]>
// SPDX-License-Identifier: GPL-3.0-only

pub mod collector;
3 changes: 3 additions & 0 deletions nym-node/nym-node-http-api/src/router/api/v1/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use crate::api::v1::metrics::mixing::mixing_stats;
use crate::api::v1::metrics::prometheus::prometheus_metrics;
use crate::api::v1::metrics::sessions::sessions_stats;
use crate::api::v1::metrics::verloc::verloc_stats;
use crate::state::metrics::MetricsAppState;
use axum::extract::FromRef;
Expand All @@ -12,6 +13,7 @@ use nym_node_requests::routes::api::v1::metrics;

pub mod mixing;
pub mod prometheus;
pub mod sessions;
pub mod verloc;

#[derive(Debug, Clone, Default)]
Expand All @@ -26,6 +28,7 @@ where
{
Router::new()
.route(metrics::MIXING, get(mixing_stats))
.route(metrics::SESSIONS, get(sessions_stats))
.route(metrics::VERLOC, get(verloc_stats))
.route(metrics::PROMETHEUS, get(prometheus_metrics))
}
33 changes: 33 additions & 0 deletions nym-node/nym-node-http-api/src/router/api/v1/metrics/sessions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 - Nym Technologies SA <[email protected]>
// SPDX-License-Identifier: GPL-3.0-only

use crate::state::metrics::MetricsAppState;
use axum::extract::{Query, State};
use nym_http_api_common::{FormattedResponse, OutputParams};
use nym_node_requests::api::v1::metrics::models::SessionStats;

/// If applicable, returns sessions statistics information of this node.
/// This information is **PURELY** self-reported and in no way validated.
#[utoipa::path(
get,
path = "/sessions",
context_path = "/api/v1/metrics",
tag = "Metrics",
responses(
(status = 200, content(
("application/json" = SessionStats),
("application/yaml" = SessionStats)
))
),
params(OutputParams),
)]
pub(crate) async fn sessions_stats(
Query(output): Query<OutputParams>,
State(metrics_state): State<MetricsAppState>,
) -> SessionStatsResponse {
let output = output.output.unwrap_or_default();
let response = metrics_state.session_stats.read().await.as_response();
output.to_response(response)
}

pub type SessionStatsResponse = FormattedResponse<SessionStats>;
59 changes: 57 additions & 2 deletions nym-node/nym-node-http-api/src/state/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
use crate::state::AppState;
use axum::extract::FromRef;
use nym_node_requests::api::v1::metrics::models::{
MixingStats, VerlocResult, VerlocResultData, VerlocStats,
MixingStats, SessionStats, VerlocResult, VerlocResultData, VerlocStats,
};
use std::collections::HashMap;
use std::sync::Arc;
use time::OffsetDateTime;
use time::macros::time;
use time::{Date, OffsetDateTime};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

pub use nym_node_requests::api::v1::metrics::models::{VerlocMeasurement, VerlocNodeResult};
Expand Down Expand Up @@ -132,12 +133,66 @@ impl VerlocStatsState {
}
}

#[derive(Clone, Debug, Default)]
pub struct SharedSessionStats {
inner: Arc<RwLock<SessionStatsState>>,
}

impl SharedSessionStats {
pub fn new() -> SharedSessionStats {
SharedSessionStats {
inner: Arc::new(RwLock::new(Default::default())),
}
}

pub async fn read(&self) -> RwLockReadGuard<'_, SessionStatsState> {
self.inner.read().await
}

pub async fn write(&self) -> RwLockWriteGuard<'_, SessionStatsState> {
self.inner.write().await
}
}

#[derive(Debug, Clone)]
pub struct SessionStatsState {
pub update_time: Date,
pub unique_active_users: u32,
pub session_started: u32,
pub session_durations: Vec<u64>,
}

impl SessionStatsState {
pub fn as_response(&self) -> SessionStats {
SessionStats {
update_time: self.update_time.with_time(time!(0:00)).assume_utc(),
unique_active_users: self.unique_active_users,
session_durations: self.session_durations.clone(),
sessions_started: self.session_started,
sessions_finished: self.session_durations.len() as u32,
}
}
}

impl Default for SessionStatsState {
fn default() -> Self {
SessionStatsState {
update_time: OffsetDateTime::UNIX_EPOCH.date(),
unique_active_users: 0,
session_started: 0,
session_durations: Default::default(),
}
}
}

#[derive(Debug, Clone, Default)]
pub struct MetricsAppState {
pub(crate) prometheus_access_token: Option<String>,

pub(crate) mixing_stats: SharedMixingStats,

pub(crate) session_stats: SharedSessionStats,

pub(crate) verloc: SharedVerlocStats,
}

Expand Down
Loading
Loading