diff --git a/Cargo.lock b/Cargo.lock index e2c648015..01b5d84f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2509,7 +2509,7 @@ dependencies = [ "reth-rpc-types", "rlp", "rlp-derive", - "rstest 0.16.0", + "rstest", "ruint", "serde", "serde-this-or-that", @@ -2604,9 +2604,9 @@ dependencies = [ [[package]] name = "fallible-iterator" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" [[package]] name = "fallible-streaming-iterator" @@ -3037,22 +3037,13 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap 2.1.0", + "indexmap 2.2.3", "slab", "tokio", "tokio-util 0.7.10", "tracing", ] -[[package]] -name = "hashbrown" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" -dependencies = [ - "ahash 0.7.7", -] - [[package]] name = "hashbrown" version = "0.12.3" @@ -3084,18 +3075,18 @@ dependencies = [ [[package]] name = "hashlink" -version = "0.7.0" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf" +checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" dependencies = [ - "hashbrown 0.11.2", + "hashbrown 0.14.3", ] [[package]] name = "hashlink" -version = "0.8.4" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" +checksum = "692eaaf7f7607518dd3cef090f1474b61edc5301d8012f09579920df68b725ee" dependencies = [ "hashbrown 0.14.3", ] @@ -3455,9 +3446,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.1.0" +version = "2.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" +checksum = "233cf39063f058ea2caae4091bf4a3ef70a653afbc026f5c4a4135d114e3c177" dependencies = [ "equivalent", "hashbrown 0.14.3", @@ -3904,9 +3895,9 @@ dependencies = [ [[package]] name = "libsqlite3-sys" -version = "0.23.2" +version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2cafc7c74096c336d9d27145f7ebd4f4b6f95ba16aa5a282387267e6925cb58" +checksum = "0c10584274047cb335c23d3e61bcef8e323adae7c5c8c760540f73610177fc3f" dependencies = [ "cc", "pkg-config", @@ -4614,7 +4605,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" dependencies = [ "fixedbitset", - "indexmap 2.1.0", + "indexmap 2.2.3", ] [[package]] @@ -4817,7 +4808,7 @@ dependencies = [ "rand 0.8.5", "rlp", "roxmltree", - "rstest 0.18.2", + "rstest", "serde", "serde_json", "serde_yaml", @@ -4861,7 +4852,7 @@ dependencies = [ "quickcheck", "rand 0.8.5", "rlp", - "rstest 0.18.2", + "rstest", "serde", "serde_json", "serial_test", @@ -5096,12 +5087,13 @@ dependencies = [ [[package]] name = "r2d2_sqlite" -version = "0.19.0" +version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54ca3c9468a76fc2ad724c486a59682fc362efeac7b18d1c012958bc19f34800" +checksum = "6a982edf65c129796dba72f8775b292ef482b40d035e827a9825b3bc07ccc5f2" dependencies = [ "r2d2", "rusqlite", + "uuid 1.6.1", ] [[package]] @@ -5697,18 +5689,6 @@ 
dependencies = [ "url", ] -[[package]] -name = "rstest" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b07f2d176c472198ec1e6551dc7da28f1c089652f66a7b722676c2238ebc0edf" -dependencies = [ - "futures 0.3.30", - "futures-timer", - "rstest_macros 0.16.0", - "rustc_version 0.4.0", -] - [[package]] name = "rstest" version = "0.18.2" @@ -5717,22 +5697,8 @@ checksum = "97eeab2f3c0a199bc4be135c36c924b6590b88c377d416494288c14f2db30199" dependencies = [ "futures 0.3.30", "futures-timer", - "rstest_macros 0.18.2", - "rustc_version 0.4.0", -] - -[[package]] -name = "rstest_macros" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7229b505ae0706e64f37ffc54a9c163e11022a6636d58fe1f3f52018257ff9f7" -dependencies = [ - "cfg-if 1.0.0", - "proc-macro2", - "quote", + "rstest_macros", "rustc_version 0.4.0", - "syn 1.0.109", - "unicode-ident", ] [[package]] @@ -5784,16 +5750,15 @@ checksum = "e666a5496a0b2186dbcd0ff6106e29e093c15591bde62c20d3842007c6978a09" [[package]] name = "rusqlite" -version = "0.26.3" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ba4d3462c8b2e4d7f4fcfcf2b296dc6b65404fbbc7b63daa37fd485c149daf7" +checksum = "b838eba278d213a8beaf485bd313fd580ca4505a00d5871caeb1457c55322cae" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.4.1", "fallible-iterator", "fallible-streaming-iterator", - "hashlink 0.7.0", + "hashlink 0.9.0", "libsqlite3-sys", - "memchr", "smallvec 1.12.0", ] @@ -6215,7 +6180,7 @@ version = "1.0.111" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "176e46fa42316f18edd598015a5166857fc835ec732f5215eac6b7bdbf0a84f4" dependencies = [ - "indexmap 2.1.0", + "indexmap 2.2.3", "itoa", "ryu", "serde", @@ -6263,7 +6228,7 @@ dependencies = [ "chrono", "hex", "indexmap 1.9.3", - "indexmap 2.1.0", + "indexmap 2.2.3", "serde", "serde_json", "serde_with_macros", @@ -6284,11 +6249,11 @@ dependencies = [ [[package]] name = "serde_yaml" -version = "0.9.30" +version = "0.9.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1bf28c79a99f70ee1f1d83d10c875d2e70618417fda01ad1785e027579d9d38" +checksum = "8fd075d994154d4a774f95b51fb96bdc2832b0ea48425c92546073816cda1f2f" dependencies = [ - "indexmap 2.1.0", + "indexmap 2.2.3", "itoa", "ryu", "serde", @@ -7305,7 +7270,7 @@ version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap 2.1.0", + "indexmap 2.2.3", "toml_datetime", "winnow", ] @@ -7316,7 +7281,7 @@ version = "0.20.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70f427fce4d84c72b5b732388bf4a9f4531b53f74e2887e3ecb2481f68f66d81" dependencies = [ - "indexmap 2.1.0", + "indexmap 2.2.3", "toml_datetime", "winnow", ] @@ -7327,7 +7292,7 @@ version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d34d383cd00a163b4a5b85053df514d45bc330f6de7737edfe0a93311d1eaa03" dependencies = [ - "indexmap 2.1.0", + "indexmap 2.2.3", "serde", "serde_spanned", "toml_datetime", @@ -7608,7 +7573,7 @@ dependencies = [ "r2d2", "r2d2_sqlite", "rand 0.8.5", - "rstest 0.18.2", + "rstest", "serde_json", "serial_test", "ssz_types", @@ -7646,20 +7611,20 @@ dependencies = [ "env_logger 0.9.3", "ethereum-types", "ethportal-api", + "keccak-hash", "parking_lot 0.11.2", "portalnet", - "r2d2", - "r2d2_sqlite", + "serde", 
"serde_json", + "serde_yaml", "serial_test", "test-log", "tokio", "tracing", "tracing-subscriber", - "trin-metrics", "trin-storage", + "trin-utils", "trin-validation", - "utp-rs", ] [[package]] @@ -7672,6 +7637,8 @@ dependencies = [ "ethportal-api", "r2d2", "r2d2_sqlite", + "rand 0.8.5", + "rstest", "rusqlite", "strum 0.26.1", "tempfile", @@ -7704,7 +7671,7 @@ dependencies = [ "quickcheck", "quickcheck_macros", "rlp", - "rstest 0.18.2", + "rstest", "rust-embed", "serde", "serde_json", @@ -7950,6 +7917,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560" dependencies = [ "getrandom 0.2.12", + "rand 0.8.5", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 88fd6f026..0138ff037 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,7 +52,7 @@ utp-rs = { git = "https://github.com/ethereum/utp", tag = "v0.1.0-alpha.9" } ethers-core = { version = "2.0", default-features = false} ethers-providers = { version = "2.0", default-features = false, features = ["ipc"] } ethportal-peertest = { path = "ethportal-peertest" } -serde_yaml = "0.9.25" +serde_yaml = "0.9" serial_test = "0.5.1" ureq = { version = "2.5.0", features = ["json"] } diff --git a/ethportal-api/Cargo.toml b/ethportal-api/Cargo.toml index 29ff49aba..66bfda6ed 100644 --- a/ethportal-api/Cargo.toml +++ b/ethportal-api/Cargo.toml @@ -36,7 +36,7 @@ ruint = { version = "1.9.0", features = ["primitive-types"] } serde = { version = "1.0.150", features = ["derive"] } serde_json = "1.0.89" serde-this-or-that = "0.4.2" -serde_yaml = "0.9.17" +serde_yaml = "0.9" sha2 = "0.10.1" sha3 = "0.9.1" snap = "1.1.0" @@ -53,7 +53,7 @@ validator = { version = "0.13.0", features = ["derive"] } [dev-dependencies] env_logger = "0.9.0" quickcheck = "1.0.3" -rstest = "0.16.0" +rstest = "0.18.2" test-log = { version = "0.2.11", features = ["trace"] } tracing = "0.1.36" tracing-subscriber = "0.3.15" diff --git a/ethportal-api/src/types/content_key/overlay.rs b/ethportal-api/src/types/content_key/overlay.rs index 3b964a993..c3746bdf3 100644 --- a/ethportal-api/src/types/content_key/overlay.rs +++ b/ethportal-api/src/types/content_key/overlay.rs @@ -3,7 +3,7 @@ use crate::{ utils::bytes::{hex_decode, hex_encode, hex_encode_compact}, }; use quickcheck::{Arbitrary, Gen}; -use std::fmt; +use std::{fmt, ops::Deref}; /// Types whose values represent keys to lookup content items in an overlay network. /// Keys are serializable. @@ -29,7 +29,7 @@ pub trait OverlayContentKey: /// A content key type whose content id is the inner value. Allows for the construction /// of a content key with an arbitrary content ID. 
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, PartialEq, Eq)]
 pub struct IdentityContentKey {
     value: [u8; 32],
 }
@@ -39,6 +39,10 @@ impl IdentityContentKey {
     pub fn new(value: [u8; 32]) -> Self {
         Self { value }
     }
+
+    pub fn random() -> Self {
+        Self::new(rand::random())
+    }
 }
 
 impl Arbitrary for IdentityContentKey {
@@ -88,6 +92,14 @@ impl Into<Vec<u8>> for IdentityContentKey {
 }
 
+impl Deref for IdentityContentKey {
+    type Target = [u8; 32];
+
+    fn deref(&self) -> &Self::Target {
+        &self.value
+    }
+}
+
 impl OverlayContentKey for IdentityContentKey {
     fn content_id(&self) -> [u8; 32] {
         self.value
diff --git a/ethportal-api/src/types/content_key/state.rs b/ethportal-api/src/types/content_key/state.rs
index ed60db83d..2f1596f80 100644
--- a/ethportal-api/src/types/content_key/state.rs
+++ b/ethportal-api/src/types/content_key/state.rs
@@ -42,20 +42,20 @@ pub struct AccountTrieNodeKey {
 #[derive(Clone, Debug, Decode, Encode, Eq, PartialEq)]
 pub struct ContractStorageTrieNodeKey {
     /// Address of the account.
-    address: Address,
+    pub address: Address,
     /// Trie path of the node.
-    path: Nibbles,
+    pub path: Nibbles,
     /// Hash of the node.
-    node_hash: H256,
+    pub node_hash: H256,
 }
 
 /// A key for an account's contract bytecode.
 #[derive(Clone, Debug, Decode, Encode, Eq, PartialEq)]
 pub struct ContractBytecodeKey {
     /// Address of the account.
-    address: Address,
+    pub address: Address,
     /// Hash of the bytecode.
-    code_hash: H256,
+    pub code_hash: H256,
 }
 
 impl OverlayContentKey for StateContentKey {
diff --git a/ethportal-peertest/Cargo.toml b/ethportal-peertest/Cargo.toml
index c583553c0..56e26c6dd 100644
--- a/ethportal-peertest/Cargo.toml
+++ b/ethportal-peertest/Cargo.toml
@@ -24,7 +24,7 @@ rand = "0.8.4"
 reth-ipc = { tag = "v0.1.0-alpha.10", git = "https://github.com/paradigmxyz/reth.git"}
 rpc = { path = "../rpc" }
 serde_json = "1.0.89"
-serde_yaml = "0.9.25"
+serde_yaml = "0.9"
 tempfile = "3.3.0"
 tokio = {version = "1.14.0", features = ["full"]}
 tracing = "0.1.36"
diff --git a/light-client/Cargo.toml b/light-client/Cargo.toml
index e49a8e355..9c07fe149 100644
--- a/light-client/Cargo.toml
+++ b/light-client/Cargo.toml
@@ -26,7 +26,7 @@ portalnet = { path = "../portalnet" }
 reqwest = { version = "0.11.13", default-features = false, features = ["json", "rustls-tls"] }
 serde = { version = "1.0.143", features = ["derive"] }
 serde_json = "1.0.85"
-serde_yaml = "0.9.14"
+serde_yaml = "0.9"
 serde-this-or-that = "0.4.2"
 ssz-rs = { git = "https://github.com/ralexstokes/ssz-rs", rev = "d09f55b4f8554491e3431e01af1c32347a8781cd" }
 ssz_types = "0.5.4"
diff --git a/trin-beacon/Cargo.toml b/trin-beacon/Cargo.toml
index e66c5218b..a6838cc60 100644
--- a/trin-beacon/Cargo.toml
+++ b/trin-beacon/Cargo.toml
@@ -19,8 +19,8 @@ ethportal-api = { path = "../ethportal-api" }
 parking_lot = "0.11.2"
 portalnet = { path = "../portalnet" }
 r2d2 = "0.8.9"
-r2d2_sqlite = "0.19.0"
-rusqlite = { version = "0.26.3", features = ["bundled"] }
+r2d2_sqlite = "0.24.0"
+rusqlite = { version = "0.31.0", features = ["bundled"] }
 light-client = { path = "../light-client" }
 serde_json = "1.0.89"
 ssz_types = "0.5.4"
diff --git a/trin-beacon/src/storage.rs b/trin-beacon/src/storage.rs
index 105c8dde1..3438ea166 100644
--- a/trin-beacon/src/storage.rs
+++ b/trin-beacon/src/storage.rs
@@ -20,7 +20,7 @@ use ssz::{Decode, Encode};
 use ssz_types::{typenum::U128, VariableList};
 use std::path::PathBuf;
 use tracing::debug;
-use trin_metrics::{portalnet::PORTALNET_METRICS, storage::StorageMetricsReporter};
+use trin_metrics::storage::StorageMetricsReporter;
 use trin_storage::{
     error::ContentStoreError,
     sql::{
@@ -263,15 +263,11 @@ impl ContentStore for BeaconStorage {
 impl BeaconStorage {
     pub fn new(config: PortalStorageConfig) -> Result<Self, ContentStoreError> {
-        let metrics = StorageMetricsReporter {
-            storage_metrics: PORTALNET_METRICS.storage(),
-            protocol: ProtocolId::Beacon.to_string(),
-        };
         let storage = Self {
             node_data_dir: config.node_data_dir,
             sql_connection_pool: config.sql_connection_pool,
             storage_capacity_in_bytes: config.storage_capacity_mb * BYTES_IN_MB_U64,
-            metrics,
+            metrics: StorageMetricsReporter::new(ProtocolId::Beacon),
             cache: BeaconStorageCache::new(),
         };
diff --git a/trin-history/Cargo.toml b/trin-history/Cargo.toml
index 3ac264f52..b0f2015a9 100644
--- a/trin-history/Cargo.toml
+++ b/trin-history/Cargo.toml
@@ -20,7 +20,7 @@ ethportal-api = {path = "../ethportal-api"}
 parking_lot = "0.11.2"
 portalnet = { path = "../portalnet" }
 r2d2 = "0.8.9"
-r2d2_sqlite = "0.19.0"
+r2d2_sqlite = "0.24.0"
 serde_json = "1.0.89"
 tokio = { version = "1.14.0", features = ["full"] }
 tracing = "0.1.36"
diff --git a/trin-history/src/storage.rs b/trin-history/src/storage.rs
index 78066c078..a9a70d0db 100644
--- a/trin-history/src/storage.rs
+++ b/trin-history/src/storage.rs
@@ -9,7 +9,7 @@ use r2d2::Pool;
 use r2d2_sqlite::{rusqlite, SqliteConnectionManager};
 use std::path::PathBuf;
 use tracing::debug;
-use trin_metrics::{portalnet::PORTALNET_METRICS, storage::StorageMetricsReporter};
+use trin_metrics::storage::StorageMetricsReporter;
 use trin_storage::{
     error::ContentStoreError,
     sql::{
@@ -89,11 +89,6 @@ impl HistoryStorage {
         config: PortalStorageConfig,
         protocol: ProtocolId,
     ) -> Result<Self, ContentStoreError> {
-        // Initialize the instance
-        let metrics = StorageMetricsReporter {
-            storage_metrics: PORTALNET_METRICS.storage(),
-            protocol: protocol.to_string(),
-        };
         let mut storage = Self {
             node_id: config.node_id,
             node_data_dir: config.node_data_dir,
@@ -101,7 +96,7 @@
             radius: Distance::MAX,
             sql_connection_pool: config.sql_connection_pool,
             distance_fn: config.distance_fn,
-            metrics,
+            metrics: StorageMetricsReporter::new(protocol),
             storage_occupied_in_bytes: 0,
         };
@@ -493,12 +488,6 @@ pub mod test {
     const CAPACITY_MB: u64 = 2;
 
-    fn generate_random_content_key() -> IdentityContentKey {
-        let mut key = [0u8; 32];
-        rand::thread_rng().fill_bytes(&mut key);
-        IdentityContentKey::new(key)
-    }
-
     fn get_active_node_id(temp_dir: PathBuf) -> NodeId {
         let (_, mut pk) = configure_node_data_dir(temp_dir, None).unwrap();
         let pk = CombinedKey::secp256k1_from_bytes(pk.0.as_mut_slice()).unwrap();
@@ -538,7 +527,7 @@ pub mod test {
             PortalStorageConfig::new(CAPACITY_MB, temp_dir.path().to_path_buf(), node_id)
                 .unwrap();
         let mut storage = HistoryStorage::new(storage_config, ProtocolId::History).unwrap();
-        let content_key = generate_random_content_key();
+        let content_key = IdentityContentKey::random();
         let mut value = [0u8; 32];
         rand::thread_rng().fill_bytes(&mut value);
         storage.store(&content_key, &value.to_vec()).unwrap();
@@ -583,7 +572,7 @@
             PortalStorageConfig::new(CAPACITY_MB, temp_dir.path().to_path_buf(), node_id).unwrap();
         let mut storage = HistoryStorage::new(storage_config, ProtocolId::History)?;
 
-        let content_key = generate_random_content_key();
+        let content_key = IdentityContentKey::random();
         let value: Vec<u8> = "OGFWs179fWnqmjvHQFGHszXloc3Wzdb4".into();
         storage.store(&content_key, &value)?;
@@ -606,7 +595,7 @@
         let mut storage = HistoryStorage::new(storage_config, ProtocolId::History)?;
 
         for _
 in 0..50 {
-            let content_key = generate_random_content_key();
+            let content_key = IdentityContentKey::random();
             let value: Vec<u8> = vec![0; 32000];
             storage.store(&content_key, &value)?;
             assert_eq!(
@@ -657,7 +646,7 @@ pub mod test {
             PortalStorageConfig::new(CAPACITY_MB, temp_dir.path().to_path_buf(), node_id).unwrap();
         let mut storage = HistoryStorage::new(storage_config, ProtocolId::History)?;
 
-        let content_key = generate_random_content_key();
+        let content_key = IdentityContentKey::random();
         let value: Vec<u8> = vec![0; 32000];
         storage.store(&content_key, &value)?;
         assert_eq!(
@@ -686,7 +675,7 @@
         let mut storage = HistoryStorage::new(storage_config, ProtocolId::History)?;
 
         for _ in 0..50 {
-            let content_key = generate_random_content_key();
+            let content_key = IdentityContentKey::random();
             let value: Vec<u8> = vec![0; 32000];
             storage.store(&content_key, &value)?;
             assert_eq!(
@@ -730,7 +719,7 @@
         // Fill up the storage.
         for _ in 0..32 {
-            let content_key = generate_random_content_key();
+            let content_key = IdentityContentKey::random();
             let value: Vec<u8> = vec![0; 32000];
             storage.store(&content_key, &value)?;
             assert_eq!(
@@ -779,7 +768,7 @@
         let mut storage = HistoryStorage::new(storage_config, ProtocolId::History)?;
 
         for _ in 0..50 {
-            let content_key = generate_random_content_key();
+            let content_key = IdentityContentKey::random();
             let value: Vec<u8> = vec![0; 32000];
             storage.store(&content_key, &value)?;
         }
@@ -821,7 +810,7 @@
             PortalStorageConfig::new(0, temp_dir.path().to_path_buf(), node_id).unwrap();
         let mut storage = HistoryStorage::new(storage_config, ProtocolId::History)?;
 
-        let content_key = generate_random_content_key();
+        let content_key = IdentityContentKey::random();
         let value: Vec<u8> = "OGFWs179fWnqmjvHQFGHszXloc3Wzdb4".into();
         assert!(storage.store(&content_key, &value).is_err());
diff --git a/trin-metrics/src/storage.rs b/trin-metrics/src/storage.rs
index d432348e3..2b6212027 100644
--- a/trin-metrics/src/storage.rs
+++ b/trin-metrics/src/storage.rs
@@ -1,4 +1,4 @@
-use ethportal_api::types::distance::Distance;
+use ethportal_api::types::{distance::Distance, portal_wire::ProtocolId};
 use prometheus_exporter::{
     self,
     prometheus::{
@@ -8,7 +8,7 @@
     },
 };
 
-use crate::timer::DiscardOnDropHistogramTimer;
+use crate::{portalnet::PORTALNET_METRICS, timer::DiscardOnDropHistogramTimer};
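
The constructor introduced in the next hunk centralizes reporter setup; call sites shrink from a hand-built struct literal to one call. A sketch (assuming the `trin-metrics` items shown in this diff):

    use ethportal_api::types::portal_wire::ProtocolId;
    use trin_metrics::storage::StorageMetricsReporter;

    fn build_reporter(protocol: ProtocolId) -> StorageMetricsReporter {
        // Previously each storage crate spelled out the struct literal with
        // PORTALNET_METRICS.storage(); that wiring now lives inside `new`.
        StorageMetricsReporter::new(protocol)
    }
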
 /// Contains metrics reporters for portalnet storage.
 #[derive(Clone, Debug)]
@@ -88,6 +88,13 @@ pub struct StorageMetricsReporter {
 }
 
 impl StorageMetricsReporter {
+    pub fn new(protocol_id: ProtocolId) -> Self {
+        Self {
+            storage_metrics: PORTALNET_METRICS.storage(),
+            protocol: protocol_id.to_string(),
+        }
+    }
+
     pub fn start_process_timer(&self, storage_function: &str) -> DiscardOnDropHistogramTimer {
         DiscardOnDropHistogramTimer::new(
             self.storage_metrics
diff --git a/trin-state/Cargo.toml b/trin-state/Cargo.toml
index b7606ecde..283a8e9a7 100644
--- a/trin-state/Cargo.toml
+++ b/trin-state/Cargo.toml
@@ -16,20 +16,20 @@ async-trait = "0.1.53"
 discv5 = { version = "0.4.0", features = ["serde"] }
 ethereum-types = "0.14.1"
 ethportal-api = { path = "../ethportal-api" }
+keccak-hash = "0.10.0"
 parking_lot = "0.11.2"
 portalnet = { path = "../portalnet" }
-r2d2 = "0.8.9"
-r2d2_sqlite = "0.19.0"
-tracing = "0.1.36"
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0.89"
 tokio = {version = "1.14.0", features = ["full"]}
+tracing = "0.1.36"
 trin-storage = { path = "../trin-storage" }
-trin-metrics = { path = "../trin-metrics" }
 trin-validation = { path = "../trin-validation" }
-utp-rs = { git = "https://github.com/ethereum/utp", tag = "v0.1.0-alpha.9" }
-serde_json = "1.0.89"
-serial_test = "0.5.1"
 
 [dev-dependencies]
 env_logger = "0.9.0"
+serde_yaml = "0.9"
+serial_test = "0.5.1"
 test-log = { version = "0.2.11", features = ["trace"] }
 tracing-subscriber = "0.3.15"
+trin-utils = { path = "../trin-utils" }
diff --git a/trin-state/src/storage.rs b/trin-state/src/storage.rs
index 09439976b..387a6bd9e 100644
--- a/trin-state/src/storage.rs
+++ b/trin-state/src/storage.rs
@@ -1,119 +1,296 @@
-use discv5::enr::NodeId;
 use ethportal_api::{
-    types::{distance::Distance, portal_wire::ProtocolId},
-    OverlayContentKey,
+    types::{
+        content_key::state::{AccountTrieNodeKey, ContractBytecodeKey, ContractStorageTrieNodeKey},
+        content_value::state::{ContractBytecode, TrieNode},
+        distance::Distance,
+        portal_wire::ProtocolId,
+    },
+    ContentValue, OverlayContentKey, StateContentKey, StateContentValue,
 };
-use r2d2::Pool;
-use r2d2_sqlite::SqliteConnectionManager;
-use std::path::PathBuf;
-use trin_metrics::{portalnet::PORTALNET_METRICS, storage::StorageMetricsReporter};
+use keccak_hash::keccak;
 use trin_storage::{
-    error::ContentStoreError, ContentStore, DistanceFunction, PortalStorageConfig,
-    ShouldWeStoreContent, BYTES_IN_MB_U64,
+    error::ContentStoreError,
+    versioned::{create_store, ContentType, IdIndexedV1Store, IdIndexedV1StoreConfig},
+    ContentId, ContentStore, PortalStorageConfig, ShouldWeStoreContent, BYTES_IN_MB_U64,
 };
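
The rewrite below replaces the stubbed-out `StateStorage` with one backed by `IdIndexedV1Store`. Roughly, the round trip it enables looks like this (a sketch, assuming the types from this diff; `put` validates the value against the key before storing):

    use ethportal_api::{ContentValue, StateContentKey, StateContentValue};
    use trin_storage::{error::ContentStoreError, ContentStore};

    fn round_trip(
        storage: &mut StateStorage,
        key: StateContentKey,
        value: StateContentValue,
    ) -> Result<(), ContentStoreError> {
        // `put` decodes the value, checks it against the key variant
        // (node hash or code hash), and stores a trimmed encoding.
        storage.put(key.clone(), value.encode())?;
        // `get` returns the stored bytes (the TrieNode / ContractBytecode
        // encoding, not the original proof).
        assert!(storage.get(&key)?.is_some());
        Ok(())
    }
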
 /// Storage layer for the state network. Encapsulates state network specific data and logic.
-#[allow(dead_code)] // Remove this once we have implemented the state network.
 #[derive(Debug)]
 pub struct StateStorage {
-    node_id: NodeId,
-    node_data_dir: PathBuf,
-    storage_capacity_in_bytes: u64,
-    radius: Distance,
-    sql_connection_pool: Pool<SqliteConnectionManager>,
-    distance_fn: DistanceFunction,
-    metrics: StorageMetricsReporter,
-    network: ProtocolId,
+    store: IdIndexedV1Store,
 }
 
 impl ContentStore for StateStorage {
-    fn get<K: OverlayContentKey>(&self, _key: &K) -> Result<Option<Vec<u8>>, ContentStoreError> {
-        unimplemented!()
+    fn get<K: OverlayContentKey>(&self, key: &K) -> Result<Option<Vec<u8>>, ContentStoreError> {
+        self.store.lookup_content_value(&key.content_id().into())
     }
 
     fn put<K: OverlayContentKey, V: AsRef<[u8]>>(
         &mut self,
-        _key: K,
-        _value: V,
+        key: K,
+        value: V,
     ) -> Result<(), ContentStoreError> {
-        unimplemented!()
+        let key = StateContentKey::try_from(key.to_bytes())?;
+        let value = StateContentValue::decode(value.as_ref())?;
+
+        match &key {
+            StateContentKey::AccountTrieNode(account_trie_node_key) => {
+                self.put_account_trie_node(&key, account_trie_node_key, value)
+            }
+            StateContentKey::ContractStorageTrieNode(contract_storage_trie_key) => {
+                self.put_contract_storage_trie_node(&key, contract_storage_trie_key, value)
+            }
+            StateContentKey::ContractBytecode(contract_bytecode_key) => {
+                self.put_contract_bytecode(&key, contract_bytecode_key, value)
+            }
+        }
     }
 
     fn is_key_within_radius_and_unavailable<K: OverlayContentKey>(
         &self,
-        _key: &K,
+        key: &K,
     ) -> Result<ShouldWeStoreContent, ContentStoreError> {
-        unimplemented!()
+        let content_id = ContentId::from(key.content_id());
+        if self.store.distance_to_content_id(&content_id) > self.store.radius() {
+            Ok(ShouldWeStoreContent::NotWithinRadius)
+        } else if self.store.has_content(&content_id)? {
+            Ok(ShouldWeStoreContent::AlreadyStored)
+        } else {
+            Ok(ShouldWeStoreContent::Store)
+        }
     }
 
     fn radius(&self) -> Distance {
-        self.radius
+        IdIndexedV1Store::radius(&self.store)
     }
 }
 
 impl StateStorage {
     pub fn new(config: PortalStorageConfig) -> Result<Self, ContentStoreError> {
-        let metrics = StorageMetricsReporter {
-            storage_metrics: PORTALNET_METRICS.storage(),
-            protocol: ProtocolId::State.to_string(),
-        };
-
-        Ok(Self {
+        let sql_connection_pool = config.sql_connection_pool.clone();
+        let config = IdIndexedV1StoreConfig {
+            content_type: ContentType::State,
+            network: ProtocolId::State,
             node_id: config.node_id,
             node_data_dir: config.node_data_dir,
-            storage_capacity_in_bytes: config.storage_capacity_mb * BYTES_IN_MB_U64,
-            radius: Distance::MAX,
-            sql_connection_pool: config.sql_connection_pool,
             distance_fn: config.distance_fn,
-            metrics,
-            network: ProtocolId::State,
+            sql_connection_pool: config.sql_connection_pool,
+            storage_capacity_bytes: config.storage_capacity_mb * BYTES_IN_MB_U64,
+        };
+        Ok(Self {
+            store: create_store(ContentType::State, config, sql_connection_pool)?,
         })
     }
 
     /// Get a summary of the current state of storage
     pub fn get_summary_info(&self) -> String {
-        self.metrics.get_summary()
+        self.store.get_summary_info()
+    }
+
+    fn put_account_trie_node(
+        &mut self,
+        content_key: &StateContentKey,
+        key: &AccountTrieNodeKey,
+        value: StateContentValue,
+    ) -> Result<(), ContentStoreError> {
+        let StateContentValue::AccountTrieNodeWithProof(value) = value else {
+            return Err(ContentStoreError::InvalidData {
+                message: format!(
+                    "Expected AccountTrieNodeWithProof, but received {value:?} instead"
+                ),
+            });
+        };
+        let Some(last_trie_node) = value.proof.last() else {
+            return Err(ContentStoreError::InvalidData {
+                message: "Expected Trie nodes in the proof but none were present".to_string(),
+            });
+        };
+        let last_node_hash = keccak(&last_trie_node[..]);
+
+        if last_node_hash != key.node_hash {
+            return Err(ContentStoreError::InvalidData {
+                message: format!(
+                    "Hash of the trie node 
({last_node_hash}) doesn't match key's node_hash ({})", + key.node_hash + ), + }); + } + + let trie_node = TrieNode { + node: last_trie_node.clone(), + }; + self.store + .insert(content_key, StateContentValue::TrieNode(trie_node).encode()) + } + + fn put_contract_storage_trie_node( + &mut self, + content_key: &StateContentKey, + key: &ContractStorageTrieNodeKey, + value: StateContentValue, + ) -> Result<(), ContentStoreError> { + let StateContentValue::ContractStorageTrieNodeWithProof(value) = value else { + return Err(ContentStoreError::InvalidData { + message: format!( + "Expected ContractStorageTrieNodeWithProof, but received {value:?} instead" + ), + }); + }; + let Some(last_trie_node) = value.storage_proof.last() else { + return Err(ContentStoreError::InvalidData { + message: "Expected Trie nodes in the proof but none were present".to_string(), + }); + }; + let last_node_hash = keccak(&last_trie_node[..]); + + if last_node_hash != key.node_hash { + return Err(ContentStoreError::InvalidData { + message: format!( + "Hash of the trie node ({last_node_hash}) doesn't match key's node_hash ({})", + key.node_hash + ), + }); + } + + let trie_node = TrieNode { + node: last_trie_node.clone(), + }; + self.store + .insert(content_key, StateContentValue::TrieNode(trie_node).encode()) + } + + fn put_contract_bytecode( + &mut self, + content_key: &StateContentKey, + key: &ContractBytecodeKey, + value: StateContentValue, + ) -> Result<(), ContentStoreError> { + let StateContentValue::ContractBytecodeWithProof(value) = value else { + return Err(ContentStoreError::InvalidData { + message: format!( + "Expected ContractBytecodeWithProof, but received {value:?} instead" + ), + }); + }; + let bytes_hash = keccak(&value.code[..]); + + if bytes_hash != key.code_hash { + return Err(ContentStoreError::InvalidData { + message: format!( + "Hash of the code ({bytes_hash}) doesn't match key's code_hash ({})", + key.code_hash + ), + }); + } + + let contract_code = ContractBytecode { code: value.code }; + + self.store.insert( + content_key, + StateContentValue::ContractBytecode(contract_code).encode(), + ) } } #[cfg(test)] #[allow(clippy::unwrap_used)] pub mod test { - use discv5::{enr::CombinedKey, Enr}; - use ethportal_api::types::distance::Distance; - use portalnet::utils::db::{configure_node_data_dir, setup_temp_dir}; - use serial_test::serial; + use std::path::PathBuf; + + use anyhow::{anyhow, Result}; + use ethportal_api::utils::bytes::hex_decode; + use serde::Deserialize; + use trin_storage::test_utils::create_test_portal_storage_config_with_capacity; + use trin_utils::submodules::read_portal_spec_tests_file; use super::*; - const CAPACITY_MB: u64 = 2; + const STORAGE_CAPACITY_MB: u64 = 10; + + #[test] + fn account_trie_node() -> Result<()> { + let (temp_dir, config) = + create_test_portal_storage_config_with_capacity(STORAGE_CAPACITY_MB)?; + let mut storage = StateStorage::new(config)?; + + let key_yaml = read_yaml_file("account_trie_node_key.yaml")?; + let key = StateContentKey::deserialize(&key_yaml["content_key"])?; - fn get_active_node_id(temp_dir: PathBuf) -> NodeId { - let (_, mut pk) = configure_node_data_dir(temp_dir, None).unwrap(); - let pk = CombinedKey::secp256k1_from_bytes(pk.0.as_mut_slice()).unwrap(); - Enr::empty(&pk).unwrap().node_id() + let value_yaml = read_yaml_file("account_trie_node_with_proof.yaml")?; + let value = StateContentValue::deserialize(&value_yaml["content_value"])?; + + let expected_stored_value = 
"0x04000000f8719d20a65bd257638cf8cf09b8238888947cc3c0bea2aa2cc3f1c4ac7a3002b851f84f018b02b4f32ee2f03d31ee3fbba046d5eb15d44b160805e80d05e2a47d434053e6c4b3ef9d1111773039e9586661a0d0a06b12ac47863b5c7be4185c2deaad1c61557033f56c7d4ea74429cbb25e23"; + + storage.put(key.clone(), value.encode())?; + + assert_eq!( + storage.get(&key).unwrap(), + Some(hex_decode(expected_stored_value)?) + ); + + drop(temp_dir); + Ok(()) } - #[test_log::test(tokio::test)] - #[serial] - async fn test_new() -> Result<(), ContentStoreError> { - let temp_dir = setup_temp_dir().unwrap(); - let node_id = get_active_node_id(temp_dir.path().to_path_buf()); + #[test] + fn contract_storage_trie_node() -> Result<()> { + let (temp_dir, config) = + create_test_portal_storage_config_with_capacity(STORAGE_CAPACITY_MB)?; + let mut storage = StateStorage::new(config)?; + + let key_yaml = read_yaml_file("contract_storage_trie_node_key.yaml")?; + let key = StateContentKey::deserialize(&key_yaml["content_key"])?; - let storage_config = - PortalStorageConfig::new(CAPACITY_MB, temp_dir.path().to_path_buf(), node_id).unwrap(); - let storage = StateStorage::new(storage_config)?; + let value_yaml = read_yaml_file("contract_storage_trie_node_with_proof.yaml")?; + let value = StateContentValue::deserialize(&value_yaml["content_value"])?; + + let expected_stored_value = + "0x04000000e09e20fa12a823e0f2b7631cc41b3ba8828b3321ca811111fa75cd3aa3bb5ace12"; + + storage.put(key.clone(), value.encode())?; - // Assert that configs match the storage object's fields - assert_eq!(storage.node_id, node_id); assert_eq!( - storage.storage_capacity_in_bytes, - CAPACITY_MB * BYTES_IN_MB_U64 + storage.get(&key).unwrap(), + Some(hex_decode(expected_stored_value)?) ); - assert_eq!(storage.radius, Distance::MAX); - std::mem::drop(storage); - temp_dir.close()?; + drop(temp_dir); Ok(()) } + + #[test] + fn contract_bytecode() -> Result<()> { + let (temp_dir, config) = + create_test_portal_storage_config_with_capacity(STORAGE_CAPACITY_MB)?; + let mut storage = StateStorage::new(config)?; + + let key_yaml = read_yaml_file("contract_bytecode_key.yaml")?; + let key = StateContentKey::deserialize(&key_yaml["content_key"])?; + + let value_yaml = read_yaml_file("contract_bytecode_with_proof.yaml")?; + let value = StateContentValue::deserialize(&value_yaml["content_value"])?; + + let expected_stored_value = 
"0x040000006060604052600436106100af576000357c0100000000000000000000000000000000000000000000000000000000900463ffffffff16806306fdde03146100b9578063095ea7b31461014757806318160ddd146101a157806323b872dd146101ca5780632e1a7d4d14610243578063313ce5671461026657806370a082311461029557806395d89b41146102e2578063a9059cbb14610370578063d0e30db0146103ca578063dd62ed3e146103d4575b6100b7610440565b005b34156100c457600080fd5b6100cc6104dd565b6040518080602001828103825283818151815260200191508051906020019080838360005b8381101561010c5780820151818401526020810190506100f1565b50505050905090810190601f1680156101395780820380516001836020036101000a031916815260200191505b509250505060405180910390f35b341561015257600080fd5b610187600480803573ffffffffffffffffffffffffffffffffffffffff1690602001909190803590602001909190505061057b565b604051808215151515815260200191505060405180910390f35b34156101ac57600080fd5b6101b461066d565b6040518082815260200191505060405180910390f35b34156101d557600080fd5b610229600480803573ffffffffffffffffffffffffffffffffffffffff1690602001909190803573ffffffffffffffffffffffffffffffffffffffff1690602001909190803590602001909190505061068c565b604051808215151515815260200191505060405180910390f35b341561024e57600080fd5b61026460048080359060200190919050506109d9565b005b341561027157600080fd5b610279610b05565b604051808260ff1660ff16815260200191505060405180910390f35b34156102a057600080fd5b6102cc600480803573ffffffffffffffffffffffffffffffffffffffff16906020019091905050610b18565b6040518082815260200191505060405180910390f35b34156102ed57600080fd5b6102f5610b30565b6040518080602001828103825283818151815260200191508051906020019080838360005b8381101561033557808201518184015260208101905061031a565b50505050905090810190601f1680156103625780820380516001836020036101000a031916815260200191505b509250505060405180910390f35b341561037b57600080fd5b6103b0600480803573ffffffffffffffffffffffffffffffffffffffff16906020019091908035906020019091905050610bce565b604051808215151515815260200191505060405180910390f35b6103d2610440565b005b34156103df57600080fd5b61042a600480803573ffffffffffffffffffffffffffffffffffffffff1690602001909190803573ffffffffffffffffffffffffffffffffffffffff16906020019091905050610be3565b6040518082815260200191505060405180910390f35b34600360003373ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff168152602001908152602001600020600082825401925050819055503373ffffffffffffffffffffffffffffffffffffffff167fe1fffcc4923d04b559f4d29a8bfc6cda04eb5b0d3c460751c2402c5c5cc9109c346040518082815260200191505060405180910390a2565b60008054600181600116156101000203166002900480601f0160208091040260200160405190810160405280929190818152602001828054600181600116156101000203166002900480156105735780601f1061054857610100808354040283529160200191610573565b820191906000526020600020905b81548152906001019060200180831161055657829003601f168201915b505050505081565b600081600460003373ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff16815260200190815260200160002060008573ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff168152602001908152602001600020819055508273ffffffffffffffffffffffffffffffffffffffff163373ffffffffffffffffffffffffffffffffffffffff167f8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925846040518082815260200191505060405180910390a36001905092915050565b60003073ffffffffffffffffffffffffffffffffffffffff1631905090565b600081600360008673ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff16815260200190815260200160002054101515156106dc57600080fd5b3373ffffffffffffffffffffffffff
ffffffffffffff168473ffffffffffffffffffffffffffffffffffffffff16141580156107b457507fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff600460008673ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff16815260200190815260200160002060003373ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff1681526020019081526020016000205414155b156108cf5781600460008673ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff16815260200190815260200160002060003373ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff168152602001908152602001600020541015151561084457600080fd5b81600460008673ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff16815260200190815260200160002060003373ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff168152602001908152602001600020600082825403925050819055505b81600360008673ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff1681526020019081526020016000206000828254039250508190555081600360008573ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff168152602001908152602001600020600082825401925050819055508273ffffffffffffffffffffffffffffffffffffffff168473ffffffffffffffffffffffffffffffffffffffff167fddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef846040518082815260200191505060405180910390a3600190509392505050565b80600360003373ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff1681526020019081526020016000205410151515610a2757600080fd5b80600360003373ffffffffffffffffffffffffffffffffffffffff1673ffffffffffffffffffffffffffffffffffffffff168152602001908152602001600020600082825403925050819055503373ffffffffffffffffffffffffffffffffffffffff166108fc829081150290604051600060405180830381858888f193505050501515610ab457600080fd5b3373ffffffffffffffffffffffffffffffffffffffff167f7fcf532c15f0a6db0bd6d0e038bea71d30d808c7d98cb3bf7268a95bf5081b65826040518082815260200191505060405180910390a250565b600260009054906101000a900460ff1681565b60036020528060005260406000206000915090505481565b60018054600181600116156101000203166002900480601f016020809104026020016040519081016040528092919081815260200182805460018160011615610100020316600290048015610bc65780601f10610b9b57610100808354040283529160200191610bc6565b820191906000526020600020905b815481529060010190602001808311610ba957829003601f168201915b505050505081565b6000610bdb33848461068c565b905092915050565b60046020528160005260406000206020528060005260406000206000915091505054815600a165627a7a72305820deb4c2ccab3c2fdca32ab3f46728389c2fe2c165d5fafa07661e4e004f6c344a0029"; + + storage.put(key.clone(), value.encode())?; + + assert_eq!( + storage.get(&key).unwrap(), + Some(hex_decode(expected_stored_value)?) 
+        );
+
+        drop(temp_dir);
+        Ok(())
+    }
+
+    fn read_yaml_file(filename: &str) -> Result<serde_yaml::Mapping> {
+        let file = read_portal_spec_tests_file(
+            PathBuf::from("tests/mainnet/state/serialization").join(filename),
+        )?;
+        let value: serde_yaml::Value = serde_yaml::from_str(&file)?;
+        value
+            .as_mapping()
+            .map(Clone::clone)
+            .ok_or(anyhow!("Expected mapping at the root of a file"))
+    }
 }
diff --git a/trin-storage/Cargo.toml b/trin-storage/Cargo.toml
index 1b088d6f3..2cc43ae78 100644
--- a/trin-storage/Cargo.toml
+++ b/trin-storage/Cargo.toml
@@ -16,10 +16,14 @@ discv5 = { version = "0.4.0", features = ["serde"] }
 ethereum-types = "0.14.1"
 ethportal-api = {path = "../ethportal-api"}
 r2d2 = "0.8.9"
-r2d2_sqlite = "0.19.0"
-rusqlite = { version = "0.26.3", features = ["bundled"] }
+r2d2_sqlite = "0.24.0"
+rand = "0.8.4"
+rusqlite = { version = "0.31.0", features = ["bundled"] }
 strum = { version = "0.26.1", features = ["derive"] }
 tempfile = "3.3.0"
 thiserror = "1.0.29"
 tracing = "0.1.36"
 trin-metrics = { path = "../trin-metrics" }
+
+[dev-dependencies]
+rstest = "0.18.2"
diff --git a/trin-storage/src/error.rs b/trin-storage/src/error.rs
index dd7ace7b2..5a32c8abe 100644
--- a/trin-storage/src/error.rs
+++ b/trin-storage/src/error.rs
@@ -1,6 +1,7 @@
 use ethportal_api::{
     types::{content_key::error::ContentKeyError, distance::Distance},
     utils::bytes::ByteUtilsError,
+    ContentValueError,
 };
 use thiserror::Error;
@@ -38,6 +39,9 @@ pub enum ContentStoreError {
     #[error("unable to use content key {0}")]
     ContentKey(#[from] ContentKeyError),
 
+    #[error("unable to use content value {0}")]
+    ContentValue(#[from] ContentValueError),
+
     #[error("Invalid store version '{version}' for table '{content_type}'")]
     InvalidStoreVersion {
         content_type: ContentType,
diff --git a/trin-storage/src/lib.rs b/trin-storage/src/lib.rs
index f1ee87c86..8a5ce4603 100644
--- a/trin-storage/src/lib.rs
+++ b/trin-storage/src/lib.rs
@@ -1,5 +1,6 @@
 pub mod error;
 pub mod sql;
+pub mod test_utils;
 pub mod utils;
 pub mod versioned;
@@ -159,8 +160,8 @@ impl PortalStorageConfig {
         storage_capacity_mb: u64,
         node_data_dir: PathBuf,
         node_id: NodeId,
-    ) -> anyhow::Result<Self> {
-        let sql_connection_pool: Pool<SqliteConnectionManager> = setup_sql(&node_data_dir)?;
+    ) -> Result<Self, ContentStoreError> {
+        let sql_connection_pool = setup_sql(&node_data_dir)?;
         Ok(Self {
             storage_capacity_mb,
             node_id,
diff --git a/trin-storage/src/test_utils.rs b/trin-storage/src/test_utils.rs
new file mode 100644
index 000000000..6ecf7ff41
--- /dev/null
+++ b/trin-storage/src/test_utils.rs
@@ -0,0 +1,18 @@
+use discv5::enr::NodeId;
+use tempfile::TempDir;
+
+use crate::{error::ContentStoreError, PortalStorageConfig};
+
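
A hypothetical call site for the helpers defined below (the test name is illustrative):

    #[test]
    fn example_storage_test() -> Result<(), ContentStoreError> {
        // Keep `temp_dir` alive for the whole test; dropping it deletes
        // the directory that backs the sqlite database.
        let (temp_dir, config) = create_test_portal_storage_config_with_capacity(10)?;
        // ... build a store from `config` and exercise it ...
        drop(temp_dir);
        Ok(())
    }
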
+/// Creates a temporary directory and a `PortalStorageConfig`.
+pub fn create_test_portal_storage_config_with_capacity(
+    capacity_mb: u64,
+) -> Result<(TempDir, PortalStorageConfig), ContentStoreError> {
+    let temp_dir = TempDir::new()?;
+    let config =
+        PortalStorageConfig::new(capacity_mb, temp_dir.path().to_path_buf(), NodeId::random())?;
+    Ok((temp_dir, config))
+}
+
+pub fn generate_random_bytes(length: usize) -> Vec<u8> {
+    (0..length).map(|_| rand::random::<u8>()).collect()
+}
diff --git a/trin-storage/src/versioned/id_indexed_v1/mod.rs b/trin-storage/src/versioned/id_indexed_v1/mod.rs
new file mode 100644
index 000000000..230394ac1
--- /dev/null
+++ b/trin-storage/src/versioned/id_indexed_v1/mod.rs
@@ -0,0 +1,44 @@
+mod sql;
+mod store;
+mod usage_stats;
+
+use std::path::PathBuf;
+
+use discv5::enr::NodeId;
+use ethportal_api::types::portal_wire::ProtocolId;
+use r2d2::Pool;
+use r2d2_sqlite::SqliteConnectionManager;
+
+pub use store::IdIndexedV1Store;
+
+use crate::DistanceFunction;
+
+use super::ContentType;
+
+/// The fraction of the storage capacity that we should aim for when pruning.
+const TARGET_CAPACITY_FRACTION: f64 = 0.9;
+
+/// The fraction of the storage capacity that we need to pass in order to start pruning.
+const PRUNING_CAPACITY_THRESHOLD_FRACTION: f64 = 0.95;
+
+/// The config for the IdIndexedV1Store
+#[derive(Debug, Clone)]
+pub struct IdIndexedV1StoreConfig {
+    pub content_type: ContentType,
+    pub network: ProtocolId,
+    pub node_id: NodeId,
+    pub node_data_dir: PathBuf,
+    pub storage_capacity_bytes: u64,
+    pub sql_connection_pool: Pool<SqliteConnectionManager>,
+    pub distance_fn: DistanceFunction,
+}
+
+impl IdIndexedV1StoreConfig {
+    fn target_capacity(&self) -> u64 {
+        (self.storage_capacity_bytes as f64 * TARGET_CAPACITY_FRACTION).round() as u64
+    }
+
+    fn pruning_capacity_threshold(&self) -> u64 {
+        (self.storage_capacity_bytes as f64 * PRUNING_CAPACITY_THRESHOLD_FRACTION).round() as u64
+    }
+}
diff --git a/trin-storage/src/versioned/id_indexed_v1/sql.rs b/trin-storage/src/versioned/id_indexed_v1/sql.rs
new file mode 100644
index 000000000..e603b4c72
--- /dev/null
+++ b/trin-storage/src/versioned/id_indexed_v1/sql.rs
@@ -0,0 +1,89 @@
+use crate::versioned::ContentType;
+
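
These builders return SQL text parameterized only by the table name; callers bind values with `named_params!`. A sketch of a call site, mirroring the test code later in this diff (`conn` is a pooled rusqlite connection; the helper name is illustrative):

    use rusqlite::named_params;

    fn store_row(
        conn: &rusqlite::Connection,
        content_type: &ContentType,
        content_id: &[u8],
        content_key: &[u8],
        content_value: &[u8],
        distance_short: u32,
    ) -> rusqlite::Result<usize> {
        // content_size counts id + key + value, matching the store's accounting.
        let content_size = content_id.len() + content_key.len() + content_value.len();
        conn.execute(
            &insert(content_type),
            named_params! {
                ":content_id": content_id,
                ":content_key": content_key,
                ":content_value": content_value,
                ":distance_short": distance_short,
                ":content_size": content_size,
            },
        )
    }
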
+/// The name of the sql table. The `ii1` stands for `id_indexed_v1`.
+fn table_name(content_type: &ContentType) -> String {
+    format!("ii1_{content_type}")
+}
+
+pub fn create_table(content_type: &ContentType) -> String {
+    format!(
+        "
+        CREATE TABLE IF NOT EXISTS {0} (
+            content_id BLOB PRIMARY KEY,
+            content_key BLOB NOT NULL,
+            content_value BLOB NOT NULL,
+            distance_short INTEGER NOT NULL,
+            content_size INTEGER NOT NULL
+        );
+        CREATE INDEX IF NOT EXISTS {0}_distance_short_idx ON {0} (distance_short);
+        CREATE INDEX IF NOT EXISTS {0}_content_size_idx ON {0} (content_size);
+        ",
+        table_name(content_type)
+    )
+}
+
+pub fn insert(content_type: &ContentType) -> String {
+    format!(
+        "
+        INSERT OR IGNORE INTO {} (
+            content_id,
+            content_key,
+            content_value,
+            distance_short,
+            content_size
+        )
+        VALUES (
+            :content_id,
+            :content_key,
+            :content_value,
+            :distance_short,
+            :content_size
+        )",
+        table_name(content_type)
+    )
+}
+
+pub fn delete(content_type: &ContentType) -> String {
+    format!(
+        "DELETE FROM {} WHERE content_id = :content_id",
+        table_name(content_type)
+    )
+}
+
+pub fn lookup_key(content_type: &ContentType) -> String {
+    format!(
+        "SELECT content_key FROM {} WHERE content_id = :content_id LIMIT 1",
+        table_name(content_type)
+    )
+}
+
+pub fn lookup_value(content_type: &ContentType) -> String {
+    format!(
+        "SELECT content_value FROM {} WHERE content_id = :content_id LIMIT 1",
+        table_name(content_type)
+    )
+}
+
+pub fn delete_farthest(content_type: &ContentType) -> String {
+    format!(
+        "DELETE FROM {0} WHERE rowid IN (
+            SELECT rowid FROM {0} ORDER BY distance_short DESC LIMIT :limit)",
+        table_name(content_type)
+    )
+}
+
+pub fn lookup_farthest(content_type: &ContentType) -> String {
+    format!(
+        "SELECT content_id, distance_short FROM {}
+        ORDER BY distance_short DESC
+        LIMIT :limit",
+        table_name(content_type)
+    )
+}
+
+pub fn entry_count_and_size(content_type: &ContentType) -> String {
+    format!(
+        "SELECT COUNT(*) as count, TOTAL(content_size) as used_capacity FROM {}",
+        table_name(content_type)
+    )
+}
diff --git a/trin-storage/src/versioned/id_indexed_v1/store.rs b/trin-storage/src/versioned/id_indexed_v1/store.rs
new file mode 100644
index 000000000..31fb4309b
--- /dev/null
+++ b/trin-storage/src/versioned/id_indexed_v1/store.rs
@@ -0,0 +1,824 @@
+use ethportal_api::types::distance::Distance;
+use r2d2::Pool;
+use r2d2_sqlite::SqliteConnectionManager;
+use rusqlite::{named_params, types::Type, OptionalExtension};
+use tracing::{debug, error, warn};
+use trin_metrics::storage::StorageMetricsReporter;
+
+use super::{sql, usage_stats::UsageStats, IdIndexedV1StoreConfig};
+use crate::{
+    error::ContentStoreError,
+    utils::get_total_size_of_directory_in_bytes,
+    versioned::{ContentType, StoreVersion, VersionedContentStore},
+    ContentId,
+};
+
+struct FarthestQueryResult {
+    content_id: ContentId,
+    distance_u32: u32,
+}
+
+/// The store for storing content key/value pairs.
+///
+/// A different SQL table is created for each `ContentType`, with content-id as a primary key.
+/// It has a configurable capacity and it will prune data that is farthest from the `NodeId` once
+/// it's close to that capacity.
+#[derive(Debug)]
+pub struct IdIndexedV1Store {
+    /// The config.
+    config: IdIndexedV1StoreConfig,
+    /// Estimated number of new inserts required for pruning.
+    inserts_until_pruning: u64,
+    /// The maximum distance between `NodeId` and content id that store should keep. Updated
+    /// dynamically after pruning to the farthest distance still stored.
+    radius: Distance,
+    /// The metrics for tracking performance.
+    metrics: StorageMetricsReporter,
+}
+
+impl VersionedContentStore for IdIndexedV1Store {
+    type Config = IdIndexedV1StoreConfig;
+
+    fn version() -> StoreVersion {
+        StoreVersion::IdIndexedV1
+    }
+
+    fn migrate_from(
+        _content_type: &ContentType,
+        old_version: StoreVersion,
+        _config: &Self::Config,
+    ) -> Result<(), ContentStoreError> {
+        Err(ContentStoreError::UnsupportedStoreMigration {
+            old_version,
+            new_version: Self::version(),
+        })
+    }
+
+    fn create(content_type: ContentType, config: Self::Config) -> Result<Self, ContentStoreError> {
+        maybe_create_table_and_indexes(&content_type, &config.sql_connection_pool)?;
+
+        let protocol_id = config.network;
+
+        let mut store = Self {
+            config,
+            inserts_until_pruning: 0,
+            radius: Distance::MAX,
+            metrics: StorageMetricsReporter::new(protocol_id),
+        };
+        store.init()?;
+        Ok(store)
+    }
+}
+
+impl IdIndexedV1Store {
+    /// Initializes variables and metrics, and runs necessary checks.
+    fn init(&mut self) -> Result<(), ContentStoreError> {
+        self.metrics
+            .report_storage_capacity_bytes(self.config.storage_capacity_bytes as f64);
+
+        let usage_stats = self.get_usage_stats()?;
+
+        if usage_stats.is_above_target_capacity(&self.config) {
+            debug!(
+                Db = %self.config.content_type,
+                "Used capacity ({}) is over target capacity ({}) -> Pruning",
+                usage_stats.used_storage_bytes,
+                self.config.target_capacity()
+            );
+            self.prune(usage_stats)?;
+        } else {
+            debug!(
+                Db = %self.config.content_type,
+                "Used capacity ({}) is under target capacity ({}) -> Using MAX radius",
+                usage_stats.used_storage_bytes,
+                self.config.target_capacity()
+            );
+            self.radius = Distance::MAX;
+            self.metrics.report_radius(self.radius);
+            self.update_inserts_until_pruning(&usage_stats);
+        }
+
+        // Check that distance to the farthest content is what is stored. This is a simple check
+        // that the NodeId didn't change.
+        let farthest = self.lookup_farthest()?;
+        if let Some(farthest) = farthest {
+            let distance = self.distance_to_content_id(&farthest.content_id);
+            if farthest.distance_u32 != distance.big_endian_u32() {
+                return Err(ContentStoreError::Database(format!(
+                    "Distance to the farthest (short: 0x{:08X}) didn't match expected distance ({distance})!",
+                    farthest.distance_u32
+                )));
+            }
+        }
+
+        Ok(())
+    }
+
+    // PUBLIC FUNCTIONS
+
+    /// Returns the radius within which the store accepts content.
+    pub fn radius(&self) -> Distance {
+        self.radius
+    }
+
+    /// Returns distance to the content id.
+    pub fn distance_to_content_id(&self, content_id: &ContentId) -> Distance {
+        self.config
+            .distance_fn
+            .distance(&self.config.node_id, content_id.as_fixed_bytes())
+    }
+
+    /// Returns whether data associated with the content id is already stored.
+    pub fn has_content(&self, content_id: &ContentId) -> Result<bool, ContentStoreError> {
+        let timer = self.metrics.start_process_timer("has_content");
+
+        let has_content = self
+            .config
+            .sql_connection_pool
+            .get()?
+            .prepare(&sql::lookup_key(&self.config.content_type))?
+            .exists(named_params! {
+                ":content_id": content_id.as_fixed_bytes().to_vec(),
+            })?;
+
+        self.metrics.stop_process_timer(timer);
+        Ok(has_content)
+    }
+
+    /// Returns the content key, if it is stored.
+    pub fn lookup_content_key<K: OverlayContentKey>(
+        &self,
+        content_id: &ContentId,
+    ) -> Result<Option<K>, ContentStoreError> {
+        let timer = self.metrics.start_process_timer("lookup_content_key");
+
+        let key = self
+            .config
+            .sql_connection_pool
+            .get()?
+            .query_row(
+                &sql::lookup_key(&self.config.content_type),
+                named_params!
 {
+                    ":content_id": content_id.as_fixed_bytes().to_vec(),
+                },
+                |row| {
+                    let bytes: Vec<u8> = row.get("content_key")?;
+                    K::try_from(bytes).map_err(|e| {
+                        rusqlite::Error::FromSqlConversionFailure(0, Type::Blob, e.into())
+                    })
+                },
+            )
+            .optional()?;
+
+        self.metrics.stop_process_timer(timer);
+        Ok(key)
+    }
+
+    /// Returns the content value, if it is stored.
+    pub fn lookup_content_value(
+        &self,
+        content_id: &ContentId,
+    ) -> Result<Option<Vec<u8>>, ContentStoreError> {
+        let timer = self.metrics.start_process_timer("lookup_content_value");
+
+        let value = self
+            .config
+            .sql_connection_pool
+            .get()?
+            .query_row(
+                &sql::lookup_value(&self.config.content_type),
+                named_params! {
+                    ":content_id": content_id.as_fixed_bytes().to_vec(),
+                },
+                |row| row.get::<&str, Vec<u8>>("content_value"),
+            )
+            .optional()?;
+
+        self.metrics.stop_process_timer(timer);
+        Ok(value)
+    }
+
+    /// Inserts content key/value pair into storage and prunes the db if necessary. It will return
+    /// `InsufficientRadius` error if content is outside radius.
+    pub fn insert<K: OverlayContentKey>(
+        &mut self,
+        content_key: &K,
+        content_value: Vec<u8>,
+    ) -> Result<(), ContentStoreError> {
+        let insert_with_pruning_timer = self.metrics.start_process_timer("insert_with_pruning");
+
+        let content_id = content_key.content_id();
+
+        let distance = self.distance_to_content_id(&content_id.into());
+        if self.radius < distance {
+            return Err(ContentStoreError::InsufficientRadius {
+                radius: self.radius,
+                distance,
+            });
+        }
+
+        let content_id = content_id.to_vec();
+        let content_key = content_key.to_bytes();
+        let content_size = content_id.len() + content_key.len() + content_value.len();
+
+        let insert_timer = self.metrics.start_process_timer("insert");
+        self.config.sql_connection_pool.get()?.execute(
+            &sql::insert(&self.config.content_type),
+            named_params! {
+                ":content_id": content_id,
+                ":content_key": content_key,
+                ":content_value": content_value,
+                ":distance_short": distance.big_endian_u32(),
+                ":content_size": content_size,
+            },
+        )?;
+        self.metrics.stop_process_timer(insert_timer);
+        self.metrics.increase_entry_count();
+
+        if self.inserts_until_pruning > 1 {
+            self.inserts_until_pruning -= 1;
+        } else {
+            let usage_stats = self.get_usage_stats()?;
+            if usage_stats.is_above_pruning_capacity_threshold(&self.config) {
+                self.prune(usage_stats)?
+            } else {
+                self.update_inserts_until_pruning(&usage_stats);
+            }
+        }
+
+        self.metrics.stop_process_timer(insert_with_pruning_timer);
+        Ok(())
+    }
+
+    /// Deletes content with the given content id.
+    pub fn delete(&mut self, content_id: &ContentId) -> Result<(), ContentStoreError> {
+        let timer = self.metrics.start_process_timer("delete");
+        self.config.sql_connection_pool.get()?.execute(
+            &sql::delete(&self.config.content_type),
+            named_params! {
+                ":content_id": content_id.as_fixed_bytes().to_vec(),
+            },
+        )?;
+        self.metrics.decrease_entry_count();
+
+        self.metrics.stop_process_timer(timer);
+        Ok(())
+    }
+
+    /// Updates metrics and returns summary.
+    pub fn get_summary_info(&self) -> String {
+        // Call `get_usage_stats` to update metrics.
+        if let Err(err) = self.get_usage_stats() {
+            warn!(Db = %self.config.content_type, "Error while getting summary info: {err}");
+        }
+        self.metrics.get_summary()
+    }
+
+    // INTERNAL FUNCTIONS
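
For intuition on the capacity constants defined in mod.rs: with the 10 kB capacity used in the tests below, pruning starts once usage exceeds 9,500 bytes and trims back toward 9,000 bytes. A small sketch of that arithmetic (not part of this diff):

    fn thresholds(storage_capacity_bytes: u64) -> (u64, u64) {
        // Mirrors IdIndexedV1StoreConfig::target_capacity() and
        // pruning_capacity_threshold() with the fractions from mod.rs.
        let target = (storage_capacity_bytes as f64 * 0.9).round() as u64;
        let pruning = (storage_capacity_bytes as f64 * 0.95).round() as u64;
        (target, pruning)
    }
    // e.g. thresholds(10_000) == (9_000, 9_500)
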
+
+    /// Returns usage stats and updates relevant metrics.
+    fn get_usage_stats(&self) -> Result<UsageStats, ContentStoreError> {
+        let timer = self.metrics.start_process_timer("get_usage_stats");
+
+        let conn = self.config.sql_connection_pool.get()?;
+        let usage_stats = conn.query_row(
+            &sql::entry_count_and_size(&self.config.content_type),
+            [],
+            |row| {
+                let used_capacity: f64 = row.get("used_capacity")?;
+                Ok(UsageStats {
+                    content_count: row.get("count")?,
+                    used_storage_bytes: used_capacity.round() as u64,
+                })
+            },
+        )?;
+
+        self.metrics.report_entry_count(usage_stats.content_count);
+        self.metrics
+            .report_content_data_storage_bytes(usage_stats.used_storage_bytes as f64);
+
+        // This reports size of the entire database.
+        self.metrics
+            .report_total_storage_usage_bytes(get_total_size_of_directory_in_bytes(
+                &self.config.node_data_dir,
+            )? as f64);
+
+        self.metrics.stop_process_timer(timer);
+        Ok(usage_stats)
+    }
+
+    /// Returns the farthest content in the table.
+    fn lookup_farthest(&self) -> Result<Option<FarthestQueryResult>, ContentStoreError> {
+        let timer = self.metrics.start_process_timer("lookup_farthest");
+        let farthest = self
+            .config
+            .sql_connection_pool
+            .get()?
+            .query_row(
+                &sql::lookup_farthest(&self.config.content_type),
+                named_params! {
+                    ":limit": 1,
+                },
+                |row| {
+                    Ok(FarthestQueryResult {
+                        content_id: row.get("content_id")?,
+                        distance_u32: row.get("distance_short")?,
+                    })
+                },
+            )
+            .optional()?;
+
+        self.metrics.stop_process_timer(timer);
+        Ok(farthest)
+    }
+
+    /// Prunes the database and updates `radius` and `inserts_until_pruning`.
+    fn prune(&mut self, usage_stats: UsageStats) -> Result<(), ContentStoreError> {
+        let timer = self.metrics.start_process_timer("prune");
+
+        if !usage_stats.is_above_target_capacity(&self.config) {
+            warn!(Db = %self.config.content_type, "Pruning requested but we are below target capacity. Skipping");
+            return Ok(());
+        }
+
+        let conn = self.config.sql_connection_pool.get()?;
+        let mut delete_query = conn.prepare(&sql::delete_farthest(&self.config.content_type))?;
+
+        let mut usage_stats = usage_stats;
+        while usage_stats.is_above_pruning_capacity_threshold(&self.config) {
+            let to_delete = usage_stats.delete_until_target(&self.config);
+
+            if to_delete == 0 {
+                warn!(Db = %self.config.content_type, "Should delete 0. This is not expected to happen (we should be above pruning capacity).");
+                return Ok(());
+            }
+
+            let deleted = delete_query.execute(named_params! {
+                ":limit": to_delete
+            })? as u64;
+
+            if to_delete != deleted {
+                error!(Db = %self.config.content_type, "Attempted to delete {to_delete} but deleted {deleted}");
+                break;
+            }
+
+            usage_stats = self.get_usage_stats()?;
+        }
+        // Free connection.
+        drop(delete_query);
+        drop(conn);
+
+        self.update_inserts_until_pruning(&usage_stats);
+
+        // Update radius to the current farthest content
+        match self.lookup_farthest()? {
+            None => {
+                error!(Db = %self.config.content_type, "Farthest not found after pruning!");
+            }
+            Some(farthest) => {
+                self.radius = self.distance_to_content_id(&farthest.content_id);
+            }
+        }
+        self.metrics.report_radius(self.radius);
+
+        self.metrics.stop_process_timer(timer);
+        Ok(())
+    }
+
+    /// Updates `inserts_until_pruning` based on current usage stats. We aim to prune once we reach
+    /// full capacity, but in reality we will prune if we are above `pruning_capacity_threshold()`.
+    fn update_inserts_until_pruning(&mut self, usage_stats: &UsageStats) {
+        self.inserts_until_pruning = usage_stats.estimate_insert_until_full(&self.config);
+    }
+}
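
Putting the pieces together, opening a store through the `VersionedContentStore` trait runs table creation and then `init`, which may prune and tighten the radius. A sketch, assuming the config type from mod.rs:

    fn open_state_store(
        config: IdIndexedV1StoreConfig,
    ) -> Result<IdIndexedV1Store, ContentStoreError> {
        // `create` builds the table if needed, then init() checks usage
        // against the capacity thresholds before returning the store.
        IdIndexedV1Store::create(ContentType::State, config)
    }
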
+
+/// Creates table and indexes if they don't already exist.
+fn maybe_create_table_and_indexes(
+    content_type: &ContentType,
+    pool: &Pool<SqliteConnectionManager>,
+) -> Result<(), ContentStoreError> {
+    let conn = pool.get()?;
+    conn.execute_batch(&sql::create_table(content_type))?;
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use anyhow::Result;
+    use discv5::enr::NodeId;
+    use ethportal_api::{types::portal_wire::ProtocolId, IdentityContentKey, OverlayContentKey};
+    use tempfile::TempDir;
+
+    use crate::{test_utils::generate_random_bytes, utils::setup_sql, DistanceFunction};
+
+    use super::*;
+
+    const STORAGE_CAPACITY_10KB_IN_BYTES: u64 = 10_000;
+    const CONTENT_SIZE_100_BYTES: u64 = STORAGE_CAPACITY_10KB_IN_BYTES / 100;
+
+    fn create_config(temp_dir: &TempDir) -> IdIndexedV1StoreConfig {
+        IdIndexedV1StoreConfig {
+            content_type: ContentType::State,
+            network: ProtocolId::State,
+            node_id: NodeId::random(),
+            node_data_dir: temp_dir.path().to_path_buf(),
+            distance_fn: DistanceFunction::Xor,
+            sql_connection_pool: setup_sql(temp_dir.path()).unwrap(),
+            storage_capacity_bytes: STORAGE_CAPACITY_10KB_IN_BYTES,
+        }
+    }
+
+    /// Creates a content key/value pair that is exactly 1% of the storage capacity.
+    fn generate_key_value(
+        config: &IdIndexedV1StoreConfig,
+        distance: u8,
+    ) -> (IdentityContentKey, Vec<u8>) {
+        generate_key_value_with_content_size(config, distance, STORAGE_CAPACITY_10KB_IN_BYTES / 100)
+    }
+
+    fn generate_key_value_with_content_size(
+        config: &IdIndexedV1StoreConfig,
+        distance: u8,
+        content_size: u64,
+    ) -> (IdentityContentKey, Vec<u8>) {
+        let mut key = rand::random::<[u8; 32]>();
+        key[0] = config.node_id.raw()[0] ^ distance;
+        let key = IdentityContentKey::new(key);
+
+        if content_size < 2 * 32 {
+            panic!("Content size of at least 64 bytes is required (32 for id + 32 for key)")
+        }
+        let value = generate_random_bytes((content_size - 2 * 32) as usize);
+        (key, value)
+    }
+
+    // Creates table and content at approximate middle distance (first byte distance is 0x80).
+    fn create_and_populate_table(config: &IdIndexedV1StoreConfig, count: u64) -> Result<()> {
+        maybe_create_table_and_indexes(&config.content_type, &config.sql_connection_pool)?;
+        for _ in 0..count {
+            let (key, value) = generate_key_value(config, 0x80);
+            let id = key.content_id();
+            let content_size = id.len() + key.to_bytes().len() + value.len();
+            config
+                .sql_connection_pool
+                .get()?
+                .execute(&sql::insert(&config.content_type), named_params!
{ + ":content_id": id.as_slice(), + ":content_key": key.to_bytes(), + ":content_value": value, + ":distance_short": config.distance_fn.distance(&config.node_id, &id).big_endian_u32(), + ":content_size": content_size, + })?; + } + Ok(()) + } + + #[test] + fn create_empty() -> Result<()> { + let temp_dir = TempDir::new()?; + let config = create_config(&temp_dir); + let store = IdIndexedV1Store::create(ContentType::State, config)?; + assert_eq!(store.radius(), Distance::MAX); + assert_eq!(store.inserts_until_pruning, 0); + Ok(()) + } + + #[test] + fn create_low_usage_full() -> Result<()> { + let temp_dir = TempDir::new()?; + let config = create_config(&temp_dir); + + let item_count = 20; + create_and_populate_table(&config, item_count)?; + + let store = IdIndexedV1Store::create(ContentType::State, config)?; + let usage_stats = store.get_usage_stats()?; + + assert_eq!(usage_stats.content_count, item_count); + assert_eq!( + usage_stats.used_storage_bytes, + item_count * CONTENT_SIZE_100_BYTES + ); + assert_eq!(store.radius(), Distance::MAX); + assert_eq!(store.inserts_until_pruning, item_count); + Ok(()) + } + + #[test] + fn create_half_full() -> Result<()> { + let temp_dir = TempDir::new()?; + let config = create_config(&temp_dir); + + let item_count = 50; + create_and_populate_table(&config, item_count)?; + + let store = IdIndexedV1Store::create(ContentType::State, config)?; + let usage_stats = store.get_usage_stats()?; + + assert_eq!(usage_stats.content_count, item_count); + assert_eq!( + usage_stats.used_storage_bytes, + item_count * CONTENT_SIZE_100_BYTES + ); + assert_eq!(store.radius(), Distance::MAX); + assert_eq!(store.inserts_until_pruning, 100 - item_count); + Ok(()) + } + + #[test] + fn create_at_target_capacity() -> Result<()> { + let temp_dir = TempDir::new()?; + let config = create_config(&temp_dir); + + let target_capacity_count = config.target_capacity() / CONTENT_SIZE_100_BYTES; + create_and_populate_table(&config, target_capacity_count)?; + + let store = IdIndexedV1Store::create(ContentType::State, config)?; + let usage_stats = store.get_usage_stats()?; + assert_eq!(usage_stats.content_count, target_capacity_count); + assert_eq!( + usage_stats.used_storage_bytes, + target_capacity_count * CONTENT_SIZE_100_BYTES + ); + assert_eq!(store.radius(), Distance::MAX); + assert_eq!(store.inserts_until_pruning, 100 - target_capacity_count); + Ok(()) + } + + #[test] + fn create_at_pruning_capacity() -> Result<()> { + let temp_dir = TempDir::new()?; + let config = create_config(&temp_dir); + + let pruning_capacity_count = config.pruning_capacity_threshold() / CONTENT_SIZE_100_BYTES; + create_and_populate_table(&config, pruning_capacity_count)?; + + let store = IdIndexedV1Store::create(ContentType::State, config)?; + let usage_stats = store.get_usage_stats()?; + + // no pruning should happen + assert_eq!(usage_stats.content_count, pruning_capacity_count); + assert_eq!( + usage_stats.used_storage_bytes, + pruning_capacity_count * CONTENT_SIZE_100_BYTES + ); + assert!(store.radius() < Distance::MAX); + assert_eq!(store.inserts_until_pruning, 100 - pruning_capacity_count); + Ok(()) + } + + #[test] + fn create_above_pruning_capacity() -> Result<()> { + let temp_dir = TempDir::new()?; + let config = create_config(&temp_dir); + + let above_pruning_capacity_count = + 1 + config.pruning_capacity_threshold() / CONTENT_SIZE_100_BYTES; + let target_capacity_count = config.target_capacity() / CONTENT_SIZE_100_BYTES; + + create_and_populate_table(&config, above_pruning_capacity_count)?; + + let store 
+
+    #[test]
+    fn create_at_target_capacity() -> Result<()> {
+        let temp_dir = TempDir::new()?;
+        let config = create_config(&temp_dir);
+
+        let target_capacity_count = config.target_capacity() / CONTENT_SIZE_100_BYTES;
+        create_and_populate_table(&config, target_capacity_count)?;
+
+        let store = IdIndexedV1Store::create(ContentType::State, config)?;
+        let usage_stats = store.get_usage_stats()?;
+        assert_eq!(usage_stats.content_count, target_capacity_count);
+        assert_eq!(
+            usage_stats.used_storage_bytes,
+            target_capacity_count * CONTENT_SIZE_100_BYTES
+        );
+        assert_eq!(store.radius(), Distance::MAX);
+        assert_eq!(store.inserts_until_pruning, 100 - target_capacity_count);
+        Ok(())
+    }
+
+    #[test]
+    fn create_at_pruning_capacity() -> Result<()> {
+        let temp_dir = TempDir::new()?;
+        let config = create_config(&temp_dir);
+
+        let pruning_capacity_count = config.pruning_capacity_threshold() / CONTENT_SIZE_100_BYTES;
+        create_and_populate_table(&config, pruning_capacity_count)?;
+
+        let store = IdIndexedV1Store::create(ContentType::State, config)?;
+        let usage_stats = store.get_usage_stats()?;
+
+        // No pruning should happen.
+        assert_eq!(usage_stats.content_count, pruning_capacity_count);
+        assert_eq!(
+            usage_stats.used_storage_bytes,
+            pruning_capacity_count * CONTENT_SIZE_100_BYTES
+        );
+        assert!(store.radius() < Distance::MAX);
+        assert_eq!(store.inserts_until_pruning, 100 - pruning_capacity_count);
+        Ok(())
+    }
+
+    #[test]
+    fn create_above_pruning_capacity() -> Result<()> {
+        let temp_dir = TempDir::new()?;
+        let config = create_config(&temp_dir);
+
+        let above_pruning_capacity_count =
+            1 + config.pruning_capacity_threshold() / CONTENT_SIZE_100_BYTES;
+        let target_capacity_count = config.target_capacity() / CONTENT_SIZE_100_BYTES;
+
+        create_and_populate_table(&config, above_pruning_capacity_count)?;
+
+        let store = IdIndexedV1Store::create(ContentType::State, config)?;
+        let usage_stats = store.get_usage_stats()?;
+
+        // Should prune down to target capacity.
+        assert_eq!(usage_stats.content_count, target_capacity_count);
+        assert_eq!(
+            usage_stats.used_storage_bytes,
+            target_capacity_count * CONTENT_SIZE_100_BYTES
+        );
+        assert!(store.radius() < Distance::MAX);
+        assert_eq!(store.inserts_until_pruning, 100 - target_capacity_count);
+        Ok(())
+    }
+
+    #[test]
+    fn create_above_full_capacity() -> Result<()> {
+        let temp_dir = TempDir::new()?;
+        let config = create_config(&temp_dir);
+
+        let above_full_capacity_count = 10 + config.storage_capacity_bytes / CONTENT_SIZE_100_BYTES;
+        let target_capacity_count = config.target_capacity() / CONTENT_SIZE_100_BYTES;
+
+        create_and_populate_table(&config, above_full_capacity_count)?;
+
+        let store = IdIndexedV1Store::create(ContentType::State, config)?;
+        let usage_stats = store.get_usage_stats()?;
+
+        // Should prune down to target capacity.
+        assert_eq!(usage_stats.content_count, target_capacity_count);
+        assert_eq!(
+            usage_stats.used_storage_bytes,
+            target_capacity_count * CONTENT_SIZE_100_BYTES
+        );
+        assert!(store.radius() < Distance::MAX);
+        assert_eq!(store.inserts_until_pruning, 100 - target_capacity_count);
+        Ok(())
+    }
+
+    #[test]
+    fn simple_insert_and_lookup() -> Result<()> {
+        let temp_dir = TempDir::new()?;
+        let config = create_config(&temp_dir);
+        let mut store = IdIndexedV1Store::create(ContentType::State, config.clone())?;
+
+        let (key, value) = generate_key_value(&config, 0);
+        let id = ContentId::from(key.content_id());
+
+        assert!(!store.has_content(&id)?);
+
+        store.insert(&key, value.clone())?;
+
+        assert!(store.has_content(&id)?);
+        assert_eq!(store.lookup_content_key(&id)?, Some(key));
+        assert_eq!(store.lookup_content_value(&id)?, Some(value));
+
+        Ok(())
+    }
+
+    #[test]
+    fn simple_insert_and_delete() -> Result<()> {
+        let temp_dir = TempDir::new()?;
+        let config = create_config(&temp_dir);
+        let mut store = IdIndexedV1Store::create(ContentType::State, config.clone())?;
+
+        let (key, value) = generate_key_value(&config, 0);
+        let id = ContentId::from(key.content_id());
+
+        assert!(!store.has_content(&id)?);
+
+        store.insert(&key, value)?;
+        assert!(store.has_content(&id)?);
+
+        store.delete(&id)?;
+        assert!(!store.has_content(&id)?);
+
+        Ok(())
+    }
+
+    #[test]
+    fn inserts_until_pruning_from_empty_to_full() -> Result<()> {
+        let temp_dir = TempDir::new()?;
+        let config = create_config(&temp_dir);
+        let mut store = IdIndexedV1Store::create(ContentType::State, config.clone())?;
+
+        assert_eq!(store.inserts_until_pruning, 0);
+
+        let mut insert_and_check =
+            |insert_count: u64, expected_count: u64, expected_inserts_until_pruning: u64| {
+                for _ in 0..insert_count {
+                    let (key, value) = generate_key_value(&config, 0);
+                    store.insert(&key, value).unwrap();
+                }
+                let usage_stats = store.get_usage_stats().unwrap();
+                assert_eq!(
+                    usage_stats.content_count, expected_count,
+                    "UsageStats: {usage_stats:?}. Testing count"
+                );
+                assert_eq!(
+                    store.inserts_until_pruning, expected_inserts_until_pruning,
+                    "UsageStats: {usage_stats:?}. Testing inserts_until_pruning"
+                );
+            };
+
+        // The `inserts_until_pruning` shouldn't be bigger than the stored count.
+        insert_and_check(
+            /* insert_count= */ 1, /* expected_count= */ 1,
+            /* expected_inserts_until_pruning= */ 1,
+        );
+        insert_and_check(
+            /* insert_count= */ 1, /* expected_count= */ 2,
+            /* expected_inserts_until_pruning= */ 2,
+        );
+        insert_and_check(
+            /* insert_count= */ 2, /* expected_count= */ 4,
+            /* expected_inserts_until_pruning= */ 4,
+        );
+        insert_and_check(
+            /* insert_count= */ 4, /* expected_count= */ 8,
+            /* expected_inserts_until_pruning= */ 8,
+        );
+        insert_and_check(
+            /* insert_count= */ 8, /* expected_count= */ 16,
+            /* expected_inserts_until_pruning= */ 16,
+        );
+        insert_and_check(
+            /* insert_count= */ 16, /* expected_count= */ 32,
+            /* expected_inserts_until_pruning= */ 32,
+        );
+
+        // The `inserts_until_pruning` should estimate when we reach full capacity.
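+        // (The estimated full-capacity count is 10_000 / 100 = 100 items; with
+        // 64 stored, min(100 - 64, 64) = 36.)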
+        insert_and_check(
+            /* insert_count= */ 32, /* expected_count= */ 64,
+            /* expected_inserts_until_pruning= */ 36,
+        );
+
+        // We shouldn't trigger pruning for the next `inserts_until_pruning - 1` inserts.
+        insert_and_check(
+            /* insert_count= */ 35, /* expected_count= */ 99,
+            /* expected_inserts_until_pruning= */ 1,
+        );
+
+        // Inserting one more should trigger pruning, bringing us down to target capacity.
+        insert_and_check(
+            /* insert_count= */ 1, /* expected_count= */ 90,
+            /* expected_inserts_until_pruning= */ 10,
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn pruning_with_one_large_item() -> Result<()> {
+        let temp_dir = TempDir::new()?;
+        let config = create_config(&temp_dir);
+
+        // Fill 50% of the storage with 50 items, 1% each.
+        create_and_populate_table(&config, 50)?;
+        let mut store = IdIndexedV1Store::create(ContentType::State, config.clone())?;
+        assert_eq!(store.inserts_until_pruning, 50);
+
+        // Insert a key/value pair such that:
+        // - the key shouldn't be pruned (close distance)
+        // - the value takes 50% of the storage
+        let (big_value_key, value) = generate_key_value_with_content_size(
+            &config,
+            /* distance = */ 0,
+            STORAGE_CAPACITY_10KB_IN_BYTES / 2,
+        );
+        store.insert(&big_value_key, value)?;
+
+        // Add another 48 small items (1% each) and check that:
+        // - we didn't prune
+        // - we are at 148% of the total capacity
+        for _ in 0..48 {
+            let (key, value) = generate_key_value(&config, 0x80);
+            store.insert(&key, value).unwrap();
+        }
+        assert_eq!(store.inserts_until_pruning, 1);
+        assert_eq!(
+            store.get_usage_stats()?.used_storage_bytes,
+            STORAGE_CAPACITY_10KB_IN_BYTES * 148 / 100
+        );
+
+        // Add one more and check that:
+        // - we pruned enough to be under the pruning capacity
+        // - the big_value_key is still stored
+        // - inserts_until_pruning is set to the correct value
+        let (key, value) = generate_key_value(&config, 1);
+        store.insert(&key, value).unwrap();
+        assert!(store.get_usage_stats()?.used_storage_bytes <= config.pruning_capacity_threshold());
+        assert!(store.has_content(&big_value_key.content_id().into())?);
+        assert_eq!(store.inserts_until_pruning, 2);
+
+        Ok(())
+    }
+
+    #[test]
+    fn pruning_with_many_close_large_items() -> Result<()> {
+        let temp_dir = TempDir::new()?;
+        let config = create_config(&temp_dir);
+
+        // Fill 50% of the storage with 50 items, 1% each.
+        create_and_populate_table(&config, 50)?;
+        let mut store = IdIndexedV1Store::create(ContentType::State, config.clone())?;
+        assert_eq!(store.inserts_until_pruning, 50);
+
+        // Add 49 items with small distance and large size (50% of total storage each) and
+        // check that:
+        // - pruning didn't happen
+        // - we are at 25x the storage capacity at this point
+        for _ in 0..49 {
+            let (key, value) = generate_key_value_with_content_size(
+                &config,
+                /* distance = */ 0,
+                STORAGE_CAPACITY_10KB_IN_BYTES / 2,
+            );
+            store.insert(&key, value)?;
+        }
+        assert_eq!(store.inserts_until_pruning, 1);
+        assert_eq!(
+            store.get_usage_stats()?.used_storage_bytes,
+            25 * STORAGE_CAPACITY_10KB_IN_BYTES
+        );
+
+        // Add one more big item and check that:
+        // - we pruned all but one of the big items
+        // - inserts_until_pruning is set to 1
+        let (key, value) = generate_key_value_with_content_size(
+            &config,
+            /* distance = */ 0,
+            STORAGE_CAPACITY_10KB_IN_BYTES / 2,
+        );
+        store.insert(&key, value)?;
+        assert_eq!(
+            store.get_usage_stats()?,
+            UsageStats {
+                content_count: 1,
+                used_storage_bytes: STORAGE_CAPACITY_10KB_IN_BYTES / 2
+            }
+        );
+        assert_eq!(store.inserts_until_pruning, 1);
+
+        Ok(())
+    }
+}
diff --git a/trin-storage/src/versioned/id_indexed_v1/usage_stats.rs b/trin-storage/src/versioned/id_indexed_v1/usage_stats.rs
new file mode 100644
index 000000000..834bb15db
--- /dev/null
+++ b/trin-storage/src/versioned/id_indexed_v1/usage_stats.rs
@@ -0,0 +1,263 @@
+use super::IdIndexedV1StoreConfig;
+
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct UsageStats {
+    /// The total count of the content items stored.
+    pub content_count: u64,
+    /// The sum of the `content_size` of all stored content items.
+    pub used_storage_bytes: u64,
+}
+
+impl UsageStats {
+    pub fn is_above_pruning_capacity_threshold(&self, config: &IdIndexedV1StoreConfig) -> bool {
+        self.used_storage_bytes > config.pruning_capacity_threshold()
+    }
+
+    pub fn is_above_target_capacity(&self, config: &IdIndexedV1StoreConfig) -> bool {
+        self.used_storage_bytes > config.target_capacity()
+    }
+
+    pub fn estimated_full_capacity_count(&self, config: &IdIndexedV1StoreConfig) -> Option<u64> {
+        self.average_content_size_bytes()
+            .map(|average_content_size_bytes| {
+                (config.storage_capacity_bytes as f64 / average_content_size_bytes).floor() as u64
+            })
+    }
+
+    pub fn estimated_target_capacity_count(&self, config: &IdIndexedV1StoreConfig) -> Option<u64> {
+        self.average_content_size_bytes()
+            .map(|average_content_size_bytes| {
+                (config.target_capacity() as f64 / average_content_size_bytes).floor() as u64
+            })
+    }
+
+    /// Returns the estimated number of items to insert to reach full capacity. This value will not
+    /// exceed the number of currently stored items.
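+    ///
+    /// For example (mirroring the `insert_until_full` test case `low_usage_1` below, and
+    /// assuming a 1000 byte storage capacity): with 10 items using 100 bytes, the average
+    /// item is 10 bytes and full capacity fits 100 items; the raw estimate is 100 - 10 = 90
+    /// more inserts, but it is capped at the 10 currently stored items, so 10 is returned.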
+    pub fn estimate_insert_until_full(&self, config: &IdIndexedV1StoreConfig) -> u64 {
+        self.estimated_full_capacity_count(config)
+            .map(|full_capacity_count| {
+                if full_capacity_count > self.content_count {
+                    (full_capacity_count - self.content_count).min(self.content_count)
+                } else {
+                    0
+                }
+            })
+            .unwrap_or(0)
+    }
+
+    /// Returns the estimated number of items to delete to reach target capacity. If we are below
+    /// target capacity, it will return 0.
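+    ///
+    /// For example (mirroring the `delete_until_target` test case `above_full_3` below, and
+    /// assuming a 1000 byte storage capacity with a 900 byte target): with 50 items using
+    /// 1050 bytes, the average item is 21 bytes, so the target fits floor(900 / 21) = 42
+    /// items and 50 - 42 = 8 items would be deleted.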
+    pub fn delete_until_target(&self, config: &IdIndexedV1StoreConfig) -> u64 {
+        self.estimated_target_capacity_count(config)
+            .map(|target_capacity_count| {
+                if self.content_count > target_capacity_count {
+                    self.content_count - target_capacity_count
+                } else {
+                    0
+                }
+            })
+            .unwrap_or(0)
+    }
+
+    fn average_content_size_bytes(&self) -> Option<f64> {
+        if self.content_count == 0 {
+            None
+        } else {
+            Some(self.used_storage_bytes as f64 / self.content_count as f64)
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::path::PathBuf;
+
+    use discv5::enr::NodeId;
+    use ethportal_api::types::portal_wire::ProtocolId;
+    use r2d2::Pool;
+    use r2d2_sqlite::SqliteConnectionManager;
+    use rstest::rstest;
+
+    use crate::{versioned::ContentType, DistanceFunction};
+
+    use super::*;
+
+    const STORAGE_CAPACITY_BYTES: u64 = 1000;
+
+    fn create_config() -> IdIndexedV1StoreConfig {
+        IdIndexedV1StoreConfig {
+            content_type: ContentType::State,
+            network: ProtocolId::State,
+            node_id: NodeId::random(),
+            node_data_dir: PathBuf::default(),
+            storage_capacity_bytes: STORAGE_CAPACITY_BYTES,
+            sql_connection_pool: Pool::new(SqliteConnectionManager::memory()).unwrap(),
+            distance_fn: DistanceFunction::Xor,
+        }
+    }
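+
+    // With STORAGE_CAPACITY_BYTES = 1000, the boundary cases below imply a
+    // pruning capacity threshold of ~950 bytes (95%) and a target capacity of
+    // ~900 bytes (90%); the exact values come from `IdIndexedV1StoreConfig`
+    // and are not spelled out in this file.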
+
+    #[rstest]
+    #[case::no_usage(0, 0, false, false)]
+    #[case::low_usage(10, 100, false, false)]
+    #[case::just_below_target_capacity(89, 890, false, false)]
+    #[case::target_capacity(90, 900, false, false)]
+    #[case::between_target_and_pruning(92, 920, false, true)]
+    #[case::pruning(95, 950, false, true)]
+    #[case::between_pruning_and_full(97, 970, true, true)]
+    #[case::full(100, 1000, true, true)]
+    #[case::above_full(110, 1100, true, true)]
+    fn is_above_capacity(
+        #[case] content_count: u64,
+        #[case] used_storage_bytes: u64,
+        #[case] is_above_pruning_capacity_threshold: bool,
+        #[case] is_above_target_capacity: bool,
+    ) {
+        let config = create_config();
+        let usage_stats = UsageStats {
+            content_count,
+            used_storage_bytes,
+        };
+
+        assert_eq!(
+            usage_stats.is_above_pruning_capacity_threshold(&config),
+            is_above_pruning_capacity_threshold,
+            "testing is_above_pruning_capacity_threshold"
+        );
+        assert_eq!(
+            usage_stats.is_above_target_capacity(&config),
+            is_above_target_capacity,
+            "testing is_above_target_capacity"
+        );
+    }
+
+    #[test]
+    fn estimate_capacity_count_no_usage() {
+        let config = create_config();
+        let usage_stats = UsageStats {
+            content_count: 0,
+            used_storage_bytes: 0,
+        };
+
+        assert_eq!(
+            usage_stats.estimated_full_capacity_count(&config),
+            None,
+            "testing estimated_full_capacity_count"
+        );
+        assert_eq!(
+            usage_stats.estimated_target_capacity_count(&config),
+            None,
+            "testing estimated_target_capacity_count"
+        );
+    }
+
+    #[rstest]
+    #[case::low_usage_1(10, 100, 100, 90)]
+    #[case::low_usage_2(20, 100, 200, 180)]
+    #[case::low_usage_3(50, 100, 500, 450)]
+    #[case::mid_usage_1(10, 500, 20, 18)]
+    #[case::mid_usage_2(25, 500, 50, 45)]
+    #[case::mid_usage_3(50, 500, 100, 90)]
+    #[case::between_target_and_pruning_1(10, 920, 10, 9)]
+    #[case::between_target_and_pruning_2(20, 920, 21, 19)]
+    #[case::between_target_and_pruning_3(50, 920, 54, 48)]
+    #[case::between_pruning_and_full_1(10, 970, 10, 9)]
+    #[case::between_pruning_and_full_2(20, 970, 20, 18)]
+    #[case::between_pruning_and_full_3(50, 970, 51, 46)]
+    #[case::above_full_1(10, 1050, 9, 8)]
+    #[case::above_full_2(20, 1050, 19, 17)]
+    #[case::above_full_3(50, 1050, 47, 42)]
+    fn estimate_capacity_count(
+        #[case] content_count: u64,
+        #[case] used_storage_bytes: u64,
+        #[case] estimated_full_capacity_count: u64,
+        #[case] estimated_target_capacity_count: u64,
+    ) {
+        let config = create_config();
+        let usage_stats = UsageStats {
+            content_count,
+            used_storage_bytes,
+        };
+
+        assert_eq!(
+            usage_stats.estimated_full_capacity_count(&config),
+            Some(estimated_full_capacity_count),
+            "testing estimated_full_capacity_count"
+        );
+        assert_eq!(
+            usage_stats.estimated_target_capacity_count(&config),
+            Some(estimated_target_capacity_count),
+            "testing estimated_target_capacity_count"
+        );
+    }
+
+    #[rstest]
+    #[case::no_usage(0, 0, 0)]
+    #[case::low_usage_1(10, 100, 10)]
+    #[case::low_usage_2(20, 100, 20)]
+    #[case::low_usage_3(50, 100, 50)]
+    #[case::mid_usage_1(10, 500, 10)]
+    #[case::mid_usage_2(25, 500, 25)]
+    #[case::mid_usage_3(50, 500, 50)]
+    #[case::between_target_and_pruning_1(10, 920, 0)]
+    #[case::between_target_and_pruning_2(20, 920, 1)]
+    #[case::between_target_and_pruning_3(50, 920, 4)]
+    #[case::between_target_and_pruning_4(100, 920, 8)]
+    #[case::between_pruning_and_full_1(10, 970, 0)]
+    #[case::between_pruning_and_full_2(20, 970, 0)]
+    #[case::between_pruning_and_full_3(50, 970, 1)]
+    #[case::between_pruning_and_full_4(100, 970, 3)]
+    #[case::above_full_1(10, 1050, 0)]
+    #[case::above_full_2(20, 1050, 0)]
+    #[case::above_full_3(50, 1050, 0)]
+    fn insert_until_full(
+        #[case] content_count: u64,
+        #[case] used_storage_bytes: u64,
+        #[case] expected_insert_until_full: u64,
+    ) {
+        let config = create_config();
+        let usage_stats = UsageStats {
+            content_count,
+            used_storage_bytes,
+        };
+
+        assert_eq!(
+            usage_stats.estimate_insert_until_full(&config),
+            expected_insert_until_full
+        );
+    }
+
+    #[rstest]
+    #[case::low_usage_1(10, 100, 0)]
+    #[case::low_usage_2(20, 100, 0)]
+    #[case::low_usage_3(50, 100, 0)]
+    #[case::mid_usage_1(10, 500, 0)]
+    #[case::mid_usage_2(25, 500, 0)]
+    #[case::mid_usage_3(50, 500, 0)]
+    #[case::between_target_and_pruning_1(10, 920, 1)]
+    #[case::between_target_and_pruning_2(20, 920, 1)]
+    #[case::between_target_and_pruning_3(50, 920, 2)]
+    #[case::between_target_and_pruning_4(100, 920, 3)]
+    #[case::between_pruning_and_full_1(10, 970, 1)]
+    #[case::between_pruning_and_full_2(20, 970, 2)]
+    #[case::between_pruning_and_full_3(50, 970, 4)]
+    #[case::between_pruning_and_full_4(100, 970, 8)]
+    #[case::above_full_1(10, 1050, 2)]
+    #[case::above_full_2(20, 1050, 3)]
+    #[case::above_full_3(50, 1050, 8)]
+    fn delete_until_target(
+        #[case] content_count: u64,
+        #[case] used_storage_bytes: u64,
+        #[case] expected_delete_until_target: u64,
+    ) {
+        let config = create_config();
+        let usage_stats = UsageStats {
+            content_count,
+            used_storage_bytes,
+        };
+
+        assert_eq!(
+            usage_stats.delete_until_target(&config),
+            expected_delete_until_target
+        );
+    }
+}
diff --git a/trin-storage/src/versioned/mod.rs b/trin-storage/src/versioned/mod.rs
index 499085172..32d8ea9e1 100644
--- a/trin-storage/src/versioned/mod.rs
+++ b/trin-storage/src/versioned/mod.rs
@@ -1,3 +1,4 @@
+mod id_indexed_v1;
 pub mod sql;
 pub mod store;
 mod utils;
@@ -5,6 +6,7 @@ mod utils;
 use rusqlite::types::{FromSql, FromSqlError, ValueRef};
 use strum::{AsRefStr, Display, EnumString};
 
+pub use id_indexed_v1::{IdIndexedV1Store, IdIndexedV1StoreConfig};
 pub use store::VersionedContentStore;
 pub use utils::create_store;
 
@@ -16,7 +18,6 @@ pub use utils::create_store;
 #[derive(Clone, Debug, Display, Eq, PartialEq, AsRefStr)]
 #[strum(serialize_all = "snake_case")]
 pub enum ContentType {
-    Beacon,
     History,
     State,
 }
diff --git a/trin-storage/src/versioned/store.rs b/trin-storage/src/versioned/store.rs
index ab5a5e6e8..95f3f288e 100644
--- a/trin-storage/src/versioned/store.rs
+++ b/trin-storage/src/versioned/store.rs
@@ -1,12 +1,12 @@
-use trin_metrics::storage::StorageMetricsReporter;
-
-use crate::{error::ContentStoreError, PortalStorageConfig};
+use crate::error::ContentStoreError;
 
 use super::{ContentType, StoreVersion};
 
 /// A trait for the versioned content store. Instance of it should be created using
 /// `create_store` function.
 pub trait VersionedContentStore: Sized {
+    type Config;
+
     /// Returns the version of the store.
     fn version() -> StoreVersion;
 
@@ -14,17 +14,10 @@ pub trait VersionedContentStore: Sized {
     fn migrate_from(
         content_type: &ContentType,
         old_version: StoreVersion,
-        config: &PortalStorageConfig,
+        config: &Self::Config,
     ) -> Result<(), ContentStoreError>;
 
     /// Creates the instance of the store. This shouldn't be used directly. Store should be
     /// created using `create_store` function.
-    fn create(
-        content_type: ContentType,
-        config: PortalStorageConfig,
-        metrics: StorageMetricsReporter,
-    ) -> Result<Self, ContentStoreError>;
-
-    /// Returns the summary info of the store.
-    fn get_summary_info(&self) -> String;
+    fn create(content_type: ContentType, config: Self::Config) -> Result<Self, ContentStoreError>;
 }
diff --git a/trin-storage/src/versioned/utils.rs b/trin-storage/src/versioned/utils.rs
index 4f2d79ff2..05082d06a 100644
--- a/trin-storage/src/versioned/utils.rs
+++ b/trin-storage/src/versioned/utils.rs
@@ -1,10 +1,8 @@
-use ethportal_api::types::portal_wire::ProtocolId;
-use r2d2::PooledConnection;
+use r2d2::{Pool, PooledConnection};
 use r2d2_sqlite::SqliteConnectionManager;
 use rusqlite::{named_params, OptionalExtension};
-use trin_metrics::{portalnet::PORTALNET_METRICS, storage::StorageMetricsReporter};
 
-use crate::{error::ContentStoreError, PortalStorageConfig};
+use crate::error::ContentStoreError;
 
 use super::{sql, store::VersionedContentStore, ContentType, StoreVersion};
 
@@ -12,34 +10,25 @@ use super::{sql, store::VersionedContentStore, ContentType, StoreVersion};
 /// that's not the case).
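+/// Example (sketch; `MyStore` is a hypothetical `VersionedContentStore`
+/// implementation, and `config` / `pool` are its config and sqlite connection
+/// pool):
+///
+/// ```ignore
+/// let store = create_store::<MyStore>(ContentType::State, config, pool)?;
+/// ```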
 pub fn create_store<S: VersionedContentStore>(
     content_type: ContentType,
-    network: ProtocolId,
-    config: PortalStorageConfig,
+    config: S::Config,
+    sql_connection_pool: Pool<SqliteConnectionManager>,
 ) -> Result<S, ContentStoreError> {
-    let conn = config.sql_connection_pool.get()?;
-
-    let old_version = lookup_store_version(&content_type, &conn)?;
+    let old_version = lookup_store_version(&content_type, &sql_connection_pool.get()?)?;
 
     match old_version {
         Some(old_version) => {
             // Migrate if version doesn't match
             if S::version() != old_version {
                 S::migrate_from(&content_type, old_version, &config)?;
-                update_store_info(&content_type, S::version(), &conn)?;
+                update_store_info(&content_type, S::version(), &sql_connection_pool.get()?)?;
             }
         }
         None => {
-            update_store_info(&content_type, S::version(), &conn)?;
+            update_store_info(&content_type, S::version(), &sql_connection_pool.get()?)?;
         }
     }
 
-    S::create(
-        content_type,
-        config,
-        StorageMetricsReporter {
-            protocol: network.to_string(),
-            storage_metrics: PORTALNET_METRICS.storage(),
-        },
-    )
+    S::create(content_type, config)
 }
 
 fn lookup_store_version(
@@ -76,36 +65,19 @@ fn update_store_info(
 
 #[cfg(test)]
 #[allow(clippy::unwrap_used)]
 pub mod test {
-    use std::{env, fs};
-
     use anyhow::Result;
-    use discv5::enr::NodeId;
-    use tempfile::TempDir;
-
-    use super::*;
 
-    const STORAGE_CAPACITY_1MB: u64 = 1;
+    use crate::{test_utils::create_test_portal_storage_config_with_capacity, PortalStorageConfig};
 
-    fn setup_temp_dir() -> Result<TempDir> {
-        let os_temp = env::temp_dir();
-        fs::create_dir_all(&os_temp)?;
-        let temp_dir = TempDir::new_in(&os_temp)?;
+    use super::*;
 
-        Ok(temp_dir)
-    }
-
-    fn setup_config(temp_dir: &TempDir) -> Result<PortalStorageConfig> {
-        PortalStorageConfig::new(
-            STORAGE_CAPACITY_1MB,
-            temp_dir.path().to_path_buf(),
-            NodeId::random(),
-        )
-    }
+    const STORAGE_CAPACITY_MB: u64 = 10;
 
     #[test]
     fn lookup_no_store_version() -> Result<()> {
-        let temp_dir = setup_temp_dir()?;
-        let conn = setup_config(&temp_dir)?.sql_connection_pool.get()?;
+        let (_temp_dir, config) =
+            create_test_portal_storage_config_with_capacity(STORAGE_CAPACITY_MB)?;
+        let conn = config.sql_connection_pool.get()?;
 
         assert_eq!(lookup_store_version(&ContentType::State, &conn)?, None);
         Ok(())
@@ -113,8 +85,9 @@ pub mod test {
 
     #[test]
     fn insert_store_verion() -> Result<()> {
-        let temp_dir = setup_temp_dir()?;
-        let conn = setup_config(&temp_dir)?.sql_connection_pool.get()?;
+        let (_temp_dir, config) =
+            create_test_portal_storage_config_with_capacity(STORAGE_CAPACITY_MB)?;
+        let conn = config.sql_connection_pool.get()?;
 
         update_store_info(&ContentType::State, StoreVersion::IdIndexedV1, &conn)?;
 
@@ -127,8 +100,9 @@ pub mod test {
 
     #[test]
     fn update_store_verion() -> Result<()> {
-        let temp_dir = setup_temp_dir()?;
-        let conn = setup_config(&temp_dir)?.sql_connection_pool.get()?;
+        let (_temp_dir, config) =
+            create_test_portal_storage_config_with_capacity(STORAGE_CAPACITY_MB)?;
+        let conn = config.sql_connection_pool.get()?;
 
         // Set store version
         update_store_info(&ContentType::State, StoreVersion::IdIndexedLegacy, &conn)?;
@@ -149,14 +123,19 @@ pub mod test {
 
     #[test]
     fn create_store_no_old_version() -> Result<()> {
-        let temp_dir = setup_temp_dir()?;
-        let config = setup_config(&temp_dir)?;
+        let (_temp_dir, config) =
+            create_test_portal_storage_config_with_capacity(STORAGE_CAPACITY_MB)?;
+        let sql_connection_pool = config.sql_connection_pool.clone();
 
         // Should be successful
-        create_store::<MockContentStore>(ContentType::State, ProtocolId::State, config.clone())?;
+        create_store::<MockContentStore>(
+            ContentType::State,
+            config.clone(),
+            sql_connection_pool.clone(),
+        )?;
 
         assert_eq!(
-            lookup_store_version(&ContentType::State, &config.sql_connection_pool.get()?)?,
+            lookup_store_version(&ContentType::State, &sql_connection_pool.get()?)?,
             Some(StoreVersion::IdIndexedV1)
         );
 
@@ -165,17 +144,18 @@ pub mod test {
 
     #[test]
     fn create_store_same_old_version() -> Result<()> {
-        let temp_dir = setup_temp_dir()?;
-        let config = setup_config(&temp_dir)?;
+        let (_temp_dir, config) =
+            create_test_portal_storage_config_with_capacity(STORAGE_CAPACITY_MB)?;
+        let sql_connection_pool = config.sql_connection_pool.clone();
 
         update_store_info(
             &ContentType::State,
             StoreVersion::IdIndexedV1,
-            &config.sql_connection_pool.get()?,
+            &sql_connection_pool.get()?,
         )?;
 
         // Should be successful
-        create_store::<MockContentStore>(ContentType::State, ProtocolId::State, config.clone())?;
+        create_store::<MockContentStore>(ContentType::State, config.clone(), sql_connection_pool)?;
 
         assert_eq!(
             lookup_store_version(&ContentType::State, &config.sql_connection_pool.get()?)?,
@@ -188,23 +168,26 @@ pub mod test {
 
     #[test]
     #[should_panic = "UnsupportedStoreMigration"]
     fn create_store_different_old_version() {
-        let temp_dir = setup_temp_dir().unwrap();
-        let config = setup_config(&temp_dir).unwrap();
+        let (_temp_dir, config) =
+            create_test_portal_storage_config_with_capacity(STORAGE_CAPACITY_MB).unwrap();
+        let sql_connection_pool = config.sql_connection_pool.clone();
 
         update_store_info(
             &ContentType::State,
             StoreVersion::IdIndexedLegacy,
-            &config.sql_connection_pool.get().unwrap(),
+            &sql_connection_pool.get().unwrap(),
         )
         .unwrap();
 
         // Should panic - MockContentStore doesn't support migration.
-        create_store::<MockContentStore>(ContentType::State, ProtocolId::State, config).unwrap();
+        create_store::<MockContentStore>(ContentType::State, config, sql_connection_pool).unwrap();
     }
 
     pub struct MockContentStore;
 
     impl VersionedContentStore for MockContentStore {
+        type Config = PortalStorageConfig;
+
         fn version() -> StoreVersion {
             StoreVersion::IdIndexedV1
         }
@@ -223,13 +206,8 @@ pub mod test {
         fn create(
             _content_type: ContentType,
             _config: PortalStorageConfig,
-            _metrics: StorageMetricsReporter,
         ) -> Result<Self, ContentStoreError> {
             Ok(Self {})
         }
-
-        fn get_summary_info(&self) -> String {
-            "MockVersionedContentStore summary".to_string()
-        }
     }
 }
diff --git a/trin-utils/src/lib.rs b/trin-utils/src/lib.rs
index ec071f1d4..db1b73b92 100644
--- a/trin-utils/src/lib.rs
+++ b/trin-utils/src/lib.rs
@@ -2,4 +2,5 @@
 #![warn(clippy::uninlined_format_args)]
 
 pub mod log;
+pub mod submodules;
 pub mod version;
diff --git a/trin-utils/src/submodules.rs b/trin-utils/src/submodules.rs
new file mode 100644
index 000000000..85d2eee93
--- /dev/null
+++ b/trin-utils/src/submodules.rs
@@ -0,0 +1,12 @@
+use std::{
+    fs::read_to_string,
+    io,
+    path::{Path, PathBuf},
+};
+
+pub const PORTAL_SPEC_TESTS_SUBMODULE_PATH: &str = "../portal-spec-tests";
+
+/// Reads a file from a "portal-spec-tests" submodule.
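+///
+/// A usage sketch (the file path here is hypothetical; paths are resolved
+/// relative to `PORTAL_SPEC_TESTS_SUBMODULE_PATH`):
+///
+/// ```ignore
+/// let content = read_portal_spec_tests_file("tests/example.yaml")?;
+/// ```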
+pub fn read_portal_spec_tests_file<P: AsRef<Path>>(path: P) -> io::Result<String> {
+    read_to_string(PathBuf::from(PORTAL_SPEC_TESTS_SUBMODULE_PATH).join(path))
+}