Skip to content

Commit

Permalink
refactor: split census into multiple types
Browse files Browse the repository at this point in the history
  • Loading branch information
morph-dev committed Oct 6, 2024
1 parent e5c3931 commit aa32159
Show file tree
Hide file tree
Showing 5 changed files with 301 additions and 220 deletions.
31 changes: 15 additions & 16 deletions portal-bridge/src/bridge/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ use alloy_rlp::Decodable;
use eth_trie::{decode_node, node::Node, RootWithTrieDiff};
use ethportal_api::{
jsonrpsee::http_client::HttpClient,
types::{portal_wire::OfferTrace, state_trie::account_state::AccountState as AccountStateInfo},
types::{
network::Subnetwork, portal_wire::OfferTrace,
state_trie::account_state::AccountState as AccountStateInfo,
},
ContentValue, Enr, OverlayContentKey, StateContentKey, StateContentValue,
StateNetworkApiClient,
};
use revm::Database;
use revm_primitives::{keccak256, Bytecode, SpecId, B256};
use tokio::{
sync::{mpsc, OwnedSemaphorePermit, Semaphore},
sync::{OwnedSemaphorePermit, Semaphore},
time::timeout,
};
use tracing::{debug, enabled, error, info, warn, Level};
Expand All @@ -32,7 +35,7 @@ use trin_utils::dir::create_temp_dir;

use crate::{
bridge::history::SERVE_BLOCK_TIMEOUT,
census::{ContentKey, EnrsRequest},
census::Census,
cli::BridgeId,
types::mode::{BridgeMode, ModeType},
};
Expand All @@ -44,8 +47,8 @@ pub struct StateBridge {
// Semaphore used to limit the amount of active offer transfers
// to make sure we don't overwhelm the trin client
offer_semaphore: Arc<Semaphore>,
// Used to request all interested enrs in the network from census process.
census_tx: mpsc::UnboundedSender<EnrsRequest>,
// Used to request all interested enrs in the network.
census: Census,
// Global offer report for tallying total performance of state bridge
global_offer_report: Arc<Mutex<GlobalOfferReport>>,
// Bridge id used to determine which content keys to gossip
Expand All @@ -57,7 +60,7 @@ impl StateBridge {
mode: BridgeMode,
portal_client: HttpClient,
offer_limit: usize,
census_tx: mpsc::UnboundedSender<EnrsRequest>,
census: Census,
bridge_id: BridgeId,
) -> anyhow::Result<Self> {
let metrics = BridgeMetricsReporter::new("state".to_string(), &format!("{mode:?}"));
Expand All @@ -68,7 +71,7 @@ impl StateBridge {
portal_client,
metrics,
offer_semaphore,
census_tx,
census,
global_offer_report: Arc::new(Mutex::new(global_offer_report)),
bridge_id,
})
Expand Down Expand Up @@ -284,14 +287,10 @@ impl StateBridge {
}

// request enrs interested in the content key from Census
async fn request_enrs(&self, content_key: &StateContentKey) -> anyhow::Result<Vec<Enr>> {
let (resp_tx, resp_rx) = futures::channel::oneshot::channel();
let enrs_request = EnrsRequest {
content_key: ContentKey::State(content_key.clone()),
resp_tx,
};
self.census_tx.send(enrs_request)?;
Ok(resp_rx.await?)
fn request_enrs(&self, content_key: &StateContentKey) -> anyhow::Result<Vec<Enr>> {
Ok(self
.census
.get_interested_enrs(Subnetwork::State, &content_key.content_id())?)
}

// spawn individual offer tasks of the content key for each interested enr found in Census
Expand All @@ -300,7 +299,7 @@ impl StateBridge {
content_key: StateContentKey,
content_value: StateContentValue,
) {
let Ok(enrs) = self.request_enrs(&content_key).await else {
let Ok(enrs) = self.request_enrs(&content_key) else {
error!("Failed to request enrs for content key, skipping offer: {content_key:?}");
return;
};
Expand Down
155 changes: 70 additions & 85 deletions portal-bridge/src/census/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use ethportal_api::{
jsonrpsee::http_client::HttpClient, types::network::Subnetwork, BeaconContentKey, Enr,
HistoryContentKey, OverlayContentKey, StateContentKey,
};
use futures::{channel::oneshot, StreamExt};
use std::collections::HashSet;

use ethportal_api::{jsonrpsee::http_client::HttpClient, types::network::Subnetwork, Enr};
use thiserror::Error;
use tokio::sync::mpsc;
use tracing::{error, info};
use tokio::task::JoinHandle;
use tracing::{error, info, Instrument};

use crate::cli::BridgeConfig;
use network::Network;

mod network;
mod peers;

/// The error that occured in [Census].
#[derive(Error, Debug)]
Expand All @@ -19,20 +18,10 @@ pub enum CensusError {
NoPeers,
#[error("Failed to initialize Census")]
FailedInitialization,
}

/// The request for ENRs that should be offered the content.
pub struct EnrsRequest {
pub content_key: ContentKey,
pub resp_tx: oneshot::Sender<Vec<Enr>>,
}

/// The enum for network specific content key
#[derive(Debug, Clone)]
pub enum ContentKey {
History(HistoryContentKey),
State(StateContentKey),
Beacon(BeaconContentKey),
#[error("Subnetwork {0} is not supported")]
UnsupportedSubnetwork(Subnetwork),
#[error("Census already initialized")]
AlreadyInitialized,
}

/// The maximum number of enrs to return in a response,
Expand All @@ -42,91 +31,87 @@ pub const ENR_OFFER_LIMIT: usize = 4;

/// The census is responsible for maintaining a list of known peers in the network,
/// checking their liveness, updating their data radius, iterating through their
/// rfn to find new peers, and providing interested enrs for a given content key.
/// rfn to find new peers, and providing interested enrs for a given content id.
pub struct Census {
history: Network,
state: Network,
beacon: Network,
census_rx: mpsc::UnboundedReceiver<EnrsRequest>,
initialized: bool,
}

impl Census {
pub fn new(
client: HttpClient,
census_rx: mpsc::UnboundedReceiver<EnrsRequest>,
bridge_config: &BridgeConfig,
) -> Self {
pub fn new(client: HttpClient, bridge_config: &BridgeConfig) -> Self {
Self {
history: Network::new(client.clone(), Subnetwork::History, bridge_config),
state: Network::new(client.clone(), Subnetwork::State, bridge_config),
beacon: Network::new(client.clone(), Subnetwork::Beacon, bridge_config),
census_rx,
initialized: false,
}
}
}

impl Census {
pub async fn init(&mut self) -> Result<(), CensusError> {
// currently, the census is only initialized for the state network
// only initialized networks will yield inside `run()` loop
self.state.init().await;
if self.state.peers.is_empty() {
return Err(CensusError::FailedInitialization);
/// Returns ENRs interested into provided content id.
pub fn get_interested_enrs(
&self,
subnetwork: Subnetwork,
content_id: &[u8; 32],
) -> Result<Vec<Enr>, CensusError> {
match subnetwork {
Subnetwork::History => self.history.get_interested_enrs(content_id),
Subnetwork::State => self.state.get_interested_enrs(content_id),
Subnetwork::Beacon => self.beacon.get_interested_enrs(content_id),
_ => Err(CensusError::UnsupportedSubnetwork(subnetwork)),
}
Ok(())
}

pub async fn run(&mut self) {
loop {
// Randomly selects between what available task is ready
// and executes it. Ensures that the census will continue
// to update while it handles a stream of enr requests.
tokio::select! {
// handle enrs request
Some(request) = self.census_rx.recv() => {
match self.get_interested_enrs(request.content_key) {
Ok(enrs) => {
if let Err(err) = request.resp_tx.send(enrs) {
error!("Error sending enrs response: {err:?}");
}
}
Err(_) => {
error!("No peers found in census, restarting initialization.");
self.state.init().await;
if let Err(err) = request.resp_tx.send(Vec::new()) {
error!("Error sending enrs response: {err:?}");
}
}
}
}
Some(Ok(known_enr)) = self.history.peers.next() => {
self.history.process_enr(known_enr.1.0).await;
info!("Updated history census: found peers: {}", self.history.peers.len());
}
// yield next known state peer and ping for liveness
Some(Ok(known_enr)) = self.state.peers.next() => {
self.state.process_enr(known_enr.1.0).await;
info!("Updated state census: found peers: {}", self.state.peers.len());
}
Some(Ok(known_enr)) = self.beacon.peers.next() => {
self.beacon.process_enr(known_enr.1.0).await;
info!("Updated beacon census: found peers: {}", self.beacon.peers.len());
}
/// Initialize subnetworks and starts background service that will keep our view of the network
/// up to date.
///
/// Returns JoinHandle of the background service.
pub async fn init(
&mut self,
subnetworks: impl IntoIterator<Item = Subnetwork>,
) -> Result<JoinHandle<()>, CensusError> {
if self.initialized {
return Err(CensusError::AlreadyInitialized);
}
self.initialized = true;

let subnetworks = HashSet::from_iter(subnetworks);
for subnetwork in &subnetworks {
info!("Initializing {subnetwork} subnetwork");
match subnetwork {
Subnetwork::History => self.history.init().await?,
Subnetwork::State => self.state.init().await?,
Subnetwork::Beacon => self.beacon.init().await?,
_ => return Err(CensusError::UnsupportedSubnetwork(*subnetwork)),
}
}

Ok(self.start_background_service(subnetworks))
}

pub fn get_interested_enrs(&self, content_key: ContentKey) -> Result<Vec<Enr>, CensusError> {
match content_key {
ContentKey::History(content_key) => {
self.history.get_interested_enrs(content_key.content_id())
}
ContentKey::State(content_key) => {
self.state.get_interested_enrs(content_key.content_id())
}
ContentKey::Beacon(content_key) => {
self.beacon.get_interested_enrs(content_key.content_id())
/// Starts background service that is responsible for keeping view of the network up to date.
///
/// Selects available tasks and runs them. Tasks are provided by enabled subnetworks.
fn start_background_service(&self, subnetworks: HashSet<Subnetwork>) -> JoinHandle<()> {
let mut history_network = self.history.clone();
let mut state_network = self.state.clone();
let mut beacon_network = self.beacon.clone();
let service = async move {
loop {
tokio::select! {
peer = history_network.peer_to_process(), if subnetworks.contains(&Subnetwork::History) => {
history_network.process_peer(peer).await;
}
peer = state_network.peer_to_process(), if subnetworks.contains(&Subnetwork::State) => {
state_network.process_peer(peer).await;
}
peer = beacon_network.peer_to_process(), if subnetworks.contains(&Subnetwork::Beacon) => {
beacon_network.process_peer(peer).await;
}
}
}
}
};
tokio::spawn(service.instrument(tracing::trace_span!("census").or_current()))
}
}
Loading

0 comments on commit aa32159

Please sign in to comment.