Skip to content

Commit

Permalink
touch-ups
Browse files Browse the repository at this point in the history
  • Loading branch information
joshieDo committed Oct 17, 2024
1 parent fbd1f20 commit 83b942d
Showing 1 changed file with 33 additions and 124 deletions.
157 changes: 33 additions & 124 deletions crates/storage/provider/src/providers/blockchain_provider3.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
use super::{DatabaseProviderRO, ProviderFactory, ProviderNodeTypes};
use crate::{
providers::StaticFileProvider, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader,
BlockReader, BlockReaderIdExt, BlockSource, CanonChainTracker, CanonStateNotifications,
CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, EvmEnvProvider, HeaderProvider,
ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt, RequestsProvider,
StageCheckpointReader, StateProviderBox, StateProviderFactory, StateReader,
BlockReader, BlockReaderIdExt, BlockSource, ChainSpecProvider, ChangeSetReader, EvmEnvProvider,
HeaderProvider, ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
RequestsProvider, StageCheckpointReader, StateProviderBox, StateProviderFactory, StateReader,
StaticFileProviderFactory, TransactionVariant, TransactionsProvider, WithdrawalsProvider,
};
use alloy_eips::{BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag, HashOrNumber};
use alloy_primitives::{Address, BlockHash, BlockNumber, Sealable, TxHash, TxNumber, B256, U256};
use alloy_rpc_types_engine::ForkchoiceState;
use parking_lot::RwLock;
use reth_chain_state::{
BlockState, CanonicalInMemoryState, ForkChoiceNotifications, ForkChoiceSubscriptions,
MemoryOverlayStateProvider,
};
use reth_chain_state::{BlockState, CanonicalInMemoryState, MemoryOverlayStateProvider};
use reth_chainspec::{ChainInfo, EthereumHardforks};
use reth_db::models::BlockNumberAddress;
use reth_db_api::models::{AccountBeforeTx, StoredBlockBodyIndices};
Expand All @@ -37,7 +32,6 @@ use std::{
collections::{hash_map, HashMap},
ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
sync::Arc,
time::Instant,
};
use tracing::trace;

Expand All @@ -60,23 +54,38 @@ pub struct BlockchainProvider3<N: ProviderNodeTypes> {
}

impl<N: ProviderNodeTypes> BlockchainProvider3<N> {
/// Create a new provider using only the database, fetching the latest header from
/// the database to initialize the provider.
/// Create a new provider using [`ProviderFactory`] and [`CanonicalInMemoryState`],
///
/// Underneath it will take a snapshot by fetching [`CanonicalInMemoryState::head_state`] and
/// [`ProviderFactory::database_provider_ro`] effectively maintaining one single snapshotted
/// view of memory and database.
pub fn new(
storage_provider_factory: ProviderFactory<N>,
state: CanonicalInMemoryState,
) -> ProviderResult<Self> {
// TODO(joshie): insert doc about order
let head_block = state.head_state(); // TODO error;
// Each one provides a snapshot at the time of instantiation, but its order matters.
//
// If we acquire first the database provider, it's possible that before the in-memory chain
// snapshot is instantiated, it will flush blocks to disk. This would
// mean that our database provider would not have access to the flushed blocks (since it's
// working under an older view), while the in-memory state may have deleted them
// entirely. Resulting in gaps on the range.
let head_block = state.head_state();
let storage_provider = storage_provider_factory.database_provider_ro()?;
Ok(Self {
storage_provider: storage_provider_factory.database_provider_ro()?,
storage_provider,
storage_provider_factory,
head_block,
canonical_in_memory_state: state,
memory_chain: RwLock::new(Arc::new(vec![])),
})
}

/// Returns a vector of in-memory blocks.
///
/// If hasn't been requested yet in this provider, it also stores it in [`Self`].
///
/// The blocks are ordered from newest to oldest (highest to lowest).
fn in_memory_chain(&self) -> Arc<Vec<Arc<BlockState>>> {
let chain = self.memory_chain.read();
if let Some(head_block) = &self.head_block {
Expand Down Expand Up @@ -491,8 +500,6 @@ impl<N: ProviderNodeTypes> BlockchainProvider3<N> {
S: FnOnce(&DatabaseProviderRO<N::DB, N::ChainSpec>) -> ProviderResult<Option<R>>,
M: Fn(usize, TxNumber, &BlockState) -> ProviderResult<Option<R>>,
{
// Order of instantiation matters. More information on:
// `get_in_memory_or_storage_by_block_range_while`.
let in_mem_chain = self.in_memory_chain();
let provider = &self.storage_provider;

Expand Down Expand Up @@ -715,12 +722,12 @@ impl<N: ProviderNodeTypes> BlockHashReader for BlockchainProvider3<N> {

impl<N: ProviderNodeTypes> BlockNumReader for BlockchainProvider3<N> {
fn chain_info(&self) -> ProviderResult<ChainInfo> {
Ok(self.canonical_in_memory_state.chain_info())
let best_number = self.best_block_number()?;
Ok(ChainInfo { best_hash: self.block_hash(best_number)?.unwrap_or_default(), best_number })
}

fn best_block_number(&self) -> ProviderResult<BlockNumber> {
// TODO(joshie): self.head?
Ok(self.canonical_in_memory_state.get_canonical_block_number())
self.head_block.as_ref().map(|b| Ok(b.number())).unwrap_or_else(|| self.last_block_number())
}

fn last_block_number(&self) -> ProviderResult<BlockNumber> {
Expand Down Expand Up @@ -1376,37 +1383,6 @@ impl<N: ProviderNodeTypes> StateProviderFactory for BlockchainProvider3<N> {
}
}

impl<N: ProviderNodeTypes> CanonChainTracker for BlockchainProvider3<N> {
fn on_forkchoice_update_received(&self, _update: &ForkchoiceState) {
// update timestamp
self.canonical_in_memory_state.on_forkchoice_update_received();
}

fn last_received_update_timestamp(&self) -> Option<Instant> {
self.canonical_in_memory_state.last_received_update_timestamp()
}

fn on_transition_configuration_exchanged(&self) {
self.canonical_in_memory_state.on_transition_configuration_exchanged();
}

fn last_exchanged_transition_configuration_timestamp(&self) -> Option<Instant> {
self.canonical_in_memory_state.last_exchanged_transition_configuration_timestamp()
}

fn set_canonical_head(&self, header: SealedHeader) {
self.canonical_in_memory_state.set_canonical_head(header);
}

fn set_safe(&self, header: SealedHeader) {
self.canonical_in_memory_state.set_safe(header);
}

fn set_finalized(&self, header: SealedHeader) {
self.canonical_in_memory_state.set_finalized(header);
}
}

impl<N: ProviderNodeTypes> BlockReaderIdExt for BlockchainProvider3<N> {
fn block_by_id(&self, id: BlockId) -> ProviderResult<Option<Block>> {
match id {
Expand Down Expand Up @@ -1506,24 +1482,6 @@ impl<N: ProviderNodeTypes> BlockReaderIdExt for BlockchainProvider3<N> {
}
}

impl<N: ProviderNodeTypes> CanonStateSubscriptions for BlockchainProvider3<N> {
fn subscribe_to_canonical_state(&self) -> CanonStateNotifications {
self.canonical_in_memory_state.subscribe_canon_state()
}
}

impl<N: ProviderNodeTypes> ForkChoiceSubscriptions for BlockchainProvider3<N> {
fn subscribe_safe_block(&self) -> ForkChoiceNotifications {
let receiver = self.canonical_in_memory_state.subscribe_safe_block();
ForkChoiceNotifications(receiver)
}

fn subscribe_finalized_block(&self) -> ForkChoiceNotifications {
let receiver = self.canonical_in_memory_state.subscribe_finalized_block();
ForkChoiceNotifications(receiver)
}
}

impl<N: ProviderNodeTypes> StorageChangeSetReader for BlockchainProvider3<N> {
fn storage_changeset(
&self,
Expand Down Expand Up @@ -1674,10 +1632,7 @@ mod tests {
use alloy_primitives::{BlockNumber, TxNumber, B256};
use itertools::Itertools;
use rand::Rng;
use reth_chain_state::{
test_utils::TestBlockBuilder, CanonStateNotification, CanonStateSubscriptions,
CanonicalInMemoryState, ExecutedBlock, NewCanonicalChain,
};
use reth_chain_state::{CanonicalInMemoryState, ExecutedBlock, NewCanonicalChain};
use reth_chainspec::{
ChainSpec, ChainSpecBuilder, ChainSpecProvider, EthereumHardfork, MAINNET,
};
Expand All @@ -1687,7 +1642,7 @@ mod tests {
};
use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
use reth_errors::ProviderError;
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_execution_types::ExecutionOutcome;
use reth_primitives::{
Receipt, SealedBlock, StaticFileSegment, TransactionSignedNoHash, Withdrawals,
};
Expand Down Expand Up @@ -1815,7 +1770,6 @@ mod tests {
UnifiedStorageWriter::commit(provider_rw, factory.static_file_provider())?;

let provider_factory = BlockchainProviderFactory::new(factory)?;
let provider = provider_factory.provider()?;

// Insert the rest of the blocks and receipts into the in-memory state
let chain = NewCanonicalChain::Commit {
Expand All @@ -1837,7 +1791,7 @@ mod tests {
})
.collect(),
};
provider.canonical_in_memory_state.update_chain(chain);
provider_factory.canonical_in_memory_state.update_chain(chain);

// Get canonical, safe, and finalized blocks
let blocks = database_blocks.iter().chain(in_memory_blocks.iter()).collect::<Vec<_>>();
Expand All @@ -1847,9 +1801,9 @@ mod tests {
let finalized_block = blocks.get(block_count - 3).unwrap();

// Set the canonical head, safe, and finalized blocks
provider.set_canonical_head(canonical_block.header.clone());
provider.set_safe(safe_block.header.clone());
provider.set_finalized(finalized_block.header.clone());
provider_factory.set_canonical_head(canonical_block.header.clone());
provider_factory.set_safe(safe_block.header.clone());
provider_factory.set_finalized(finalized_block.header.clone());

Ok((provider_factory, database_blocks.clone(), in_memory_blocks.clone(), receipts))
}
Expand Down Expand Up @@ -2288,51 +2242,6 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_canon_state_subscriptions() -> eyre::Result<()> {
let factory = create_test_provider_factory();

// Generate a random block to initialise the blockchain provider.
let mut test_block_builder = TestBlockBuilder::default();
let block_1 = test_block_builder.generate_random_block(0, B256::ZERO);
let block_hash_1 = block_1.hash();

// Insert and commit the block.
let provider_rw = factory.provider_rw()?;
provider_rw.insert_historical_block(block_1)?;
provider_rw.commit()?;

let provider_factory = BlockchainProviderFactory::new(factory)?;
let provider = provider_factory.provider()?;

// Subscribe twice for canonical state updates.
let in_memory_state = provider.canonical_in_memory_state.clone();
let mut rx_1 = provider.subscribe_to_canonical_state();
let mut rx_2 = provider.subscribe_to_canonical_state();

// Send and receive commit notifications.
let block_2 = test_block_builder.generate_random_block(1, block_hash_1);
let chain = Chain::new(vec![block_2], ExecutionOutcome::default(), None);
let commit = CanonStateNotification::Commit { new: Arc::new(chain.clone()) };
in_memory_state.notify_canon_state(commit.clone());
let (notification_1, notification_2) = tokio::join!(rx_1.recv(), rx_2.recv());
assert_eq!(notification_1, Ok(commit.clone()));
assert_eq!(notification_2, Ok(commit.clone()));

// Send and receive re-org notifications.
let block_3 = test_block_builder.generate_random_block(1, block_hash_1);
let block_4 = test_block_builder.generate_random_block(2, block_3.hash());
let new_chain = Chain::new(vec![block_3, block_4], ExecutionOutcome::default(), None);
let re_org =
CanonStateNotification::Reorg { old: Arc::new(chain), new: Arc::new(new_chain) };
in_memory_state.notify_canon_state(re_org.clone());
let (notification_1, notification_2) = tokio::join!(rx_1.recv(), rx_2.recv());
assert_eq!(notification_1, Ok(re_org.clone()));
assert_eq!(notification_2, Ok(re_org.clone()));

Ok(())
}

#[test]
fn test_withdrawals_provider() -> eyre::Result<()> {
let mut rng = generators::rng();
Expand Down

0 comments on commit 83b942d

Please sign in to comment.