refactor: split census into multiple types #1510

Merged
merged 3 commits into from
Oct 9, 2024
31 changes: 15 additions & 16 deletions portal-bridge/src/bridge/state.rs
@@ -4,14 +4,17 @@ use alloy_rlp::Decodable;
use eth_trie::{decode_node, node::Node, RootWithTrieDiff};
use ethportal_api::{
jsonrpsee::http_client::HttpClient,
types::{portal_wire::OfferTrace, state_trie::account_state::AccountState as AccountStateInfo},
types::{
network::Subnetwork, portal_wire::OfferTrace,
state_trie::account_state::AccountState as AccountStateInfo,
},
ContentValue, Enr, OverlayContentKey, StateContentKey, StateContentValue,
StateNetworkApiClient,
};
use revm::Database;
use revm_primitives::{keccak256, Bytecode, SpecId, B256};
use tokio::{
sync::{mpsc, OwnedSemaphorePermit, Semaphore},
sync::{OwnedSemaphorePermit, Semaphore},
time::timeout,
};
use tracing::{debug, enabled, error, info, warn, Level};
@@ -32,7 +35,7 @@ use trin_utils::dir::create_temp_dir;

use crate::{
bridge::history::SERVE_BLOCK_TIMEOUT,
census::{ContentKey, EnrsRequest},
census::Census,
cli::BridgeId,
types::mode::{BridgeMode, ModeType},
};
@@ -44,8 +47,8 @@ pub struct StateBridge {
// Semaphore used to limit the amount of active offer transfers
// to make sure we don't overwhelm the trin client
offer_semaphore: Arc<Semaphore>,
// Used to request all interested enrs in the network from census process.
census_tx: mpsc::UnboundedSender<EnrsRequest>,
// Used to request all interested enrs in the network.
census: Census,
// Global offer report for tallying total performance of state bridge
global_offer_report: Arc<Mutex<GlobalOfferReport>>,
// Bridge id used to determine which content keys to gossip
@@ -57,7 +60,7 @@ impl StateBridge {
mode: BridgeMode,
portal_client: HttpClient,
offer_limit: usize,
census_tx: mpsc::UnboundedSender<EnrsRequest>,
census: Census,
bridge_id: BridgeId,
) -> anyhow::Result<Self> {
let metrics = BridgeMetricsReporter::new("state".to_string(), &format!("{mode:?}"));
@@ -68,7 +71,7 @@ impl StateBridge {
portal_client,
metrics,
offer_semaphore,
census_tx,
census,
global_offer_report: Arc::new(Mutex::new(global_offer_report)),
bridge_id,
})
@@ -284,14 +287,10 @@ impl StateBridge {
}

// request enrs interested in the content key from Census
async fn request_enrs(&self, content_key: &StateContentKey) -> anyhow::Result<Vec<Enr>> {
let (resp_tx, resp_rx) = futures::channel::oneshot::channel();
let enrs_request = EnrsRequest {
content_key: ContentKey::State(content_key.clone()),
resp_tx,
};
self.census_tx.send(enrs_request)?;
Ok(resp_rx.await?)
fn request_enrs(&self, content_key: &StateContentKey) -> anyhow::Result<Vec<Enr>> {
Ok(self
.census
.get_interested_enrs(Subnetwork::State, &content_key.content_id())?)
}

// spawn individual offer tasks of the content key for each interested enr found in Census
@@ -300,7 +299,7 @@ impl StateBridge {
content_key: StateContentKey,
content_value: StateContentValue,
) {
let Ok(enrs) = self.request_enrs(&content_key).await else {
let Ok(enrs) = self.request_enrs(&content_key) else {
error!("Failed to request enrs for content key, skipping offer: {content_key:?}");
return;
};
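
The change above replaces a request/response round trip (an `mpsc` request carrying a `oneshot` reply channel) with a direct, synchronous call on a `Census` value held by the bridge. A minimal std-only sketch of that pattern follows. It assumes the handle shares its peer list behind `Arc<RwLock<...>>`; the diff does not show how the real `Network` shares state, and the `Peer` type, `add_peer`, `get_interested`, and the absolute-difference distance are hypothetical stand-ins, not the PR's API.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical peer record: a node id plus the data radius it advertises.
#[derive(Debug, Clone, PartialEq)]
struct Peer {
    id: u64,
    radius: u64,
}

/// Cloneable handle; every clone points at the same peer list.
/// (Assumption: the real census shares its peers in a comparable way.)
#[derive(Clone, Default)]
struct Census {
    peers: Arc<RwLock<Vec<Peer>>>,
}

impl Census {
    /// Called by the background writer when a peer is discovered.
    fn add_peer(&self, peer: Peer) {
        self.peers.write().unwrap().push(peer);
    }

    /// Synchronous lookup: no request channel and no oneshot reply.
    /// A peer counts as interested when the content id lies within its
    /// radius (plain absolute difference here, not the real XOR metric).
    fn get_interested(&self, content_id: u64) -> Vec<Peer> {
        self.peers
            .read()
            .unwrap()
            .iter()
            .filter(|p| p.id.abs_diff(content_id) <= p.radius)
            .cloned()
            .collect()
    }
}

fn main() {
    let census = Census::default();

    // A second handle plays the role of the background service.
    let writer = census.clone();
    thread::spawn(move || {
        writer.add_peer(Peer { id: 10, radius: 5 });
        writer.add_peer(Peer { id: 100, radius: 5 });
    })
    .join()
    .unwrap();

    // The caller reads through its own handle without awaiting anything.
    println!("{:?}", census.get_interested(12));
}
```

With a shared handle like this, a lookup no longer waits for the census loop to get around to serving the request, which is why `request_enrs` can stop being `async`.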
155 changes: 70 additions & 85 deletions portal-bridge/src/census/mod.rs
@@ -1,16 +1,15 @@
use ethportal_api::{
jsonrpsee::http_client::HttpClient, types::network::Subnetwork, BeaconContentKey, Enr,
HistoryContentKey, OverlayContentKey, StateContentKey,
};
use futures::{channel::oneshot, StreamExt};
use std::collections::HashSet;

use ethportal_api::{jsonrpsee::http_client::HttpClient, types::network::Subnetwork, Enr};
use thiserror::Error;
use tokio::sync::mpsc;
use tracing::{error, info};
use tokio::task::JoinHandle;
use tracing::{error, info, Instrument};

use crate::cli::BridgeConfig;
use network::Network;

mod network;
mod peers;

/// The error that occurred in [Census].
#[derive(Error, Debug)]
@@ -19,20 +18,10 @@ pub enum CensusError {
NoPeers,
#[error("Failed to initialize Census")]
FailedInitialization,
}

/// The request for ENRs that should be offered the content.
pub struct EnrsRequest {
pub content_key: ContentKey,
pub resp_tx: oneshot::Sender<Vec<Enr>>,
}

/// The enum for network specific content key
#[derive(Debug, Clone)]
pub enum ContentKey {
History(HistoryContentKey),
State(StateContentKey),
Beacon(BeaconContentKey),
#[error("Subnetwork {0} is not supported")]
UnsupportedSubnetwork(Subnetwork),
#[error("Census already initialized")]
AlreadyInitialized,
}

/// The maximum number of enrs to return in a response,
@@ -42,91 +31,87 @@ pub const ENR_OFFER_LIMIT: usize = 4;

/// The census is responsible for maintaining a list of known peers in the network,
/// checking their liveness, updating their data radius, iterating through their
/// rfn to find new peers, and providing interested enrs for a given content key.
/// rfn to find new peers, and providing interested enrs for a given content id.
pub struct Census {
history: Network,
state: Network,
beacon: Network,
census_rx: mpsc::UnboundedReceiver<EnrsRequest>,
initialized: bool,
}

impl Census {
pub fn new(
client: HttpClient,
census_rx: mpsc::UnboundedReceiver<EnrsRequest>,
bridge_config: &BridgeConfig,
) -> Self {
pub fn new(client: HttpClient, bridge_config: &BridgeConfig) -> Self {
Self {
history: Network::new(client.clone(), Subnetwork::History, bridge_config),
state: Network::new(client.clone(), Subnetwork::State, bridge_config),
beacon: Network::new(client.clone(), Subnetwork::Beacon, bridge_config),
census_rx,
initialized: false,
}
}
}

impl Census {
pub async fn init(&mut self) -> Result<(), CensusError> {
// currently, the census is only initialized for the state network
// only initialized networks will yield inside `run()` loop
self.state.init().await;
if self.state.peers.is_empty() {
return Err(CensusError::FailedInitialization);
/// Returns ENRs interested in the provided content id.
pub fn get_interested_enrs(
&self,
subnetwork: Subnetwork,
content_id: &[u8; 32],
) -> Result<Vec<Enr>, CensusError> {
match subnetwork {
Subnetwork::History => self.history.get_interested_enrs(content_id),
Subnetwork::State => self.state.get_interested_enrs(content_id),
Subnetwork::Beacon => self.beacon.get_interested_enrs(content_id),
_ => Err(CensusError::UnsupportedSubnetwork(subnetwork)),
}
Ok(())
}

pub async fn run(&mut self) {
loop {
// Randomly selects between what available task is ready
// and executes it. Ensures that the census will continue
// to update while it handles a stream of enr requests.
tokio::select! {
// handle enrs request
Some(request) = self.census_rx.recv() => {
match self.get_interested_enrs(request.content_key) {
Ok(enrs) => {
if let Err(err) = request.resp_tx.send(enrs) {
error!("Error sending enrs response: {err:?}");
}
}
Err(_) => {
error!("No peers found in census, restarting initialization.");
self.state.init().await;
if let Err(err) = request.resp_tx.send(Vec::new()) {
error!("Error sending enrs response: {err:?}");
}
}
}
}
Some(Ok(known_enr)) = self.history.peers.next() => {
self.history.process_enr(known_enr.1.0).await;
info!("Updated history census: found peers: {}", self.history.peers.len());
}
// yield next known state peer and ping for liveness
Some(Ok(known_enr)) = self.state.peers.next() => {
self.state.process_enr(known_enr.1.0).await;
info!("Updated state census: found peers: {}", self.state.peers.len());
}
Some(Ok(known_enr)) = self.beacon.peers.next() => {
self.beacon.process_enr(known_enr.1.0).await;
info!("Updated beacon census: found peers: {}", self.beacon.peers.len());
}
/// Initializes subnetworks and starts background service that will keep our view of the network
/// up to date.
///
/// Returns JoinHandle of the background service.
pub async fn init(
&mut self,
subnetworks: impl IntoIterator<Item = Subnetwork>,
) -> Result<JoinHandle<()>, CensusError> {
if self.initialized {
return Err(CensusError::AlreadyInitialized);
}
self.initialized = true;

let subnetworks = HashSet::from_iter(subnetworks);
for subnetwork in &subnetworks {
info!("Initializing {subnetwork} subnetwork");
match subnetwork {
Subnetwork::History => self.history.init().await?,
Subnetwork::State => self.state.init().await?,
Subnetwork::Beacon => self.beacon.init().await?,
_ => return Err(CensusError::UnsupportedSubnetwork(*subnetwork)),
}
}

Ok(self.start_background_service(subnetworks))
}

pub fn get_interested_enrs(&self, content_key: ContentKey) -> Result<Vec<Enr>, CensusError> {
match content_key {
ContentKey::History(content_key) => {
self.history.get_interested_enrs(content_key.content_id())
}
ContentKey::State(content_key) => {
self.state.get_interested_enrs(content_key.content_id())
}
ContentKey::Beacon(content_key) => {
self.beacon.get_interested_enrs(content_key.content_id())
/// Starts background service that is responsible for keeping view of the network up to date.
///
/// Selects available tasks and runs them. Tasks are provided by enabled subnetworks.
fn start_background_service(&self, subnetworks: HashSet<Subnetwork>) -> JoinHandle<()> {
let mut history_network = self.history.clone();
let mut state_network = self.state.clone();
let mut beacon_network = self.beacon.clone();
let service = async move {
loop {
tokio::select! {
peer = history_network.peer_to_process(), if subnetworks.contains(&Subnetwork::History) => {
history_network.process_peer(peer).await;
}
peer = state_network.peer_to_process(), if subnetworks.contains(&Subnetwork::State) => {
Collaborator

nit: is it worth adding an else here with a panic/error!? Maybe not, because it'll get a bit convoluted, but if we don't need an else, do we need this if? There shouldn't be a case where we try to process a peer from an unactivated subnetwork... but this is very much a nit, fine to leave as is.

Collaborator Author

I think there might be a misunderstanding.

The if is not part of the regular code flow; it is special syntax of tokio::select! (more details here), and it has no option for an else.
Unless you were thinking of the else branch of the tokio::select! statement itself, but then the comment doesn't make sense, because the code would already panic if execution reached that point and no else branch was present.
With the current implementation, this can happen if no subnetworks are provided to the init function, and I have now added a check for that as well.

The if statements are needed with the current implementation because peer_to_process() would return None for uninitialized networks, forcing them to re-initialize, and we don't want that for non-enabled subnetworks.

The original implementation was using selectors like this:

Some(Ok(known_enr)) = self.history.peers.next() => { ... }

(basically ignoring those None values), and re-initialization was triggered when get_interested_enrs() failed, causing that request to take much longer but still return an empty set of enrs.

I believe that re-initialization should be detected and done as part of the regular census maintenance (and not be triggered by get_interested_enrs), and this is the cleanest way I found to do it.

I'm not sure if my explanation is clear. I think a good understanding of what tokio::select! does under the hood is very important here, and I'm not sure how well I was able to explain my reasoning around it.
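
To make the reasoning in this thread concrete, here is a synchronous, std-only model of the guard logic. It is not `tokio::select!` itself (which polls futures concurrently and picks among the ready branches); the `contains` check only plays the role of the `if` precondition on each branch. The `Network` struct, its `reinit_count` field, and `maintenance_pass` are hypothetical stand-ins. The model assumes, as the reply describes, that `peer_to_process()` yields `None` for a network with no known peers and that processing `None` triggers re-initialization.

```rust
use std::collections::HashSet;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Subnetwork {
    History,
    State,
    Beacon,
}

/// Hypothetical stand-in for the PR's `Network`: a queue of peer ids.
struct Network {
    subnetwork: Subnetwork,
    peers: Vec<u32>,
    reinit_count: u32,
}

impl Network {
    fn new(subnetwork: Subnetwork) -> Self {
        Self { subnetwork, peers: Vec::new(), reinit_count: 0 }
    }

    /// Returns the next peer, or `None` when no peers are known.
    fn peer_to_process(&mut self) -> Option<u32> {
        self.peers.pop()
    }

    /// `None` means "no peers known", which triggers re-initialization.
    fn process_peer(&mut self, peer: Option<u32>) {
        match peer {
            Some(_id) => { /* ping the peer, refresh its radius, ... */ }
            None => {
                self.reinit_count += 1;
                self.peers = vec![1, 2, 3]; // pretend discovery found peers
            }
        }
    }
}

/// One pass over all networks. Without the `contains` guard, a disabled
/// network would report `None` and be re-initialized on every pass.
fn maintenance_pass(networks: &mut [Network], enabled: &HashSet<Subnetwork>) {
    for network in networks.iter_mut() {
        if !enabled.contains(&network.subnetwork) {
            continue; // disabled branch: never polled, never re-initialized
        }
        let peer = network.peer_to_process();
        network.process_peer(peer);
    }
}

fn main() {
    let mut networks = vec![
        Network::new(Subnetwork::History),
        Network::new(Subnetwork::State),
        Network::new(Subnetwork::Beacon),
    ];
    let enabled = HashSet::from([Subnetwork::State]);

    maintenance_pass(&mut networks, &enabled);
    for n in &networks {
        println!("{:?}: reinit_count = {}", n.subnetwork, n.reinit_count);
    }
}
```

In this model only the enabled network is ever re-initialized, which mirrors the stated purpose of the per-branch `if`. The model has no counterpart for the "all branches disabled" case; in `tokio::select!` that case runs the `else` branch, or panics when there is none.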

state_network.process_peer(peer).await;
}
peer = beacon_network.peer_to_process(), if subnetworks.contains(&Subnetwork::Beacon) => {
beacon_network.process_peer(peer).await;
}
}
}
}
};
tokio::spawn(service.instrument(tracing::trace_span!("census").or_current()))
}
}
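
The new `init` does three things before starting the background service: it rejects a second call, deduplicates the requested subnetworks through a `HashSet`, and rejects unsupported subnetworks. The std-only sketch below models just that control flow. The trimmed-down `Subnetwork` enum with its `Other` variant, the `NoSubnetworks` error (standing in for the empty-input check mentioned in the review thread, whose real form is not shown in this commit), and the `usize` return value are assumptions for illustration.

```rust
use std::collections::HashSet;

/// Trimmed-down, hypothetical version of the real `Subnetwork` enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Subnetwork {
    History,
    State,
    Beacon,
    Other,
}

#[derive(Debug, PartialEq)]
enum CensusError {
    UnsupportedSubnetwork(Subnetwork),
    AlreadyInitialized,
    /// Hypothetical: models the empty-input check from the review thread.
    NoSubnetworks,
}

#[derive(Default)]
struct Census {
    initialized: bool,
    enabled: HashSet<Subnetwork>,
}

impl Census {
    /// Returns the number of distinct subnetworks that were enabled.
    fn init(
        &mut self,
        subnetworks: impl IntoIterator<Item = Subnetwork>,
    ) -> Result<usize, CensusError> {
        if self.initialized {
            return Err(CensusError::AlreadyInitialized);
        }
        // As in the diff, the flag is set before validation, so a failed
        // init still counts as "initialized".
        self.initialized = true;

        let subnetworks: HashSet<Subnetwork> = subnetworks.into_iter().collect();
        if subnetworks.is_empty() {
            return Err(CensusError::NoSubnetworks);
        }
        for subnetwork in &subnetworks {
            match subnetwork {
                Subnetwork::History | Subnetwork::State | Subnetwork::Beacon => {}
                other => return Err(CensusError::UnsupportedSubnetwork(*other)),
            }
        }
        self.enabled = subnetworks;
        Ok(self.enabled.len())
    }
}

fn main() {
    let mut census = Census::default();
    // Duplicates collapse because the input is collected into a HashSet.
    println!("{:?}", census.init([Subnetwork::State, Subnetwork::State]));
    // A second call is rejected.
    println!("{:?}", census.init([Subnetwork::History]));
}
```

One consequence worth noting in this model: because the flag is set first, a call that fails validation cannot be retried on the same value. Whether that is intended in the real code is not stated in the PR.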