test: adds receive_message test #11

Merged · 29 commits · Mar 19, 2024

Changes from all commits

Commits
e5dc74a
test: adds receive_message test
pedrohba1 Feb 29, 2024
c4c8296
refactor(era_validator): function receives headers now instead of blocks
pedrohba1 Mar 1, 2024
c983baf
feat(utils): adds function to decode a single header
pedrohba1 Mar 1, 2024
fdf11ab
refactor(inclusion_proof): adds two arrays fo handling with HeaderRec…
pedrohba1 Mar 1, 2024
cc26351
docs(era_validator): corrects doc for era_validate
pedrohba1 Mar 1, 2024
1947522
docs(inclusion_proof): removes unnecessary comments
pedrohba1 Mar 1, 2024
ccee8f8
feat(era_validator): optional lock
pedrohba1 Mar 6, 2024
502ab44
docs: change phrasing
pedrohba1 Mar 6, 2024
078ae7a
feat(types): adds a full header to the ExtHeaderRecord
pedrohba1 Mar 6, 2024
18dfa79
feat: adds sf-protos dependency to share types between flat-files-dec…
pedrohba1 Mar 8, 2024
f5998a2
refactor(removes-extract_100_blocks-function,-changes-decode_header_r…
pedrohba1 Mar 9, 2024
03708f5
test: corrects era validator to use ext_header_from_block
pedrohba1 Mar 9, 2024
33360c0
feat(types): adds conversions between ExtHeaderRecord, HeaderRecord a…
pedrohba1 Mar 9, 2024
f386185
refactor(inclusion_proof): simplifies inclusion proof generation an v…
pedrohba1 Mar 9, 2024
6b9c6f5
test(inclusion_proof): adds tests for failing proofs, proof verificat…
pedrohba1 Mar 9, 2024
c313fc1
build(cargo.toml): moves flat-files-decoder to dev dependencies
pedrohba1 Mar 9, 2024
957b7ea
refactor(main): comment code that is to be moved to flat_head
pedrohba1 Mar 9, 2024
3e59852
refactor(types): changes to use TryFrom trait instead of convertion f…
pedrohba1 Mar 11, 2024
c10f788
refactor(utils): removes unnecessary convertion function (it already …
pedrohba1 Mar 11, 2024
994519a
test(tests/era_validator): test lockfile presence
pedrohba1 Mar 11, 2024
8f2a056
refactor: removes unnecessary code
pedrohba1 Mar 11, 2024
3918db4
refactor(inclusion_proof): use constant instead of plain number
pedrohba1 Mar 15, 2024
5fb9aa7
test(era_validator): test if values were skipped accordingly
pedrohba1 Mar 15, 2024
436751c
refactor(sync): remove print
pedrohba1 Mar 15, 2024
f2cb51c
refactor(inclusion_proof): adds new type of error: ProofValidationFai…
pedrohba1 Mar 15, 2024
4d27868
refactor(inclusion_proof): use constant instead of plain number
pedrohba1 Mar 15, 2024
33b42ad
refactor(inclusion_proof): use constant instead of plain number
pedrohba1 Mar 15, 2024
ac34a7d
feat(era_validator): checkes for correct block number start in epoch,…
pedrohba1 Mar 15, 2024
7fb3ed8
fix(era_validator): changes equality to correct comparison
pedrohba1 Mar 15, 2024
1,008 changes: 500 additions & 508 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion Cargo.toml
@@ -6,7 +6,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
decoder = { git = "https://github.com/semiotic-ai/flat-files-decoder.git", branch = "main" }
ethportal-api = {git = "https://github.com/ethereum/trin.git", version = "0.2.2"}
tree_hash = "0.5.2"
revm-primitives = "=1.1.2"
@@ -23,10 +22,18 @@ protobuf-json-mapping = "3.2.0"
bincode = "1.3.3"
serde = "1.0.196"
base64 = "0.21.7"
sf-protos = { git = "https://github.com/semiotic-ai/sf-protos.git", branch = "main" }


[dev-dependencies]
tempfile = "3.0"
decoder = { git = "https://github.com/semiotic-ai/flat-files-decoder.git", branch = "main" }


[profile.release]
codegen-units = 1
lto = false

[build-dependencies]
prost-build = "0.12.3"
prost-wkt-build = "0.5.0"
23 changes: 7 additions & 16 deletions Readme.md
@@ -26,25 +26,16 @@ against header accumulators. This process is used to verify the authenticity of

- `-h, --help`: Display a help message that includes usage, commands, and options.

<!-- ## Usage Examples

1. To validate a stream of epochs, arriving as blocks from flat files:

```
❯ cargo run era_validate stream
```

Then feed the files into the stdin.

If there are multiple files to feed,

## Goals

2.
-->
Our goal is to provide a tool that can be used to verify blocks.


## Testing
Some tests depend on [flat-files-decoder] to work, so it is used as a development dependency.

## Goals
### Coverage

Our goal is to provide a tool that can be used to verify blocks.
Generate code coverage reports with `cargo llvm-cov --html` and open them with `open ./target/llvm-cov/html/index.html`.
265 changes: 159 additions & 106 deletions src/era_validator.rs
@@ -1,44 +1,39 @@
use std::{
io::{BufRead, Read, Write as StdWrite},
path::Path,
};

use decoder::{
headers::HeaderRecordWithNumber,
sf::{self, ethereum::r#type::v2::Block},
};
use std::path::Path;

use ethportal_api::types::execution::accumulator::HeaderRecord;
use primitive_types::{H256, U256};

use tree_hash::TreeHash;
use trin_validation::accumulator::MasterAccumulator;

use crate::{
errors::EraValidateError,
sync::{check_sync_state, store_last_state, LockEntry},
utils::{
compute_epoch_accumulator, decode_header_records, FINAL_EPOCH, MAX_EPOCH_SIZE, MERGE_BLOCK,
},
types::ExtHeaderRecord,
utils::{compute_epoch_accumulator, FINAL_EPOCH, MAX_EPOCH_SIZE, MERGE_BLOCK},
};

/// Validates many blocks against a header accumulator
/// Validates many headers against a header accumulator
///
/// It also keeps a record in `lockfile.json` of the validated epochs to skip them
///
/// # Arguments
///
/// * `blocks`- A mutable vector of blocks. The Vector can be any size, however, it must be in chunks of 8192 blocks
/// * `headers` - A mutable vector of [`ExtHeaderRecord`]. The vector can be any size; however, it must come in chunks of 8192 blocks to work properly
/// to function without error
/// * `master_accumulator_file`- An instance of `MasterAccumulator` which is a file that maintains a record of historical epoch
/// * `master_accumulator_file` - An instance of [`MasterAccumulator`], a file that maintains a record of historical epochs;
/// it is used to verify that the accumulated headers are canonical
/// * `start_epoch` - The epoch number where the first 8192 blocks are located
/// * `end_epoch` - The epoch number where the last 8192 blocks are located
/// * `use_lock` - When set to `true`, uses the lockfile to skip already processed epochs. Defaults to `true`
pub fn era_validate(
mut blocks: Vec<sf::ethereum::r#type::v2::Block>,
mut headers: Vec<ExtHeaderRecord>,
master_accumulator_file: Option<&String>,
start_epoch: usize,
end_epoch: Option<usize>,
use_lock: Option<bool>,
) -> Result<Vec<usize>, EraValidateError> {
let use_lock = use_lock.unwrap_or(true);

// Load master accumulator if available, otherwise use default from Portal Network
let master_accumulator = match master_accumulator_file {
Some(master_accumulator_file) => {
@@ -61,57 +56,80 @@ pub fn era_validate(
let mut validated_epochs = Vec::new();
for epoch in start_epoch..end_epoch {
// checks if the epoch was already synced from the lockfile.
match check_sync_state(
Path::new("./lockfile.json"),
epoch.to_string(),
master_accumulator.historical_epochs[epoch].0,
) {
Ok(true) => {
log::info!("Skipping, epoch already synced: {}", epoch);
continue;
}
Ok(false) => {
log::info!("syncing new epoch: {}", epoch);
}
Err(e) => {
return {
log::error!("error: {}", e);
Err(EraValidateError::EpochAccumulatorError)
if use_lock {
match check_sync_state(
Path::new("./lockfile.json"),
epoch.to_string(),
master_accumulator.historical_epochs[epoch].0,
) {
Ok(true) => {
log::info!("Skipping, epoch already synced: {}", epoch);
continue;
}
Ok(false) => {
log::info!("syncing new epoch: {}", epoch);
}
Err(e) => {
return {
log::error!("error: {}", e);
Err(EraValidateError::EpochAccumulatorError)
}
}
}
}

let epoch_blocks: Vec<Block> = blocks.drain(0..MAX_EPOCH_SIZE).collect();
let root = process_blocks(epoch_blocks, epoch, &master_accumulator)?;
let epoch_headers: Vec<ExtHeaderRecord> = headers.drain(0..MAX_EPOCH_SIZE).collect();
let root = process_headers(epoch_headers, epoch, &master_accumulator)?;
validated_epochs.push(epoch);
// stores the validated epoch in the lockfile so it is not validated again, keeping a concise state
match store_last_state(Path::new("./lockfile.json"), LockEntry::new(&epoch, root)) {
Ok(_) => {}
Err(e) => {
log::error!("error: {}", e);
return Err(EraValidateError::EpochAccumulatorError);
if use_lock {
match store_last_state(Path::new("./lockfile.json"), LockEntry::new(&epoch, root)) {
Ok(_) => {}
Err(e) => {
log::error!("error: {}", e);
return Err(EraValidateError::EpochAccumulatorError);
}
}
}
}

Ok(validated_epochs)
}
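
For orientation, a minimal sketch of calling the reworked `era_validate` signature; the crate and module paths are assumptions based on this diff, and the `headers` vector is presumed to be decoded upstream (e.g. by flat-files-decoder):

```rust
use header_accumulator::{
    era_validator::era_validate, errors::EraValidateError, types::ExtHeaderRecord,
};

/// Validates epoch 0 only, bypassing lockfile.json.
fn validate_first_epoch(headers: Vec<ExtHeaderRecord>) -> Result<Vec<usize>, EraValidateError> {
    era_validate(
        headers,     // must hold 8192 headers (MAX_EPOCH_SIZE) starting at block 0
        None,        // fall back to the default Portal Network master accumulator
        0,           // start_epoch
        Some(1),     // end_epoch; the internal loop runs start_epoch..end_epoch
        Some(false), // skip the lockfile; None defaults to true
    )
}
```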

/// takes 8192 blocks and checks if they consist in a valid epoch
fn process_blocks(
mut blocks: Vec<sf::ethereum::r#type::v2::Block>,
/// Takes 8192 block headers and checks whether they constitute a valid epoch.
///
/// An epoch must respect the order of blocks, i.e., block numbers for epoch
/// 0 must run from block 0 through block 8191.
///
/// For now, headers can only be validated against epochs before The Merge.
/// All pre-merge blocks (which are numbered before [`FINAL_EPOCH`]) are validated using
/// the [Header Accumulator](https://github.com/ethereum/portal-network-specs/blob/8ad5bc33cb0d4485d2eab73bf2decc43e7566a8f/history-network.md#the-header-accumulator)
///
/// For post-merge blocks, the sync committee should be used to validate block headers
/// in the canonical blockchain, so this function is not useful for those.
fn process_headers(
mut headers: Vec<ExtHeaderRecord>,
epoch: usize,
master_accumulator: &MasterAccumulator,
) -> Result<[u8; 32], EraValidateError> {
if blocks.len() != MAX_EPOCH_SIZE {
if headers.len() != MAX_EPOCH_SIZE {
Err(EraValidateError::InvalidEpochLength)?;
}
if headers[0].block_number % MAX_EPOCH_SIZE as u64 != 0 {
Err(EraValidateError::InvalidEpochStart)?;
}

if epoch > FINAL_EPOCH {
blocks.retain(|block: &Block| block.number < MERGE_BLOCK);
log::warn!(
"the blocks from this epoch are not being validated since they are post-merge.
For post-merge blocks, use the sync-committee subprotocol"
);
headers.retain(|header: &ExtHeaderRecord| header.block_number < MERGE_BLOCK);
}

let header_records = decode_header_records(blocks)?;
let header_records: Vec<HeaderRecord> = headers
.into_iter()
.map(HeaderRecord::from)
.collect();
let epoch_accumulator = compute_epoch_accumulator(&header_records)?;

// Return an error if the epoch accumulator does not match the master accumulator
@@ -129,62 +147,97 @@ fn process_blocks(
Ok(root)
}
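
The `HeaderRecord::from` call above relies on a conversion this PR adds in the types module; a sketch of what it plausibly looks like, where the exact `ExtHeaderRecord` field set is an assumption drawn from this diff:

```rust
use ethportal_api::types::execution::accumulator::HeaderRecord;
use primitive_types::{H256, U256};

// Assumed shape of the extended record; the real type also carries a full
// header (see the "adds a full header to the ExtHeaderRecord" commit).
pub struct ExtHeaderRecord {
    pub block_hash: H256,
    pub total_difficulty: U256,
    pub block_number: u64,
}

impl From<ExtHeaderRecord> for HeaderRecord {
    // Drops the extra metadata, keeping only the two fields the epoch
    // accumulator hashes over.
    fn from(ext: ExtHeaderRecord) -> Self {
        HeaderRecord {
            block_hash: ext.block_hash,
            total_difficulty: ext.total_difficulty,
        }
    }
}
```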

pub fn stream_validation<R: Read + BufRead, W: StdWrite>(
master_accumulator: MasterAccumulator,
mut reader: R,
mut writer: W,
) -> Result<(), EraValidateError> {
let mut header_records = Vec::new();
let mut append_flag = false;
let mut buf = String::new();

while let Ok(hrwn) = receive_message(&mut reader) {
buf.clear();
if header_records.len() == 0 {
if hrwn.block_number % MAX_EPOCH_SIZE as u64 == 0 {
let epoch = hrwn.block_number as usize / MAX_EPOCH_SIZE;
log::info!("Validating epoch: {}", epoch);
append_flag = true;
}
}
if append_flag == true {
let header_record = HeaderRecord {
block_hash: H256::from_slice(&hrwn.block_hash),
total_difficulty: U256::try_from(hrwn.total_difficulty.as_slice())
.map_err(|_| EraValidateError::TotalDifficultyDecodeError)?,
};
header_records.push(header_record);
}

if header_records.len() == MAX_EPOCH_SIZE {
let epoch = hrwn.block_number as usize / MAX_EPOCH_SIZE;
let epoch_accumulator = compute_epoch_accumulator(&header_records)?;
if epoch_accumulator.tree_hash_root().0 != master_accumulator.historical_epochs[epoch].0
{
Err(EraValidateError::EraAccumulatorMismatch)?;
}
log::info!("Validated epoch: {}", epoch);
writer
.write_all(format!("Validated epoch: {}\n", epoch).as_bytes())
.map_err(|_| EraValidateError::JsonError)?;
header_records.clear();
}
}
log::info!("Read {} block headers from stdin", header_records.len());
Ok(())
}

fn receive_message<R: Read>(reader: &mut R) -> Result<HeaderRecordWithNumber, bincode::Error> {
let mut size_buf = [0u8; 4];
if reader.read_exact(&mut size_buf).is_err() {
return Err(Box::new(bincode::ErrorKind::Io(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"Failed to read size",
))));
}
let size = u32::from_be_bytes(size_buf) as usize;

let mut buf = vec![0u8; size];
reader.read_exact(&mut buf)?;
bincode::deserialize(&buf)
}
// TODO: move stream validation to be a functionality of flat_head
// pub fn stream_validation<R: Read + BufRead, W: StdWrite>(
// master_accumulator: MasterAccumulator,
// mut reader: R,
// mut writer: W,
// ) -> Result<(), EraValidateError> {
// let mut header_records = Vec::new();
// let mut append_flag = false;
// let mut buf = String::new();

// while let Ok(hrwn) = receive_message(&mut reader) {
// buf.clear();

// log::info!("{:?}", hrwn.block_hash);
// if header_records.len() == 0 {
// if hrwn.block_number % MAX_EPOCH_SIZE as u64 == 0 {
// let epoch = hrwn.block_number as usize / MAX_EPOCH_SIZE;
// log::info!("Validating epoch: {}", epoch);
// append_flag = true;
// }
// }
// if append_flag == true {
// let header_record = HeaderRecord {
// block_hash: H256::from_slice(&hrwn.block_hash),
// total_difficulty: U256::try_from(hrwn.total_difficulty.as_slice())
// .map_err(|_| EraValidateError::TotalDifficultyDecodeError)?,
// };
// header_records.push(header_record);
// }

// if header_records.len() == MAX_EPOCH_SIZE {
// let epoch = hrwn.block_number as usize / MAX_EPOCH_SIZE;
// let epoch_accumulator = compute_epoch_accumulator(&header_records)?;
// if epoch_accumulator.tree_hash_root().0 != master_accumulator.historical_epochs[epoch].0
// {
// Err(EraValidateError::EraAccumulatorMismatch)?;
// }
// log::info!("Validated epoch: {}", epoch);
// writer
// .write_all(format!("Validated epoch: {}\n", epoch).as_bytes())
// .map_err(|_| EraValidateError::JsonError)?;
// header_records.clear();
// }
// }

// log::info!("Read {} block headers from stdin", header_records.len());
// Ok(())
// }

// TODO: this functionality should be moved to flat_head
// fn receive_message<R: Read>(reader: &mut R) -> Result<HeaderRecordWithNumber, Box<dyn Error>> {
// let mut size_buf = [0u8; 4];
// if reader.read_exact(&mut size_buf).is_err() {
// return Err(Box::new(bincode::ErrorKind::Io(std::io::Error::new(
// std::io::ErrorKind::UnexpectedEof,
// "Failed to read size",
// ))));
// }

// let size = u32::from_be_bytes(size_buf) as usize;
// println!("size: {:?}", size);

// let mut buf = vec![0u8; size];
// reader.read_exact(&mut buf)?;
// let hrwn: HeaderRecordWithNumber = bincode::deserialize(&buf)?;

// println!(" decoding {:?}", hrwn);
// Ok(hrwn)
// }
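
The framing `receive_message` expects is a 4-byte big-endian length prefix followed by a bincode payload. For reference, a matching sender sketch, assuming the message type is serde-serializable (as `HeaderRecordWithNumber` presumably is):

```rust
use std::io::Write;

// Writes one length-prefixed bincode frame, mirroring receive_message:
// a 4-byte big-endian length, then the serialized payload.
fn send_message<W: Write, T: serde::Serialize>(
    writer: &mut W,
    message: &T,
) -> Result<(), Box<dyn std::error::Error>> {
    let payload = bincode::serialize(message)?;
    writer.write_all(&(payload.len() as u32).to_be_bytes())?;
    writer.write_all(&payload)?;
    Ok(())
}
```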

// Test function

// TODO: move this test to flat_head
// #[cfg(test)]
// mod tests {
// use std::{fs::File, io, path::PathBuf};

// use super::*;

// #[test]
// fn test_receive_message_from_file() -> Result<(), Box<dyn Error>> {
// let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
// path.push("tests/ethereum_firehose_first_8200/0000000000.dbin"); // Adjust the path as needed

// let file = File::open(path)?;
// let mut reader = io::BufReader::new(file);

// let result = receive_message(&mut reader)?;

// println!("block: {:?}", result.block_hash);

// Ok(())
// }
// }