From abf05b09a1482e8f05b36261b98d048317549676 Mon Sep 17 00:00:00 2001 From: Kolby Moroz Liebl <31669092+KolbyML@users.noreply.github.com> Date: Tue, 13 Aug 2024 12:46:18 -0600 Subject: [PATCH] feat: add stream mode for e2store files (#1373) --- Cargo.lock | 1 + e2store/Cargo.toml | 1 + e2store/README.md | 8 +- e2store/src/{e2s.rs => e2store/memory.rs} | 125 ++---------------- e2store/src/e2store/mod.rs | 3 + e2store/src/e2store/stream.rs | 87 ++++++++++++ e2store/src/e2store/types.rs | 154 ++++++++++++++++++++++ e2store/src/era.rs | 14 +- e2store/src/era1.rs | 54 ++------ e2store/src/lib.rs | 2 +- 10 files changed, 277 insertions(+), 172 deletions(-) rename e2store/src/{e2s.rs => e2store/memory.rs} (51%) create mode 100644 e2store/src/e2store/mod.rs create mode 100644 e2store/src/e2store/stream.rs create mode 100644 e2store/src/e2store/types.rs diff --git a/Cargo.lock b/Cargo.lock index fcfc1a39a..b1eb148e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2169,6 +2169,7 @@ dependencies = [ "scraper", "snap", "surf", + "tempfile", ] [[package]] diff --git a/e2store/Cargo.toml b/e2store/Cargo.toml index 03654f79a..e5e914b26 100644 --- a/e2store/Cargo.toml +++ b/e2store/Cargo.toml @@ -25,4 +25,5 @@ surf = { version = "2.3.2", default-features = false, features = ["h1-client-rus [dev-dependencies] rstest = "0.18.2" +tempfile = "3.3.0" diff --git a/e2store/README.md b/e2store/README.md index 59150e848..e7ec1d56e 100644 --- a/e2store/README.md +++ b/e2store/README.md @@ -11,6 +11,10 @@ E2store is a format originally developed by Nimbus as a framework for building o ## What is era? era is a format for storing beacon chain data more information can be found here https://github.com/status-im/nimbus-eth2/blob/stable/docs/e2store.md#era-files -## What is era1 +## What is era1? -era1 is a format for storing all of Ethereum's post merge blocks. It contains block headers, block bodies, and receipts for pre-merge block history which ranges block 0-15537394 +era1 is a format for storing all of Ethereum's pre merge blocks. It contains block headers, block bodies, and receipts for pre-merge block history which ranges block 0-15537394 + +## What is the difference between `e2store/memory.rs` and `e2store/stream.rs` + +`e2store/memory.rs` provides an api to load a full e2store file such as `.era`/`.era1` and manipulate it in memory. For smaller e2store files this approach works well. The issue comes when dealing with e2store files of much greater size loading the whole file into memory at once often isn't possible. This is where `e2store/stream.rs` comes in where you can stream the data you need from a e2store file as you need it. This will be required in `.era2` a format for storing full flat state snapshots. diff --git a/e2store/src/e2s.rs b/e2store/src/e2store/memory.rs similarity index 51% rename from e2store/src/e2s.rs rename to e2store/src/e2store/memory.rs index c685336cd..86df22a44 100644 --- a/e2store/src/e2s.rs +++ b/e2store/src/e2store/memory.rs @@ -1,23 +1,19 @@ use anyhow::anyhow; -use ssz_derive::{Decode, Encode}; -const _SLOTS_PER_HISTORICAL_ROOT: usize = 8192; -const HEADER_SIZE: u16 = 8; -const VALUE_SIZE_LIMIT: usize = 1024 * 1024 * 50; // 50 MB +use super::types::{Entry, Header}; -pub struct E2StoreFile { +pub struct E2StoreMemory { pub entries: Vec, } -impl TryFrom for Vec { +impl TryFrom for Vec { type Error = anyhow::Error; - fn try_from(e2store_file: E2StoreFile) -> Result, Self::Error> { + fn try_from(e2store_file: E2StoreMemory) -> Result, Self::Error> { e2store_file.serialize() } } -#[allow(dead_code)] -impl E2StoreFile { +impl E2StoreMemory { /// Serialize to a byte vector. fn serialize(&self) -> anyhow::Result> { let length = self.entries.iter().map(|e| e.length() as u32).sum::() as usize; @@ -51,7 +47,7 @@ impl E2StoreFile { bytes[offset + 3], bytes[offset + 4], bytes[offset + 5], - ]) + HEADER_SIZE as u32; + ]) + Header::SERIALIZED_SIZE as u32; let terminating_entry_index = offset + entry_length as usize; if bytes.len() < terminating_entry_index { return Err(anyhow!( @@ -68,111 +64,6 @@ impl E2StoreFile { } } -/// Represents an e2store `Entry` -#[derive(Default, Debug, Eq, PartialEq, Clone)] -pub struct Entry { - pub header: Header, - pub value: Vec, -} - -#[allow(dead_code)] -impl Entry { - pub fn new(type_: u16, value: Vec) -> Self { - Self { - header: Header { - type_, - length: value.len() as u32, - reserved: 0, - }, - value, - } - } - - pub fn length(&self) -> usize { - HEADER_SIZE as usize + self.header.length as usize - } - - /// Serialize to a byte vector. - fn serialize(&self) -> anyhow::Result> { - let length = self.length(); - let mut buf = vec![0; length]; - self.write(&mut buf)?; - Ok(buf) - } - - /// Write to a byte slice. - fn write(&self, buf: &mut [u8]) -> anyhow::Result<()> { - if self.length() != buf.len() { - return Err(anyhow!( - "found invalid buf length for entry: {} - expected {}", - buf.len(), - self.length() - )); - } - if self.length() > VALUE_SIZE_LIMIT { - return Err(anyhow!( - "entry value size limit exceeded: {} - {}", - self.length(), - VALUE_SIZE_LIMIT - )); - } - self.header.write(buf); - buf[8..].copy_from_slice(&self.value); - Ok(()) - } - - /// Deserialize from a byte slice. - pub fn deserialize(bytes: &[u8]) -> anyhow::Result { - let header = Header::deserialize(&bytes[0..8])?; - if header.length as usize + HEADER_SIZE as usize != bytes.len() { - return Err(anyhow!( - "found invalid buf length for entry: {} - expected {}", - bytes.len(), - header.length as usize + HEADER_SIZE as usize - )); - } - Ok(Self { - header: Header::deserialize(&bytes[0..8])?, - value: bytes[8..].to_vec(), - }) - } -} - -/// Represents the header of an e2store `Entry` -#[derive(Clone, Debug, Decode, Encode, Default, Eq, PartialEq)] -pub struct Header { - pub type_: u16, - pub length: u32, - pub reserved: u16, -} - -impl Header { - /// Write to a byte slice. - fn write(&self, buf: &mut [u8]) { - buf[0..2].copy_from_slice(&self.type_.to_le_bytes()); - buf[2..6].copy_from_slice(&self.length.to_le_bytes()); - buf[6..8].copy_from_slice(&self.reserved.to_le_bytes()); - } - - /// Deserialize from a byte slice. - fn deserialize(bytes: &[u8]) -> anyhow::Result { - if bytes.len() != HEADER_SIZE as usize { - return Err(anyhow!("invalid header size: {}", bytes.len())); - } - let type_ = u16::from_le_bytes([bytes[0], bytes[1]]); - let length = u32::from_le_bytes([bytes[2], bytes[3], bytes[4], bytes[5]]); - let reserved = u16::from_le_bytes([bytes[6], bytes[7]]); - if reserved != 0 { - return Err(anyhow!("invalid reserved value: {} - expected 0", reserved)); - } - Ok(Self { - type_, - length, - reserved, - }) - } -} - #[cfg(test)] mod test { use super::*; @@ -207,7 +98,7 @@ mod test { #[test] fn test_entry_multiple() { let expected = "0x2a00020000000000beef0900040000000000abcdabcd"; - let file = E2StoreFile::deserialize(&hex_decode(expected).unwrap()).unwrap(); + let file = E2StoreMemory::deserialize(&hex_decode(expected).unwrap()).unwrap(); assert_eq!(file.entries.len(), 2); assert_eq!(file.entries[0].header.type_, 0x2a); // 42 assert_eq!(file.entries[0].header.length, 2); @@ -226,6 +117,6 @@ mod test { #[case("0xbeef010000000000")] // length exceeds buffer fn test_entry_invalid_decoding(#[case] input: &str) { let buf = hex_decode(input).unwrap(); - assert!(E2StoreFile::deserialize(&buf).is_err()); + assert!(E2StoreMemory::deserialize(&buf).is_err()); } } diff --git a/e2store/src/e2store/mod.rs b/e2store/src/e2store/mod.rs new file mode 100644 index 000000000..81ade019d --- /dev/null +++ b/e2store/src/e2store/mod.rs @@ -0,0 +1,3 @@ +pub mod memory; +pub mod stream; +pub mod types; diff --git a/e2store/src/e2store/stream.rs b/e2store/src/e2store/stream.rs new file mode 100644 index 000000000..446667e8e --- /dev/null +++ b/e2store/src/e2store/stream.rs @@ -0,0 +1,87 @@ +use std::{ + fs::File, + io::{Read, Write}, + path::PathBuf, +}; + +use super::types::{Entry, Header}; + +/// e2store/memory.rs was built to load full .era/.era2 files into memory and provide a simple API +/// to access the data. The issue for this is for larger files this wouldn't be feasible, as the +/// entire file would need to be loaded into memory. This is where e2store_file.rs comes in, it +/// provides a way to read and write e2store files in a streaming fashion. +pub struct E2StoreStream { + pub e2store_file: File, +} + +impl E2StoreStream { + pub fn open(e2store_path: &PathBuf) -> anyhow::Result { + let e2store_file = File::open(e2store_path)?; + Ok(Self { e2store_file }) + } + + pub fn create(e2store_path: &PathBuf) -> anyhow::Result { + let e2store_file = File::create(e2store_path)?; + Ok(Self { e2store_file }) + } + + pub fn next_entry(&mut self) -> anyhow::Result { + let mut buf = vec![0; 8]; + self.e2store_file.read_exact(&mut buf)?; + let header = Header::deserialize(&buf)?; + let mut value = vec![0; header.length as usize]; + self.e2store_file.read_exact(&mut value)?; + Ok(Entry { header, value }) + } + + /// Append an entry to the e2store file. + pub fn append_entry(&mut self, entry: &Entry) -> anyhow::Result<()> { + let buf = entry.serialize()?; + self.e2store_file.write_all(&buf)?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use rand::Rng; + use tempfile::TempDir; + + use crate::e2store::types::VersionEntry; + + use super::*; + + #[test] + fn test_e2store_stream_write_and_read() -> anyhow::Result<()> { + // setup + let mut rng = rand::thread_rng(); + let tmp_dir = TempDir::new()?; + let random_number: u16 = rng.gen(); + let tmp_path = tmp_dir + .as_ref() + .to_path_buf() + .join(format!("{}.e2store_stream_test", random_number)); + + // create a new e2store file and write some data to it + let mut e2store_write_stream = E2StoreStream::create(&tmp_path)?; + + let version = VersionEntry::default(); + e2store_write_stream.append_entry(&version.clone().into())?; + + let value: Vec = (0..100).map(|_| rng.gen_range(0..20)).collect(); + let entry = Entry::new(0, value); + e2store_write_stream.append_entry(&entry)?; + drop(e2store_write_stream); + + // read results and see if they match + let mut e2store_read_stream = E2StoreStream::open(&tmp_path)?; + let read_version_entry = VersionEntry::try_from(&e2store_read_stream.next_entry()?)?; + assert_eq!(version, read_version_entry); + let read_entry = e2store_read_stream.next_entry()?; + assert_eq!(entry, read_entry); + + // cleanup + tmp_dir.close()?; + Ok(()) + } +} diff --git a/e2store/src/e2store/types.rs b/e2store/src/e2store/types.rs new file mode 100644 index 000000000..da7d76692 --- /dev/null +++ b/e2store/src/e2store/types.rs @@ -0,0 +1,154 @@ +use anyhow::{anyhow, ensure}; +use ssz_derive::{Decode, Encode}; + +/// Represents an e2store `Entry` +#[derive(Default, Debug, Eq, PartialEq, Clone)] +pub struct Entry { + pub header: Header, + pub value: Vec, +} + +impl Entry { + pub fn new(type_: u16, value: Vec) -> Self { + Self { + header: Header { + type_, + length: value.len() as u32, + reserved: 0, + }, + value, + } + } + + pub fn length(&self) -> usize { + Header::SERIALIZED_SIZE as usize + self.header.length as usize + } + + /// Serialize to a byte vector. + pub fn serialize(&self) -> anyhow::Result> { + let length = self.length(); + let mut buf = vec![0; length]; + self.write(&mut buf)?; + Ok(buf) + } + + /// Write to a byte slice. + pub fn write(&self, buf: &mut [u8]) -> anyhow::Result<()> { + if self.length() != buf.len() { + return Err(anyhow!( + "found invalid buf length for entry: {} - expected {}", + buf.len(), + self.length() + )); + } + if self.length() > u32::MAX as usize { + return Err(anyhow!( + "entry value size limit exceeded: {} - {}", + self.length(), + u32::MAX + )); + } + self.header.write(buf); + buf[Header::SERIALIZED_SIZE as usize..].copy_from_slice(&self.value); + Ok(()) + } + + /// Deserialize from a byte slice. + pub fn deserialize(bytes: &[u8]) -> anyhow::Result { + let header = Header::deserialize(&bytes[0..8])?; + if header.length as usize + Header::SERIALIZED_SIZE as usize != bytes.len() { + return Err(anyhow!( + "found invalid buf length for entry: {} - expected {}", + bytes.len(), + header.length as usize + Header::SERIALIZED_SIZE as usize + )); + } + Ok(Self { + header, + value: bytes[Header::SERIALIZED_SIZE as usize..].to_vec(), + }) + } +} + +/// Represents the header of an e2store `Entry` +#[derive(Clone, Debug, Decode, Encode, Default, Eq, PartialEq)] +pub struct Header { + pub type_: u16, + pub length: u32, + pub reserved: u16, +} + +impl Header { + pub const SERIALIZED_SIZE: u16 = 8; + + /// Write to a byte slice. + fn write(&self, buf: &mut [u8]) { + buf[0..2].copy_from_slice(&self.type_.to_le_bytes()); + buf[2..6].copy_from_slice(&self.length.to_le_bytes()); + buf[6..8].copy_from_slice(&self.reserved.to_le_bytes()); + } + + /// Deserialize from a byte slice. + pub fn deserialize(bytes: &[u8]) -> anyhow::Result { + if bytes.len() != Header::SERIALIZED_SIZE as usize { + return Err(anyhow!("invalid header size: {}", bytes.len())); + } + let type_ = u16::from_le_bytes([bytes[0], bytes[1]]); + let length = u32::from_le_bytes([bytes[2], bytes[3], bytes[4], bytes[5]]); + let reserved = u16::from_le_bytes([bytes[6], bytes[7]]); + ensure!( + reserved == 0, + "invalid reserved value: {reserved} - expected 0" + ); + Ok(Self { + type_, + length, + reserved, + }) + } +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct VersionEntry { + version: Entry, +} + +impl Default for VersionEntry { + fn default() -> Self { + Self { + version: Entry::new(0x3265, vec![]), + } + } +} + +impl TryFrom<&Entry> for VersionEntry { + type Error = anyhow::Error; + + fn try_from(entry: &Entry) -> anyhow::Result { + ensure!( + entry.header.type_ == 0x3265, + "invalid version entry: incorrect header type" + ); + ensure!( + entry.header.length == 0, + "invalid version entry: incorrect header length" + ); + ensure!( + entry.header.reserved == 0, + "invalid version entry: incorrect header reserved bytes" + ); + ensure!( + entry.value.is_empty(), + "invalid version entry: non-empty value" + ); + Ok(Self { + version: entry.clone(), + }) + } +} + +impl From for Entry { + fn from(val: VersionEntry) -> Self { + val.version + } +} diff --git a/e2store/src/era.rs b/e2store/src/era.rs index 81a9b50fe..2145805d5 100644 --- a/e2store/src/era.rs +++ b/e2store/src/era.rs @@ -1,6 +1,6 @@ -use crate::{ - e2s::{E2StoreFile, Entry}, - era1::VersionEntry, +use crate::e2store::{ + memory::E2StoreMemory, + types::{Entry, VersionEntry}, }; use anyhow::{anyhow, ensure}; use ethportal_api::consensus::{ @@ -40,7 +40,7 @@ impl Era { } pub fn deserialize(buf: &[u8]) -> anyhow::Result { - let file = E2StoreFile::deserialize(buf)?; + let file = E2StoreMemory::deserialize(buf)?; let version = VersionEntry::try_from(&file.entries[0])?; let entries_length = file.entries.len(); let mut blocks = vec![]; @@ -71,7 +71,7 @@ impl Era { /// Deserialize the `BeaconState` from the `Era` file. pub fn deserialize_to_beacon_state(buf: &[u8]) -> anyhow::Result { - let file = E2StoreFile::deserialize(buf)?; + let file = E2StoreMemory::deserialize(buf)?; // The compressed `BeaconState` is the second to last entry in the file. let entries_length = file.entries.len(); let slot_index_state = SlotIndexStateEntry::try_from(&file.entries[entries_length - 1])?; @@ -83,7 +83,7 @@ impl Era { #[allow(dead_code)] fn write(&self) -> anyhow::Result> { let mut entries: Vec = vec![]; - let version_entry: Entry = self.version.clone().try_into()?; + let version_entry: Entry = self.version.clone().into(); entries.push(version_entry); for block in &self.blocks { let block_entry: Entry = block.clone().try_into()?; @@ -95,7 +95,7 @@ impl Era { entries.push(slot_index_block_entry); let slot_index_state_entry: Entry = self.slot_index_state.clone().try_into()?; entries.push(slot_index_state_entry); - let file = E2StoreFile { entries }; + let file = E2StoreMemory { entries }; let file_length = file.length(); let mut buf = vec![0; file_length]; diff --git a/e2store/src/era1.rs b/e2store/src/era1.rs index fcdd08221..a20af10ac 100644 --- a/e2store/src/era1.rs +++ b/e2store/src/era1.rs @@ -1,4 +1,7 @@ -use crate::e2s::{E2StoreFile, Entry}; +use crate::e2store::{ + memory::E2StoreMemory, + types::{Entry, VersionEntry}, +}; use alloy_primitives::{B256, U256}; use alloy_rlp::Decodable; use anyhow::ensure; @@ -41,7 +44,7 @@ impl Era1 { /// this is useful for processing large era1 files without storing the entire /// deserialized era1 object in memory. pub fn iter_tuples(raw_era1: Vec) -> impl Iterator { - let file = E2StoreFile::deserialize(&raw_era1).expect("invalid era1 file"); + let file = E2StoreMemory::deserialize(&raw_era1).expect("invalid era1 file"); let block_index = BlockIndexEntry::try_from(file.entries.last().expect("missing block index entry")) .expect("invalid block index entry") @@ -56,7 +59,7 @@ impl Era1 { } pub fn get_tuple_by_index(raw_era1: &[u8], index: u64) -> BlockTuple { - let file = E2StoreFile::deserialize(raw_era1).expect("invalid era1 file"); + let file = E2StoreMemory::deserialize(raw_era1).expect("invalid era1 file"); let mut entries: [Entry; 4] = Default::default(); for (j, entry) in entries.iter_mut().enumerate() { file.entries[index as usize * 4 + j + 1].clone_into(entry); @@ -65,7 +68,7 @@ impl Era1 { } pub fn deserialize(buf: &[u8]) -> anyhow::Result { - let file = E2StoreFile::deserialize(buf)?; + let file = E2StoreMemory::deserialize(buf)?; ensure!( // era1 file #0-1895 || era1 file #1896 file.entries.len() == ERA1_ENTRY_COUNT || file.entries.len() == 21451, @@ -97,7 +100,7 @@ impl Era1 { #[allow(dead_code)] fn write(&self) -> anyhow::Result> { let mut entries: Vec = vec![]; - let version_entry: Entry = self.version.clone().try_into()?; + let version_entry: Entry = self.version.clone().into(); entries.push(version_entry); for block_tuple in &self.block_tuples { let block_tuple_entries: [Entry; 4] = block_tuple.clone().try_into()?; @@ -107,7 +110,7 @@ impl Era1 { entries.push(accumulator_entry); let block_index_entry: Entry = self.block_index.clone().try_into()?; entries.push(block_index_entry); - let file = E2StoreFile { entries }; + let file = E2StoreMemory { entries }; ensure!( // era1 file #0-1895 || era1 file #1896 file.entries.len() == ERA1_ENTRY_COUNT || file.entries.len() == 21451, @@ -166,45 +169,6 @@ impl TryInto<[Entry; 4]> for BlockTuple { } } -#[derive(Clone, Debug, Eq, PartialEq)] -pub struct VersionEntry { - version: Entry, -} - -impl TryFrom<&Entry> for VersionEntry { - type Error = anyhow::Error; - - fn try_from(entry: &Entry) -> anyhow::Result { - ensure!( - entry.header.type_ == 0x3265, - "invalid version entry: incorrect header type" - ); - ensure!( - entry.header.length == 0, - "invalid version entry: incorrect header length" - ); - ensure!( - entry.header.reserved == 0, - "invalid version entry: incorrect header reserved bytes" - ); - ensure!( - entry.value.is_empty(), - "invalid version entry: non-empty value" - ); - Ok(Self { - version: entry.clone(), - }) - } -} - -impl TryInto for VersionEntry { - type Error = anyhow::Error; - - fn try_into(self) -> anyhow::Result { - Ok(self.version) - } -} - #[derive(Clone, Eq, PartialEq, Debug)] pub struct HeaderEntry { pub header: Header, diff --git a/e2store/src/lib.rs b/e2store/src/lib.rs index c13687e4f..06abd3bb5 100644 --- a/e2store/src/lib.rs +++ b/e2store/src/lib.rs @@ -1,4 +1,4 @@ -pub mod e2s; +pub mod e2store; pub mod era; pub mod era1; pub mod utils;