Skip to content

Commit

Permalink
Merge pull request #11 from semiotic-ai/refactor-fixes-and-tests
Browse files Browse the repository at this point in the history
test: adds receive_message test
  • Loading branch information
pedrohba1 authored Mar 19, 2024
2 parents 17b601b + 7fb3ed8 commit a805a4b
Show file tree
Hide file tree
Showing 14 changed files with 990 additions and 847 deletions.
1,008 changes: 500 additions & 508 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
decoder = { git = "https://github.com/semiotic-ai/flat-files-decoder.git", branch = "main" }
ethportal-api = {git = "https://github.com/ethereum/trin.git", version = "0.2.2"}
tree_hash = "0.5.2"
revm-primitives = "=1.1.2"
Expand All @@ -23,10 +22,18 @@ protobuf-json-mapping = "3.2.0"
bincode = "1.3.3"
serde = "1.0.196"
base64 = "0.21.7"
sf-protos = { git = "https://github.com/semiotic-ai/sf-protos.git", branch = "main" }


[dev-dependencies]
tempfile = "3.0"
decoder = { git = "https://github.com/semiotic-ai/flat-files-decoder.git", branch = "main" }


[profile.release]
codegen-units = 1
lto = false

[build-dependencies]
prost-build = "0.12.3"
prost-wkt-build = "0.5.0"
23 changes: 7 additions & 16 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,16 @@ against header accumulators. This process is used to verify the authenticity of

- `-h, --help`: Display a help message that includes usage, commands, and options.

<!-- ## Usage Examples
1. To validate a stream of epochs, arriving as blocks from flat files:
```
❯ cargo run era_validate stream
```
Then feed the files into the stdin.
If there are multiple files to feed,

## Goals

2.
-->
Our goal is to provide a tool that can be used to verify
blocks


## Testing
Some tests depend on [flat-files-decoder] to work, so it is used as a development dependency.

## Goals
### Coverage

Our goal is to provide a tool that can be used to verify
blocks
Generate code coverage reports with `cargo llvm-cov --html` and open them with `open ./target/llvm-cov/html/index.html`.
265 changes: 159 additions & 106 deletions src/era_validator.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,39 @@
use std::{
io::{BufRead, Read, Write as StdWrite},
path::Path,
};

use decoder::{
headers::HeaderRecordWithNumber,
sf::{self, ethereum::r#type::v2::Block},
};
use std::path::Path;

use ethportal_api::types::execution::accumulator::HeaderRecord;
use primitive_types::{H256, U256};

use tree_hash::TreeHash;
use trin_validation::accumulator::MasterAccumulator;

use crate::{
errors::EraValidateError,
sync::{check_sync_state, store_last_state, LockEntry},
utils::{
compute_epoch_accumulator, decode_header_records, FINAL_EPOCH, MAX_EPOCH_SIZE, MERGE_BLOCK,
},
types::ExtHeaderRecord,
utils::{compute_epoch_accumulator, FINAL_EPOCH, MAX_EPOCH_SIZE, MERGE_BLOCK},
};

/// Validates many blocks against a header accumulator
/// Validates many headers against a header accumulator
///
/// It also keeps a record in `lockfile.json` of the validated epochs to skip them
///
/// # Arguments
///
/// * `blocks`- A mutable vector of blocks. The Vector can be any size, however, it must be in chunks of 8192 blocks
/// * `headers`- A mutable vector of [`ExtHeaderRecord`]. The Vector can be any size, however, it must be in chunks of 8192 blocks to work properly
/// to function without error
/// * `master_accumulator_file`- An instance of `MasterAccumulator` which is a file that maintains a record of historical epoch
/// * `master_accumulator_file`- An instance of [`MasterAccumulator`] which is a file that maintains a record of historical epoch
/// it is used to verify canonical-ness of headers accumulated from the `blocks`
/// * `start_epoch` - The epoch number where the first 8192 blocks are located
/// * `end_epoch` - The epoch number that all the last 8192 blocks are located
/// * `use_lock` - when set to true, uses the lockfile to store already processed blocks. True by default
pub fn era_validate(
mut blocks: Vec<sf::ethereum::r#type::v2::Block>,
mut headers: Vec<ExtHeaderRecord>,
master_accumulator_file: Option<&String>,
start_epoch: usize,
end_epoch: Option<usize>,
use_lock: Option<bool>, // Changed to Option<bool>
) -> Result<Vec<usize>, EraValidateError> {
let use_lock = use_lock.unwrap_or(true);

// Load master accumulator if available, otherwise use default from Portal Network
let master_accumulator = match master_accumulator_file {
Some(master_accumulator_file) => {
Expand All @@ -61,57 +56,80 @@ pub fn era_validate(
let mut validated_epochs = Vec::new();
for epoch in start_epoch..end_epoch {
// checks if epoch was already synced from lockfile.
match check_sync_state(
Path::new("./lockfile.json"),
epoch.to_string(),
master_accumulator.historical_epochs[epoch].0,
) {
Ok(true) => {
log::info!("Skipping, epoch already synced: {}", epoch);
continue;
}
Ok(false) => {
log::info!("syncing new epoch: {}", epoch);
}
Err(e) => {
return {
log::error!("error: {}", e);
Err(EraValidateError::EpochAccumulatorError)
if use_lock {
match check_sync_state(
Path::new("./lockfile.json"),
epoch.to_string(),
master_accumulator.historical_epochs[epoch].0,
) {
Ok(true) => {
log::info!("Skipping, epoch already synced: {}", epoch);
continue;
}
Ok(false) => {
log::info!("syncing new epoch: {}", epoch);
}
Err(e) => {
return {
log::error!("error: {}", e);
Err(EraValidateError::EpochAccumulatorError)
}
}
}
}

let epoch_blocks: Vec<Block> = blocks.drain(0..MAX_EPOCH_SIZE).collect();
let root = process_blocks(epoch_blocks, epoch, &master_accumulator)?;
let epoch_headers: Vec<ExtHeaderRecord> = headers.drain(0..MAX_EPOCH_SIZE).collect();
let root = process_headers(epoch_headers, epoch, &master_accumulator)?;
validated_epochs.push(epoch);
// stores the validated epoch into lockfile to avoid validating again and keeping a concise state
match store_last_state(Path::new("./lockfile.json"), LockEntry::new(&epoch, root)) {
Ok(_) => {}
Err(e) => {
log::error!("error: {}", e);
return Err(EraValidateError::EpochAccumulatorError);
if use_lock {
match store_last_state(Path::new("./lockfile.json"), LockEntry::new(&epoch, root)) {
Ok(_) => {}
Err(e) => {
log::error!("error: {}", e);
return Err(EraValidateError::EpochAccumulatorError);
}
}
}
}

Ok(validated_epochs)
}

/// takes 8192 blocks and checks if they consist in a valid epoch
fn process_blocks(
mut blocks: Vec<sf::ethereum::r#type::v2::Block>,
/// Takes 8192 block headers and checks whether they constitute a valid epoch.
///
/// An epoch must respect the order of blocks, i.e., block numbers for epoch
/// 0 must start from block 0 to block 8191.
///
/// headers can only be validated for now against epochs before The Merge.
/// All pre-merge blocks (which are numbered before [`FINAL_EPOCH`]), are validated using
/// the [Header Accumulator](https://github.com/ethereum/portal-network-specs/blob/8ad5bc33cb0d4485d2eab73bf2decc43e7566a8f/history-network.md#the-header-accumulator)
///
/// For block post merge, the sync-committee should be used to validate block headers
/// in the canonical blockchain. So this function is not useful for those.
fn process_headers(
mut headers: Vec<ExtHeaderRecord>,
epoch: usize,
master_accumulator: &MasterAccumulator,
) -> Result<[u8; 32], EraValidateError> {
if blocks.len() != MAX_EPOCH_SIZE {
if headers.len() != MAX_EPOCH_SIZE {
Err(EraValidateError::InvalidEpochLength)?;
}
if headers[0].block_number % MAX_EPOCH_SIZE as u64 != 0 {
Err(EraValidateError::InvalidEpochStart)?;
}

if epoch > FINAL_EPOCH {
blocks.retain(|block: &Block| block.number < MERGE_BLOCK);
log::warn!(
"the blocks from this epoch are not being validated since they are post merge.
For post merge blocks, use the sync-committee subprotocol"
);
headers.retain(|header: &ExtHeaderRecord| header.block_number < MERGE_BLOCK);
}

let header_records = decode_header_records(blocks)?;
let header_records: Vec<HeaderRecord> = headers
.into_iter()
.map(|ext_record| HeaderRecord::from(ext_record))
.collect();
let epoch_accumulator = compute_epoch_accumulator(&header_records)?;

// Return an error if the epoch accumulator does not match the master accumulator
Expand All @@ -129,62 +147,97 @@ fn process_blocks(
Ok(root)
}

/// Validates a stream of length-prefixed, bincode-encoded header records
/// against a master accumulator, writing a confirmation line per validated epoch.
///
/// Records are read from `reader` via `receive_message`. Accumulation starts
/// only once a header whose block number is aligned to an epoch boundary
/// (a multiple of `MAX_EPOCH_SIZE`) is seen; from then on every record is
/// collected, and each full batch of `MAX_EPOCH_SIZE` records is hashed and
/// compared against the corresponding entry in `master_accumulator`.
///
/// # Arguments
///
/// * `master_accumulator` - historical epoch roots to validate against
/// * `reader` - source of the serialized header records
/// * `writer` - sink for `"Validated epoch: N"` confirmation lines
///
/// # Errors
///
/// * [`EraValidateError::TotalDifficultyDecodeError`] if a record's total
///   difficulty bytes cannot be decoded into a `U256`
/// * [`EraValidateError::EraAccumulatorMismatch`] if an epoch root does not
///   match the master accumulator
/// * [`EraValidateError::JsonError`] if writing the confirmation line fails
pub fn stream_validation<R: Read + BufRead, W: StdWrite>(
    master_accumulator: MasterAccumulator,
    mut reader: R,
    mut writer: W,
) -> Result<(), EraValidateError> {
    let mut header_records = Vec::new();
    let mut append_flag = false;

    while let Ok(hrwn) = receive_message(&mut reader) {
        // Start collecting only at an epoch boundary so that batches line up
        // with the accumulator's historical epochs.
        if header_records.is_empty() && hrwn.block_number % MAX_EPOCH_SIZE as u64 == 0 {
            let epoch = hrwn.block_number as usize / MAX_EPOCH_SIZE;
            log::info!("Validating epoch: {}", epoch);
            append_flag = true;
        }
        if append_flag {
            let header_record = HeaderRecord {
                block_hash: H256::from_slice(&hrwn.block_hash),
                total_difficulty: U256::try_from(hrwn.total_difficulty.as_slice())
                    .map_err(|_| EraValidateError::TotalDifficultyDecodeError)?,
            };
            header_records.push(header_record);
        }

        if header_records.len() == MAX_EPOCH_SIZE {
            let epoch = hrwn.block_number as usize / MAX_EPOCH_SIZE;
            let epoch_accumulator = compute_epoch_accumulator(&header_records)?;
            // A root mismatch means the streamed headers are not canonical.
            if epoch_accumulator.tree_hash_root().0 != master_accumulator.historical_epochs[epoch].0
            {
                Err(EraValidateError::EraAccumulatorMismatch)?;
            }
            log::info!("Validated epoch: {}", epoch);
            writer
                .write_all(format!("Validated epoch: {}\n", epoch).as_bytes())
                .map_err(|_| EraValidateError::JsonError)?;
            header_records.clear();
        }
    }
    log::info!("Read {} block headers from stdin", header_records.len());
    Ok(())
}

/// Reads one length-prefixed, bincode-encoded `HeaderRecordWithNumber` from `reader`.
///
/// The wire format is a 4-byte big-endian `u32` payload length followed by
/// that many bytes of bincode-serialized data.
///
/// # Errors
///
/// Returns an I/O-flavored `bincode::Error` (`UnexpectedEof`) when the length
/// prefix cannot be read, and propagates any error from reading or decoding
/// the payload.
fn receive_message<R: Read>(reader: &mut R) -> Result<HeaderRecordWithNumber, bincode::Error> {
    // First, the 4-byte big-endian length prefix.
    let mut size_buf = [0u8; 4];
    match reader.read_exact(&mut size_buf) {
        Ok(()) => {}
        Err(_) => {
            let io_err =
                std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "Failed to read size");
            // bincode::Error is a boxed ErrorKind, so wrap the I/O error directly.
            return Err(Box::new(bincode::ErrorKind::Io(io_err)));
        }
    }
    let size = u32::from_be_bytes(size_buf) as usize;

    // Then exactly `size` payload bytes, decoded with bincode.
    let mut payload = vec![0u8; size];
    reader.read_exact(&mut payload)?;
    bincode::deserialize(&payload)
}
// TODO: move stream validation to be a functionality of flat_head
// pub fn stream_validation<R: Read + BufRead, W: StdWrite>(
// master_accumulator: MasterAccumulator,
// mut reader: R,
// mut writer: W,
// ) -> Result<(), EraValidateError> {
// let mut header_records = Vec::new();
// let mut append_flag = false;
// let mut buf = String::new();

// while let Ok(hrwn) = receive_message(&mut reader) {
// buf.clear();

// log::info!("{:?}", hrwn.block_hash);
// if header_records.len() == 0 {
// if hrwn.block_number % MAX_EPOCH_SIZE as u64 == 0 {
// let epoch = hrwn.block_number as usize / MAX_EPOCH_SIZE;
// log::info!("Validating epoch: {}", epoch);
// append_flag = true;
// }
// }
// if append_flag == true {
// let header_record = HeaderRecord {
// block_hash: H256::from_slice(&hrwn.block_hash),
// total_difficulty: U256::try_from(hrwn.total_difficulty.as_slice())
// .map_err(|_| EraValidateError::TotalDifficultyDecodeError)?,
// };
// header_records.push(header_record);
// }

// if header_records.len() == MAX_EPOCH_SIZE {
// let epoch = hrwn.block_number as usize / MAX_EPOCH_SIZE;
// let epoch_accumulator = compute_epoch_accumulator(&header_records)?;
// if epoch_accumulator.tree_hash_root().0 != master_accumulator.historical_epochs[epoch].0
// {
// Err(EraValidateError::EraAccumulatorMismatch)?;
// }
// log::info!("Validated epoch: {}", epoch);
// writer
// .write_all(format!("Validated epoch: {}\n", epoch).as_bytes())
// .map_err(|_| EraValidateError::JsonError)?;
// header_records.clear();
// }
// }

// log::info!("Read {} block headers from stdin", header_records.len());
// Ok(())
// }

// TODO: this functionality should be moved to flat_head
// fn receive_message<R: Read>(reader: &mut R) -> Result<HeaderRecordWithNumber, Box<dyn Error>> {
// let mut size_buf = [0u8; 4];
// if reader.read_exact(&mut size_buf).is_err() {
// return Err(Box::new(bincode::ErrorKind::Io(std::io::Error::new(
// std::io::ErrorKind::UnexpectedEof,
// "Failed to read size",
// ))));
// }

// let size = u32::from_be_bytes(size_buf) as usize;
// println!("size: {:?}", size);

// let mut buf = vec![0u8; size];
// reader.read_exact(&mut buf)?;
// let hrwn: HeaderRecordWithNumber = bincode::deserialize(&buf)?;

// println!(" decoding {:?}", hrwn);
// Ok(hrwn)
// }

// Test function

// TODO: move this test to flat_head
// #[cfg(test)]
// mod tests {
// use std::{fs::File, io, path::PathBuf};

// use super::*;

// #[test]
// fn test_receive_message_from_file() -> Result<(), Box<dyn Error>> {
// let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
// path.push("tests/ethereum_firehose_first_8200/0000000000.dbin"); // Adjust the path as needed

// let file = File::open(path)?;
// let mut reader = io::BufReader::new(file);

// let result = receive_message(&mut reader)?;

// println!("block: {:?}", result.block_hash);

// Ok(())
// }
// }
Loading

0 comments on commit a805a4b

Please sign in to comment.