From 83ba70d5200ea82ee16979b654db1be4af1b6f14 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Tue, 3 Sep 2024 13:04:26 +0300 Subject: [PATCH 01/17] require kdf connection string in the config file Signed-off-by: onur-ozkan --- assets/.config_test | 1 + src/ctx.rs | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/assets/.config_test b/assets/.config_test index c8b140f..76336e4 100644 --- a/assets/.config_test +++ b/assets/.config_test @@ -1,6 +1,7 @@ { "port": 6150, "redis_connection_string": "redis://redis:6379", + "kdf_connection_string": "http://127.0.0.1:7783", "pubkey_path": "/usr/src/komodo-defi-proxy/assets/.pubkey_test", "privkey_path": "/usr/src/komodo-defi-proxy/assets/.privkey_test", "token_expiration_time": 300, diff --git a/src/ctx.rs b/src/ctx.rs index 7bece5f..ea03ff8 100644 --- a/src/ctx.rs +++ b/src/ctx.rs @@ -23,6 +23,8 @@ pub(crate) struct AppConfig { pub(crate) port: Option, /// Redis database connection string. pub(crate) redis_connection_string: String, + /// komodo-defi-framework connection string. + pub(crate) kdf_connection_string: String, /// File path to the public key used for user verification and authentication. pub(crate) pubkey_path: String, /// File path to the private key used for user verification and authentication. @@ -107,6 +109,7 @@ pub(crate) fn get_app_config_test_instance() -> AppConfig { AppConfig { port: Some(6150), redis_connection_string: String::from("redis://redis:6379"), + kdf_connection_string: String::from("http://127.0.0.1:7783"), pubkey_path: String::from("/usr/src/komodo-defi-proxy/assets/.pubkey_test"), privkey_path: String::from("/usr/src/komodo-defi-proxy/assets/.privkey_test"), token_expiration_time: Some(300), @@ -185,6 +188,7 @@ fn test_app_config_serialzation_and_deserialization() { let json_config = serde_json::json!({ "port": 6150, "redis_connection_string": "redis://redis:6379", + "kdf_connection_string": "http://127.0.0.1:7783", "pubkey_path": "/usr/src/komodo-defi-proxy/assets/.pubkey_test", "privkey_path": "/usr/src/komodo-defi-proxy/assets/.privkey_test", "token_expiration_time": 300, From b0bbd12c08fd0a4be0a66ffd14b7921f33c6d647 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Wed, 4 Sep 2024 09:43:16 +0300 Subject: [PATCH 02/17] add {serializer, deserializer} wrappers for `RpcClient` Signed-off-by: onur-ozkan --- assets/.config_test | 2 +- src/ctx.rs | 30 +++++++++++++++++++++++++----- src/net/rpc.rs | 14 +++++--------- 3 files changed, 31 insertions(+), 15 deletions(-) diff --git a/assets/.config_test b/assets/.config_test index 76336e4..c274776 100644 --- a/assets/.config_test +++ b/assets/.config_test @@ -1,7 +1,7 @@ { "port": 6150, "redis_connection_string": "redis://redis:6379", - "kdf_connection_string": "http://127.0.0.1:7783", + "kdf_rpc_client": "http://127.0.0.1:7783", "pubkey_path": "/usr/src/komodo-defi-proxy/assets/.pubkey_test", "privkey_path": "/usr/src/komodo-defi-proxy/assets/.privkey_test", "token_expiration_time": 300, diff --git a/src/ctx.rs b/src/ctx.rs index ea03ff8..a8a2af7 100644 --- a/src/ctx.rs +++ b/src/ctx.rs @@ -1,7 +1,8 @@ use hyper::Uri; use once_cell::sync::OnceCell; use proxy::ProxyType; -use serde::{Deserialize, Serialize}; +use rpc::RpcClient; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::env; pub(crate) use super::*; @@ -16,6 +17,21 @@ pub(crate) fn get_app_config() -> &'static AppConfig { }) } +fn deserialize_rpc_client<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let connection_string = String::deserialize(deserializer)?; + Ok(RpcClient::new(connection_string)) +} + +fn serialize_rpc_client(v: &RpcClient, serializer: S) -> Result +where + S: Serializer, +{ + serializer.serialize_str(&v.url) +} + /// Configuration settings for the application, loaded typically from a JSON configuration file. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub(crate) struct AppConfig { @@ -23,8 +39,12 @@ pub(crate) struct AppConfig { pub(crate) port: Option, /// Redis database connection string. pub(crate) redis_connection_string: String, - /// komodo-defi-framework connection string. - pub(crate) kdf_connection_string: String, + /// RPC client for komodo-defi-framework. + #[serde( + serialize_with = "serialize_rpc_client", + deserialize_with = "deserialize_rpc_client" + )] + pub(crate) kdf_rpc_client: RpcClient, /// File path to the public key used for user verification and authentication. pub(crate) pubkey_path: String, /// File path to the private key used for user verification and authentication. @@ -109,7 +129,7 @@ pub(crate) fn get_app_config_test_instance() -> AppConfig { AppConfig { port: Some(6150), redis_connection_string: String::from("redis://redis:6379"), - kdf_connection_string: String::from("http://127.0.0.1:7783"), + kdf_rpc_client: RpcClient::new("http://127.0.0.1:7783".into()), pubkey_path: String::from("/usr/src/komodo-defi-proxy/assets/.pubkey_test"), privkey_path: String::from("/usr/src/komodo-defi-proxy/assets/.privkey_test"), token_expiration_time: Some(300), @@ -188,7 +208,7 @@ fn test_app_config_serialzation_and_deserialization() { let json_config = serde_json::json!({ "port": 6150, "redis_connection_string": "redis://redis:6379", - "kdf_connection_string": "http://127.0.0.1:7783", + "kdf_rpc_client": "http://127.0.0.1:7783", "pubkey_path": "/usr/src/komodo-defi-proxy/assets/.pubkey_test", "privkey_path": "/usr/src/komodo-defi-proxy/assets/.privkey_test", "token_expiration_time": 300, diff --git a/src/net/rpc.rs b/src/net/rpc.rs index 6897d82..d9f1d75 100644 --- a/src/net/rpc.rs +++ b/src/net/rpc.rs @@ -1,5 +1,3 @@ -#![allow(dead_code)] // TODO: remove this - use bytes::Buf; use ctx::AppConfig; use hyper::{body::aggregate, header, Body, Request}; @@ -12,9 +10,7 @@ use crate::proxy::{insert_jwt_to_http_header, APPLICATION_JSON}; use super::*; -pub(crate) type Json = serde_json::Value; - -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub(crate) struct RpcClient { pub(crate) url: String, } @@ -30,7 +26,7 @@ pub(crate) enum Id { #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] pub(crate) struct RpcPayload { pub(crate) method: String, - pub(crate) params: serde_json::value::Value, + pub(crate) params: serde_json::Value, pub(crate) id: Id, pub(crate) jsonrpc: String, } @@ -41,7 +37,7 @@ pub(crate) struct RpcPayload { #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] pub(crate) struct RpcSocketPayload { pub(crate) method: String, - pub(crate) params: serde_json::value::Value, + pub(crate) params: serde_json::Value, pub(crate) id: Id, pub(crate) jsonrpc: String, pub(crate) proxy_sign: ProxySign, @@ -68,9 +64,9 @@ impl RpcClient { pub(crate) async fn send( &self, cfg: &AppConfig, - payload: Json, + payload: serde_json::Value, is_authorized: bool, - ) -> GenericResult { + ) -> GenericResult { let mut req = Request::post(&self.url).body(Body::from(payload.to_string()))?; req.headers_mut() .append(header::CONTENT_TYPE, APPLICATION_JSON.parse()?); From b14b8734cd490279ed16f00af9bd4bfc1b039195 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Wed, 4 Sep 2024 09:52:32 +0300 Subject: [PATCH 03/17] implement expirable hashmap Signed-off-by: onur-ozkan --- Cargo.lock | 7 +++ Cargo.toml | 1 + src/expirable_map.rs | 141 +++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 1 + 4 files changed, 150 insertions(+) create mode 100644 src/expirable_map.rs diff --git a/Cargo.lock b/Cargo.lock index 9861d54..78417ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1067,6 +1067,7 @@ dependencies = [ "once_cell", "proxy_signature", "redis", + "rustc-hash", "serde", "serde_json", "sha3", @@ -1571,6 +1572,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc-hex" version = "2.1.0" diff --git a/Cargo.toml b/Cargo.toml index 24dfac3..c3c62b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ log = "0.4.17" once_cell = "1.12.0" url = { version = "2.2.2", features = ["serde"] } redis = { version = "0.21.5", default-features = false, features = ["tokio-comp"] } +rustc-hash = "1.1.0" serde = "1.0.137" serde_json = { version = "1.0.81", features = ["preserve_order", "raw_value"] } sha3 = "0.9" diff --git a/src/expirable_map.rs b/src/expirable_map.rs new file mode 100644 index 0000000..60acbca --- /dev/null +++ b/src/expirable_map.rs @@ -0,0 +1,141 @@ +//! Provides a map that associates values with keys and supports expiring entries. +//! +//! Designed for performance-oriented use-cases utilizing `FxHashMap` under the hood, +//! and is not suitable for cryptographic purposes. + +use rustc_hash::FxHashMap; +use std::{ + hash::Hash, + time::{Duration, Instant}, +}; + +#[derive(Clone, Debug)] +pub struct ExpirableEntry { + pub(crate) value: V, + pub(crate) expires_at: Instant, +} + +impl ExpirableEntry { + pub fn get_element(&self) -> &V { + &self.value + } + + pub fn update_expiration(&mut self, expires_at: Instant) { + self.expires_at = expires_at + } +} + +impl Default for ExpirableMap { + fn default() -> Self { + Self::new() + } +} + +/// A map that allows associating values with keys and expiring entries. +/// It is important to note that this implementation does not automatically +/// remove any entries; it is the caller's responsibility to invoke `clear_expired_entries` +/// at specified intervals. +/// +/// WARNING: This is designed for performance-oriented use-cases utilizing `FxHashMap` +/// under the hood and is not suitable for cryptographic purposes. +#[derive(Clone, Debug)] +pub struct ExpirableMap(FxHashMap>); + +impl ExpirableMap { + /// Creates a new empty `ExpirableMap` + #[inline] + pub fn new() -> Self { + Self(FxHashMap::default()) + } + + /// Returns the associated value if present. + #[inline] + pub fn get(&mut self, k: &K) -> Option<&V> { + self.0.get(k).map(|v| &v.value) + } + + /// Inserts a key-value pair with an expiration duration. + /// + /// If a value already exists for the given key, it will be updated and then + /// the old one will be returned. + pub fn insert(&mut self, k: K, v: V, exp: Duration) -> Option { + let entry = ExpirableEntry { + expires_at: Instant::now() + exp, + value: v, + }; + + self.0.insert(k, entry).map(|v| v.value) + } + + /// Removes expired entries from the map. + pub fn clear_expired_entries(&mut self) { + self.0.retain(|_k, v| Instant::now() < v.expires_at); + } + + /// Removes a key-value pair from the map and returns the associated value if present. + #[inline] + pub fn remove(&mut self, k: &K) -> Option { + self.0.remove(k).map(|v| v.value) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_clear_expired_entries() { + let mut expirable_map = ExpirableMap::new(); + let value = "test_value"; + let exp = Duration::from_secs(1); + + // Insert 2 entries with 1 sec expiration time + expirable_map.insert("key1".to_string(), value.to_string(), exp); + expirable_map.insert("key2".to_string(), value.to_string(), exp); + + // Wait for entries to expire + tokio::time::sleep(Duration::from_secs(2)).await; + + // Clear expired entries + expirable_map.clear_expired_entries(); + + // We waited for 2 seconds, so we shouldn't have any entry accessible + assert_eq!(expirable_map.0.len(), 0); + + // Insert 5 entries + expirable_map.insert( + "key1".to_string(), + value.to_string(), + Duration::from_secs(5), + ); + expirable_map.insert( + "key2".to_string(), + value.to_string(), + Duration::from_secs(4), + ); + expirable_map.insert( + "key3".to_string(), + value.to_string(), + Duration::from_secs(7), + ); + expirable_map.insert( + "key4".to_string(), + value.to_string(), + Duration::from_secs(2), + ); + expirable_map.insert( + "key5".to_string(), + value.to_string(), + Duration::from_millis(3750), + ); + + // Wait 2 seconds to expire some entries + tokio::time::sleep(Duration::from_secs(2)).await; + + // Clear expired entries + expirable_map.clear_expired_entries(); + + // We waited for 2 seconds, only one entry should expire + assert_eq!(expirable_map.0.len(), 4); + } +} diff --git a/src/main.rs b/src/main.rs index 6902fc5..0122fe6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,6 +6,7 @@ use server::serve; mod address_status; mod ctx; mod db; +mod expirable_map; #[path = "security/jwt.rs"] mod jwt; mod logger; From 99bbf6615687ce6d2121195e986a132d0c64eaec Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Wed, 4 Sep 2024 12:49:50 +0300 Subject: [PATCH 04/17] implement peer status check logic into the middleware Signed-off-by: onur-ozkan --- assets/.config_test | 1 + src/ctx.rs | 4 +++ src/proxy/http/mod.rs | 61 ++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 63 insertions(+), 3 deletions(-) diff --git a/assets/.config_test b/assets/.config_test index c274776..c289c2f 100644 --- a/assets/.config_test +++ b/assets/.config_test @@ -2,6 +2,7 @@ "port": 6150, "redis_connection_string": "redis://redis:6379", "kdf_rpc_client": "http://127.0.0.1:7783", + "kdf_rpc_password": "testpass", "pubkey_path": "/usr/src/komodo-defi-proxy/assets/.pubkey_test", "privkey_path": "/usr/src/komodo-defi-proxy/assets/.privkey_test", "token_expiration_time": 300, diff --git a/src/ctx.rs b/src/ctx.rs index a8a2af7..5eb24ef 100644 --- a/src/ctx.rs +++ b/src/ctx.rs @@ -45,6 +45,8 @@ pub(crate) struct AppConfig { deserialize_with = "deserialize_rpc_client" )] pub(crate) kdf_rpc_client: RpcClient, + /// `rpc_userpass` which is required for kdf RPCs. + pub(crate) kdf_rpc_password: String, /// File path to the public key used for user verification and authentication. pub(crate) pubkey_path: String, /// File path to the private key used for user verification and authentication. @@ -130,6 +132,7 @@ pub(crate) fn get_app_config_test_instance() -> AppConfig { port: Some(6150), redis_connection_string: String::from("redis://redis:6379"), kdf_rpc_client: RpcClient::new("http://127.0.0.1:7783".into()), + kdf_rpc_password: String::from("testpass"), pubkey_path: String::from("/usr/src/komodo-defi-proxy/assets/.pubkey_test"), privkey_path: String::from("/usr/src/komodo-defi-proxy/assets/.privkey_test"), token_expiration_time: Some(300), @@ -209,6 +212,7 @@ fn test_app_config_serialzation_and_deserialization() { "port": 6150, "redis_connection_string": "redis://redis:6379", "kdf_rpc_client": "http://127.0.0.1:7783", + "kdf_rpc_password": "testpass", "pubkey_path": "/usr/src/komodo-defi-proxy/assets/.pubkey_test", "privkey_path": "/usr/src/komodo-defi-proxy/assets/.privkey_test", "token_expiration_time": 300, diff --git a/src/proxy/http/mod.rs b/src/proxy/http/mod.rs index 5c46c45..77517d3 100644 --- a/src/proxy/http/mod.rs +++ b/src/proxy/http/mod.rs @@ -1,12 +1,13 @@ -use std::net::SocketAddr; - use hyper::{StatusCode, Uri}; use proxy_signature::ProxySign; +use std::{net::SocketAddr, sync::LazyLock, time::Duration}; +use tokio::sync::Mutex; use crate::{ address_status::{AddressStatus, AddressStatusOperations}, ctx::{AppConfig, ProxyRoute}, db::Db, + expirable_map::ExpirableMap, logger::tracked_log, rate_limiter::RateLimitOperations, }; @@ -14,7 +15,6 @@ use crate::{ pub(crate) mod get; pub(crate) mod post; -// TODO: Query peers on KDF seeds pub(crate) async fn validation_middleware( cfg: &AppConfig, signed_message: &ProxySign, @@ -24,6 +24,61 @@ pub(crate) async fn validation_middleware( ) -> Result<(), StatusCode> { let mut db = Db::create_instance(cfg).await; + // Once we know a peer is connected to the KDF network, we can assume they are connected + // for 10 seconds without asking again. + const KNOW_PEER_EXPIRATION: Duration = Duration::from_secs(10); + static KNOWN_PEERS: LazyLock>> = + LazyLock::new(|| Mutex::new(ExpirableMap::new())); + + let mut know_peers = KNOWN_PEERS.lock().await; + + know_peers.clear_expired_entries(); + let is_known = know_peers.get(&signed_message.address).is_some(); + + if !is_known { + let payload = serde_json::json!({ + "userpass": cfg.kdf_rpc_password, + "method": "peer_connection_healthcheck", + "mmrpc": "2.0", + "params": { + "peer_id": signed_message.address + } + }); + + match cfg.kdf_rpc_client.send(cfg, payload, false).await { + Ok(response) => { + if response["result"] == serde_json::json!(true) { + know_peers.insert(signed_message.address.clone(), (), KNOW_PEER_EXPIRATION); + } else { + tracked_log( + log::Level::Warn, + remote_addr.ip(), + &signed_message.address, + req_uri, + "Peer isn't connected to KDF network, returning 401", + ); + + return Err(StatusCode::UNAUTHORIZED); + } + } + Err(error) => { + tracked_log( + log::Level::Error, + remote_addr.ip(), + &signed_message.address, + req_uri, + format!( + "`peer_connection_healthcheck` RPC failed, returning 500. Error: {}", + error + ), + ); + return Err(StatusCode::INTERNAL_SERVER_ERROR); + } + } + } + + drop(know_peers); + match db.read_address_status(&signed_message.address).await { AddressStatus::Trusted => Ok(()), AddressStatus::Blocked => Err(StatusCode::FORBIDDEN), From 031f33ea1788df0633f6635894e2bab704b6f42c Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Wed, 4 Sep 2024 13:01:47 +0300 Subject: [PATCH 05/17] check if KDF is available on app initialization Signed-off-by: onur-ozkan --- src/main.rs | 5 +++++ src/net/kdf.rs | 26 ++++++++++++++++++++++++++ src/proxy/http/mod.rs | 12 ++---------- 3 files changed, 33 insertions(+), 10 deletions(-) create mode 100644 src/net/kdf.rs diff --git a/src/main.rs b/src/main.rs index 0122fe6..a89ac5c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ use ctx::get_app_config; use db::get_redis_connection; +use kdf::version_rpc; use server::serve; #[path = "security/address_status.rs"] @@ -9,6 +10,8 @@ mod db; mod expirable_map; #[path = "security/jwt.rs"] mod jwt; +#[path = "net/kdf.rs"] +mod kdf; mod logger; mod proxy; #[path = "security/rate_limiter.rs"] @@ -38,5 +41,7 @@ async fn main() -> GenericResult<()> { // to panic if redis is not available get_redis_connection(cfg).await; + version_rpc(cfg).await.expect("KDF is not available."); + serve(cfg).await } diff --git a/src/net/kdf.rs b/src/net/kdf.rs new file mode 100644 index 0000000..d229a50 --- /dev/null +++ b/src/net/kdf.rs @@ -0,0 +1,26 @@ +use crate::{ctx::AppConfig, GenericResult}; + +pub(crate) async fn version_rpc(cfg: &AppConfig) -> GenericResult { + let payload = serde_json::json!({ + "userpass": cfg.kdf_rpc_password, + "method": "version", + }); + + cfg.kdf_rpc_client.send(cfg, payload, false).await +} + +pub(crate) async fn peer_connection_healthcheck_rpc( + cfg: &AppConfig, + peer_id: &str, +) -> GenericResult { + let payload = serde_json::json!({ + "userpass": cfg.kdf_rpc_password, + "method": "peer_connection_healthcheck", + "mmrpc": "2.0", + "params": { + "peer_id": peer_id + } + }); + + cfg.kdf_rpc_client.send(cfg, payload, false).await +} diff --git a/src/proxy/http/mod.rs b/src/proxy/http/mod.rs index 77517d3..899a3fa 100644 --- a/src/proxy/http/mod.rs +++ b/src/proxy/http/mod.rs @@ -8,6 +8,7 @@ use crate::{ ctx::{AppConfig, ProxyRoute}, db::Db, expirable_map::ExpirableMap, + kdf::peer_connection_healthcheck_rpc, logger::tracked_log, rate_limiter::RateLimitOperations, }; @@ -36,16 +37,7 @@ pub(crate) async fn validation_middleware( let is_known = know_peers.get(&signed_message.address).is_some(); if !is_known { - let payload = serde_json::json!({ - "userpass": cfg.kdf_rpc_password, - "method": "peer_connection_healthcheck", - "mmrpc": "2.0", - "params": { - "peer_id": signed_message.address - } - }); - - match cfg.kdf_rpc_client.send(cfg, payload, false).await { + match peer_connection_healthcheck_rpc(cfg, &signed_message.address).await { Ok(response) => { if response["result"] == serde_json::json!(true) { know_peers.insert(signed_message.address.clone(), (), KNOW_PEER_EXPIRATION); From 7a985ad2ad3c9f5b57eb99b8fb89cffc140929bd Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Wed, 4 Sep 2024 13:04:01 +0300 Subject: [PATCH 06/17] update kdf rpc module Signed-off-by: onur-ozkan --- src/main.rs | 6 +++--- src/net/{kdf.rs => kdf_rpc_interface.rs} | 0 src/proxy/http/mod.rs | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) rename src/net/{kdf.rs => kdf_rpc_interface.rs} (100%) diff --git a/src/main.rs b/src/main.rs index a89ac5c..1c859af 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ use ctx::get_app_config; use db::get_redis_connection; -use kdf::version_rpc; +use kdf_rpc_interface::version_rpc; use server::serve; #[path = "security/address_status.rs"] @@ -10,8 +10,8 @@ mod db; mod expirable_map; #[path = "security/jwt.rs"] mod jwt; -#[path = "net/kdf.rs"] -mod kdf; +#[path = "net/kdf_rpc_interface.rs"] +mod kdf_rpc_interface; mod logger; mod proxy; #[path = "security/rate_limiter.rs"] diff --git a/src/net/kdf.rs b/src/net/kdf_rpc_interface.rs similarity index 100% rename from src/net/kdf.rs rename to src/net/kdf_rpc_interface.rs diff --git a/src/proxy/http/mod.rs b/src/proxy/http/mod.rs index 899a3fa..f831572 100644 --- a/src/proxy/http/mod.rs +++ b/src/proxy/http/mod.rs @@ -8,7 +8,7 @@ use crate::{ ctx::{AppConfig, ProxyRoute}, db::Db, expirable_map::ExpirableMap, - kdf::peer_connection_healthcheck_rpc, + kdf_rpc_interface::peer_connection_healthcheck_rpc, logger::tracked_log, rate_limiter::RateLimitOperations, }; From 2e1f2548f7bef3f59f161222d3b39fdb8f1dde0a Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Mon, 9 Sep 2024 08:38:06 +0300 Subject: [PATCH 07/17] allow dead-code for various `expirable_map` functions Signed-off-by: onur-ozkan --- src/expirable_map.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/expirable_map.rs b/src/expirable_map.rs index 60acbca..33db871 100644 --- a/src/expirable_map.rs +++ b/src/expirable_map.rs @@ -16,10 +16,12 @@ pub struct ExpirableEntry { } impl ExpirableEntry { + #[allow(dead_code)] pub fn get_element(&self) -> &V { &self.value } + #[allow(dead_code)] pub fn update_expiration(&mut self, expires_at: Instant) { self.expires_at = expires_at } @@ -74,6 +76,7 @@ impl ExpirableMap { /// Removes a key-value pair from the map and returns the associated value if present. #[inline] + #[allow(dead_code)] pub fn remove(&mut self, k: &K) -> Option { self.0.remove(k).map(|v| v.value) } From a59a5a1ca6b92387c32ef05e48574dbc90858e02 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Mon, 9 Sep 2024 12:03:05 +0300 Subject: [PATCH 08/17] move `peer_connection_healthcheck` priority Signed-off-by: onur-ozkan --- src/proxy/http/mod.rs | 101 +++++++++++++++++++++++------------------- 1 file changed, 55 insertions(+), 46 deletions(-) diff --git a/src/proxy/http/mod.rs b/src/proxy/http/mod.rs index f831572..ca4e17d 100644 --- a/src/proxy/http/mod.rs +++ b/src/proxy/http/mod.rs @@ -25,56 +25,12 @@ pub(crate) async fn validation_middleware( ) -> Result<(), StatusCode> { let mut db = Db::create_instance(cfg).await; - // Once we know a peer is connected to the KDF network, we can assume they are connected - // for 10 seconds without asking again. - const KNOW_PEER_EXPIRATION: Duration = Duration::from_secs(10); - static KNOWN_PEERS: LazyLock>> = - LazyLock::new(|| Mutex::new(ExpirableMap::new())); - - let mut know_peers = KNOWN_PEERS.lock().await; - - know_peers.clear_expired_entries(); - let is_known = know_peers.get(&signed_message.address).is_some(); - - if !is_known { - match peer_connection_healthcheck_rpc(cfg, &signed_message.address).await { - Ok(response) => { - if response["result"] == serde_json::json!(true) { - know_peers.insert(signed_message.address.clone(), (), KNOW_PEER_EXPIRATION); - } else { - tracked_log( - log::Level::Warn, - remote_addr.ip(), - &signed_message.address, - req_uri, - "Peer isn't connected to KDF network, returning 401", - ); - - return Err(StatusCode::UNAUTHORIZED); - } - } - Err(error) => { - tracked_log( - log::Level::Error, - remote_addr.ip(), - &signed_message.address, - req_uri, - format!( - "`peer_connection_healthcheck` RPC failed, returning 500. Error: {}", - error - ), - ); - return Err(StatusCode::INTERNAL_SERVER_ERROR); - } - } - } - - drop(know_peers); - match db.read_address_status(&signed_message.address).await { AddressStatus::Trusted => Ok(()), AddressStatus::Blocked => Err(StatusCode::FORBIDDEN), AddressStatus::None => { + peer_connection_healthcheck(cfg, signed_message, req_uri, remote_addr).await?; + if !signed_message.is_valid_message() { tracked_log( log::Level::Warn, @@ -140,6 +96,59 @@ pub(crate) async fn validation_middleware( } } +async fn peer_connection_healthcheck( + cfg: &AppConfig, + signed_message: &ProxySign, + req_uri: &Uri, + remote_addr: &SocketAddr, +) -> Result<(), StatusCode> { + // Once we know a peer is connected to the KDF network, we can assume they are connected + // for 10 seconds without asking again. + const KNOW_PEER_EXPIRATION: Duration = Duration::from_secs(10); + static KNOWN_PEERS: LazyLock>> = + LazyLock::new(|| Mutex::new(ExpirableMap::new())); + + let mut know_peers = KNOWN_PEERS.lock().await; + + know_peers.clear_expired_entries(); + let is_known = know_peers.get(&signed_message.address).is_some(); + + if !is_known { + match peer_connection_healthcheck_rpc(cfg, &signed_message.address).await { + Ok(response) => { + if response["result"] == serde_json::json!(true) { + know_peers.insert(signed_message.address.clone(), (), KNOW_PEER_EXPIRATION); + } else { + tracked_log( + log::Level::Warn, + remote_addr.ip(), + &signed_message.address, + req_uri, + "Peer isn't connected to KDF network, returning 401", + ); + + return Err(StatusCode::UNAUTHORIZED); + } + } + Err(error) => { + tracked_log( + log::Level::Error, + remote_addr.ip(), + &signed_message.address, + req_uri, + format!( + "`peer_connection_healthcheck` RPC failed, returning 500. Error: {}", + error + ), + ); + return Err(StatusCode::INTERNAL_SERVER_ERROR); + } + } + } + + Ok(()) +} + #[cfg(test)] mod tests { use hyper::{header, Body, Request, StatusCode}; From 46259139821ed2f66dfece59d2c73dcdb21d0c4e Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Mon, 9 Sep 2024 12:05:19 +0300 Subject: [PATCH 09/17] update execution flow docs Signed-off-by: onur-ozkan --- README.md | 41 ++++++++++++++--------------------------- 1 file changed, 14 insertions(+), 27 deletions(-) diff --git a/README.md b/README.md index ecfda32..2a5a462 100644 --- a/README.md +++ b/README.md @@ -16,12 +16,14 @@ Create the configuration file for app runtime. "pubkey_path": "/path_to_publick_key.pem", "privkey_path": "/path_to_private_key.pem", "redis_connection_string": "redis://localhost", + "kdf_rpc_client": "http://127.0.0.1:7783", + "kdf_rpc_password": "testpass", "token_expiration_time": 300, "proxy_routes": [ { "inbound_route": "/dev", "outbound_route": "http://localhost:8000", - "proxy_type": "quicknode", + "proxy_type": "quicknode", # available types are: "quicknode", "moralis", "block_pi" "authorized": false, "allowed_rpc_methods": [ "eth_blockNumber", @@ -42,15 +44,12 @@ Create the configuration file for app runtime. Expose configuration file's path as an environment variable in `AUTH_APP_CONFIG_PATH`. -***Important Note:*** The environment where the application will be deployed, the timezone MUST be as UTC. Also, make sure redis is version `6.*` +***Important Note:*** The environment where the application will be deployed, the timezone MUST be as UTC. Also, make sure redis is version `7.*` ### Architecture (TODO: OUTDATED) ![arch2](https://github.com/KomodoPlatform/komodo-defi-proxy/assets/39852038/be7fe7ae-2f2a-4f68-afa8-ce4938c570a7) - -**Execution flow (TODO: OUTDATED):** - 1) Client sends the request. 2) Redirect either to websocket or http handler. @@ -58,29 +57,17 @@ Expose configuration file's path as an environment variable in `AUTH_APP_CONFIG_ 3) If the incoming request comes from the same network, step 4 will be by-passed. 4) Request Handling in the Middleware: + - **Status Checker**: + - **Blocked**: Return `403 Forbidden`. + - **Allowed**: Process continues with the rate limiter. + - **Trusted**: Bypass rate limiter and proof of funding. - **For Quicknode:** - - **Status Checker**: - - **Blocked**: Return `403 Forbidden` immediately. - - **Allowed**: Process continues with the rate limiter. - - **Trusted**: Bypass rate limiter and proof of funding. - - - **Rate Limiter**: - - First, verify the signed message. If not valid, return `401 Unauthorized` immediately. - - If valid, calculate the request count with the time interval specified in the application configuration. If the wallet address has sent too many requests than the expected amount, process continues with the proof of funding. If not, bypass the proof of funding. - - - **Proof of Funding**: - - Return `406 Not Acceptable` if the wallet has a 0 balance. Otherwise, assume the request is valid and process it as usual. - - **For Moralis:** - - **Status Checker**: - - **Blocked**: Return `403 Forbidden` immediately. - - **Allowed**: Process continues with the rate limiter. - - **Trusted**: Bypass the rate limiter. + - **Peer Status Checker**: + - The requesting peer must be active in the KDF network. Validate this by executing the `peer_connection_healthcheck` KDF RPC. If the peer is not connected to the network, return `401 Unauthorized`. - - **Rate Limiter**: - - First, verify the signed message. If not valid, return `401 Unauthorized` immediately. - - If valid, calculate the request count with the time interval specified in the application configuration. If the wallet address has sent too many requests, return an error `406 Not Acceptable` indicating that the wallet address must wait for some time before making more requests. + - **Rate Limiter**: + - First, verify the signed message. If not valid, return `401 Unauthorized`. + - If valid, calculate the request count with the time interval specified in the application configuration. If the wallet address has sent too many requests than the expected amount, process continues with the proof of funding. If not, bypass the proof of funding. 5) Find target route by requested endpoint. @@ -102,7 +89,7 @@ curl -v --url "'$mm2_address'" -s --data '{ "params": { "ticker": "ETH", "nodes": [ - {"url": "'$atomicdex_gui_auth_address'", "gui_auth": true } + {"url": "'$atomicdex_gui_auth_address'", "komodo_proxy": true } ], "swap_contract_address": "0x24ABE4c71FC658C91313b6552cd40cD808b3Ea80", "erc20_tokens_requests": [ From 29457b898e2318e53bb36000802d9610cad2e6bb Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Mon, 9 Sep 2024 14:13:28 +0300 Subject: [PATCH 10/17] update drawio document file Signed-off-by: onur-ozkan --- docs/arch.drawio | 111 ++++++++++++++++++++++++++++++----------------- 1 file changed, 70 insertions(+), 41 deletions(-) diff --git a/docs/arch.drawio b/docs/arch.drawio index 0a2e96e..e40c5df 100644 --- a/docs/arch.drawio +++ b/docs/arch.drawio @@ -1,126 +1,155 @@ - + - + - + - + - + - + - + - + - + - - + + - - + + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + + + + + + + - + - - + + - + - - + + + + + + + + - + + + + + + + + + + + + + - + - - + - - + + - + + + + + + + From 0d3f340430cc934bfebf26624703db60aa247196 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Onur=20=C3=96zkan?= Date: Mon, 9 Sep 2024 14:14:47 +0300 Subject: [PATCH 11/17] Update README.md --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 2a5a462..ea7032f 100644 --- a/README.md +++ b/README.md @@ -46,9 +46,9 @@ Expose configuration file's path as an environment variable in `AUTH_APP_CONFIG_ ***Important Note:*** The environment where the application will be deployed, the timezone MUST be as UTC. Also, make sure redis is version `7.*` -### Architecture (TODO: OUTDATED) +### Architecture -![arch2](https://github.com/KomodoPlatform/komodo-defi-proxy/assets/39852038/be7fe7ae-2f2a-4f68-afa8-ce4938c570a7) +![2024-09-09_14-09](https://github.com/user-attachments/assets/2775d73e-8003-4bfe-89e1-2c64da9e3004) 1) Client sends the request. @@ -127,4 +127,4 @@ If you want to test features locally, you can run Docker containers using Docker 4. **Stop the Containers**: ```sh docker compose down - ``` \ No newline at end of file + ``` From 1294ab27b868c9d3e3ebabf70a8e469652b78513 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Wed, 18 Sep 2024 11:24:30 +0300 Subject: [PATCH 12/17] keep `RpcSocketPayload` private Signed-off-by: onur-ozkan --- src/net/rpc.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/net/rpc.rs b/src/net/rpc.rs index d9f1d75..3194ba6 100644 --- a/src/net/rpc.rs +++ b/src/net/rpc.rs @@ -36,11 +36,11 @@ pub(crate) struct RpcPayload { /// for authentication and validation, facilitating secure and validated interactions with the Quicknode service. #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] pub(crate) struct RpcSocketPayload { - pub(crate) method: String, - pub(crate) params: serde_json::Value, - pub(crate) id: Id, - pub(crate) jsonrpc: String, - pub(crate) proxy_sign: ProxySign, + method: String, + params: serde_json::Value, + id: Id, + jsonrpc: String, + proxy_sign: ProxySign, } impl RpcSocketPayload { From dbc75fc9b526f6d8fd76bc630a007a99c9e84c8a Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Thu, 19 Sep 2024 13:13:57 +0300 Subject: [PATCH 13/17] update README Signed-off-by: onur-ozkan --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index ea7032f..3569ef8 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,7 @@ Create the configuration file for app runtime. Expose configuration file's path as an environment variable in `AUTH_APP_CONFIG_PATH`. -***Important Note:*** The environment where the application will be deployed, the timezone MUST be as UTC. Also, make sure redis is version `7.*` +***Important Note:*** Make sure redis is version `7.*` ### Architecture From 9ba90c1690e6568ca59a134f99287fbae0954088 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Onur=20=C3=96zkan?= Date: Tue, 24 Sep 2024 08:27:14 +0300 Subject: [PATCH 14/17] Update README.md --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 3569ef8..e83c6c8 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,7 @@ +# Komodo Defi Proxy + +Decentralized P2P applications have some limitations by their nature and one of them is the use application/API keys. If an API key is used in the application, any user could retrieve it by simply debugging the app. Some of the blockchain services we use in [komodo-defi-framework](https://github.com/KomodoPlatform/komodo-defi-framework) are paid services and we want to prevent abuse, such as users copying the API key for personal use. To address this problem, we created this project, komodo-defi-proxy. It takes the request, handles the API key, forwards the request to the actual service, and returns the result without modifying the original request. This keeps our secret application keys secure and hidden from end users. + ### Dev Requirements Creating rsa key pairs From 4f4f4eb0d6f64c38b6a237351efed30df598c9bb Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Tue, 24 Sep 2024 10:01:09 +0300 Subject: [PATCH 15/17] extend configuration interface with `peer_healthcheck_caching_secs` Signed-off-by: onur-ozkan --- src/ctx.rs | 16 +++++++++++++++- src/proxy/http/mod.rs | 5 +++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/src/ctx.rs b/src/ctx.rs index 5eb24ef..2a48dd2 100644 --- a/src/ctx.rs +++ b/src/ctx.rs @@ -9,6 +9,11 @@ pub(crate) use super::*; const DEFAULT_TOKEN_EXPIRATION_TIME: i64 = 3600; pub(crate) const DEFAULT_PORT: u16 = 5000; + +const fn default_peer_caching_secs() -> u64 { + 10 +} + static CONFIG: OnceCell = OnceCell::new(); pub(crate) fn get_app_config() -> &'static AppConfig { @@ -58,6 +63,13 @@ pub(crate) struct AppConfig { pub(crate) proxy_routes: Vec, /// The default rate limiting rules for maintaining the frequency of incoming traffic for per client. pub(crate) rate_limiter: RateLimiter, + /// The number of seconds to cache a known peer. + /// + /// When a peer is identified as connected with `peer_connection_healthcheck` RPC, + /// this value determines how long to cache that peer as known-peer to avoid + /// sending repeated `peer_connection_healthcheck` requests for every proxy request. + #[serde(default = "default_peer_caching_secs")] + pub(crate) peer_healthcheck_caching_secs: u64, } /// Defines a routing rule for proxying requests from an inbound route to an outbound URL @@ -203,6 +215,7 @@ pub(crate) fn get_app_config_test_instance() -> AppConfig { rp_30_min: 555, rp_60_min: 555, }, + peer_healthcheck_caching_secs: 10, } } @@ -282,7 +295,8 @@ fn test_app_config_serialzation_and_deserialization() { "rp_15_min": 555, "rp_30_min": 555, "rp_60_min": 555 - } + }, + "peer_healthcheck_caching_secs": 10, }); let actual_config: AppConfig = serde_json::from_str(&json_config.to_string()).unwrap(); diff --git a/src/proxy/http/mod.rs b/src/proxy/http/mod.rs index ca4e17d..4dcdff6 100644 --- a/src/proxy/http/mod.rs +++ b/src/proxy/http/mod.rs @@ -104,7 +104,8 @@ async fn peer_connection_healthcheck( ) -> Result<(), StatusCode> { // Once we know a peer is connected to the KDF network, we can assume they are connected // for 10 seconds without asking again. - const KNOW_PEER_EXPIRATION: Duration = Duration::from_secs(10); + let know_peer_expiration = Duration::from_secs(cfg.peer_healthcheck_caching_secs); + static KNOWN_PEERS: LazyLock>> = LazyLock::new(|| Mutex::new(ExpirableMap::new())); @@ -117,7 +118,7 @@ async fn peer_connection_healthcheck( match peer_connection_healthcheck_rpc(cfg, &signed_message.address).await { Ok(response) => { if response["result"] == serde_json::json!(true) { - know_peers.insert(signed_message.address.clone(), (), KNOW_PEER_EXPIRATION); + know_peers.insert(signed_message.address.clone(), (), know_peer_expiration); } else { tracked_log( log::Level::Warn, From b7fa008a4b3f2c8524fe0c421207463e6e9e83b3 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Tue, 24 Sep 2024 10:03:17 +0300 Subject: [PATCH 16/17] update README Signed-off-by: onur-ozkan --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index e83c6c8..7ef2697 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,8 @@ Create the configuration file for app runtime. "rp_15_min": 200, "rp_30_min": 350, "rp_60_min": 575 - } + }, + "peer_healthcheck_caching_secs": 10 } ``` From d8a75e660be3782e306b8f5d2adc69726fcd5b92 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Thu, 26 Sep 2024 11:33:04 +0300 Subject: [PATCH 17/17] sync the upstream changes Signed-off-by: onur-ozkan --- src/net/kdf_rpc_interface.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/net/kdf_rpc_interface.rs b/src/net/kdf_rpc_interface.rs index d229a50..fa09dcb 100644 --- a/src/net/kdf_rpc_interface.rs +++ b/src/net/kdf_rpc_interface.rs @@ -11,14 +11,14 @@ pub(crate) async fn version_rpc(cfg: &AppConfig) -> GenericResult GenericResult { let payload = serde_json::json!({ "userpass": cfg.kdf_rpc_password, "method": "peer_connection_healthcheck", "mmrpc": "2.0", "params": { - "peer_id": peer_id + "peer_address": peer_address } });