From 683ff1de583760c86e7a9672100c1dad8293e800 Mon Sep 17 00:00:00 2001 From: Milos Stankovic <82043364+morph-dev@users.noreply.github.com> Date: Sat, 5 Oct 2024 23:10:51 +0300 Subject: [PATCH] refactor: split census into multiple types --- portal-bridge/src/bridge/state.rs | 31 ++-- portal-bridge/src/census/mod.rs | 155 +++++++++----------- portal-bridge/src/census/network.rs | 211 ++++++++++++++-------------- portal-bridge/src/census/peers.rs | 103 ++++++++++++++ portal-bridge/src/main.rs | 21 +-- 5 files changed, 301 insertions(+), 220 deletions(-) create mode 100644 portal-bridge/src/census/peers.rs diff --git a/portal-bridge/src/bridge/state.rs b/portal-bridge/src/bridge/state.rs index c35e4a1a4..a71b443f4 100644 --- a/portal-bridge/src/bridge/state.rs +++ b/portal-bridge/src/bridge/state.rs @@ -4,14 +4,17 @@ use alloy_rlp::Decodable; use eth_trie::{decode_node, node::Node, RootWithTrieDiff}; use ethportal_api::{ jsonrpsee::http_client::HttpClient, - types::{portal_wire::OfferTrace, state_trie::account_state::AccountState as AccountStateInfo}, + types::{ + network::Subnetwork, portal_wire::OfferTrace, + state_trie::account_state::AccountState as AccountStateInfo, + }, ContentValue, Enr, OverlayContentKey, StateContentKey, StateContentValue, StateNetworkApiClient, }; use revm::Database; use revm_primitives::{keccak256, Bytecode, SpecId, B256}; use tokio::{ - sync::{mpsc, OwnedSemaphorePermit, Semaphore}, + sync::{OwnedSemaphorePermit, Semaphore}, time::timeout, }; use tracing::{debug, enabled, error, info, warn, Level}; @@ -32,7 +35,7 @@ use trin_utils::dir::create_temp_dir; use crate::{ bridge::history::SERVE_BLOCK_TIMEOUT, - census::{ContentKey, EnrsRequest}, + census::Census, cli::BridgeId, types::mode::{BridgeMode, ModeType}, }; @@ -44,8 +47,8 @@ pub struct StateBridge { // Semaphore used to limit the amount of active offer transfers // to make sure we don't overwhelm the trin client offer_semaphore: Arc, - // Used to request all interested enrs in the network from census process. - census_tx: mpsc::UnboundedSender, + // Used to request all interested enrs in the network. + census: Census, // Global offer report for tallying total performance of state bridge global_offer_report: Arc>, // Bridge id used to determine which content keys to gossip @@ -57,7 +60,7 @@ impl StateBridge { mode: BridgeMode, portal_client: HttpClient, offer_limit: usize, - census_tx: mpsc::UnboundedSender, + census: Census, bridge_id: BridgeId, ) -> anyhow::Result { let metrics = BridgeMetricsReporter::new("state".to_string(), &format!("{mode:?}")); @@ -68,7 +71,7 @@ impl StateBridge { portal_client, metrics, offer_semaphore, - census_tx, + census, global_offer_report: Arc::new(Mutex::new(global_offer_report)), bridge_id, }) @@ -284,14 +287,10 @@ impl StateBridge { } // request enrs interested in the content key from Census - async fn request_enrs(&self, content_key: &StateContentKey) -> anyhow::Result> { - let (resp_tx, resp_rx) = futures::channel::oneshot::channel(); - let enrs_request = EnrsRequest { - content_key: ContentKey::State(content_key.clone()), - resp_tx, - }; - self.census_tx.send(enrs_request)?; - Ok(resp_rx.await?) + fn request_enrs(&self, content_key: &StateContentKey) -> anyhow::Result> { + Ok(self + .census + .get_interested_enrs(Subnetwork::State, &content_key.content_id())?) } // spawn individual offer tasks of the content key for each interested enr found in Census @@ -300,7 +299,7 @@ impl StateBridge { content_key: StateContentKey, content_value: StateContentValue, ) { - let Ok(enrs) = self.request_enrs(&content_key).await else { + let Ok(enrs) = self.request_enrs(&content_key) else { error!("Failed to request enrs for content key, skipping offer: {content_key:?}"); return; }; diff --git a/portal-bridge/src/census/mod.rs b/portal-bridge/src/census/mod.rs index 9a779cf42..7d443725e 100644 --- a/portal-bridge/src/census/mod.rs +++ b/portal-bridge/src/census/mod.rs @@ -1,16 +1,15 @@ -use ethportal_api::{ - jsonrpsee::http_client::HttpClient, types::network::Subnetwork, BeaconContentKey, Enr, - HistoryContentKey, OverlayContentKey, StateContentKey, -}; -use futures::{channel::oneshot, StreamExt}; +use std::collections::HashSet; + +use ethportal_api::{jsonrpsee::http_client::HttpClient, types::network::Subnetwork, Enr}; use thiserror::Error; -use tokio::sync::mpsc; -use tracing::{error, info}; +use tokio::task::JoinHandle; +use tracing::{error, info, Instrument}; use crate::cli::BridgeConfig; use network::Network; mod network; +mod peers; /// The error that occured in [Census]. #[derive(Error, Debug)] @@ -19,20 +18,10 @@ pub enum CensusError { NoPeers, #[error("Failed to initialize Census")] FailedInitialization, -} - -/// The request for ENRs that should be offered the content. -pub struct EnrsRequest { - pub content_key: ContentKey, - pub resp_tx: oneshot::Sender>, -} - -/// The enum for network specific content key -#[derive(Debug, Clone)] -pub enum ContentKey { - History(HistoryContentKey), - State(StateContentKey), - Beacon(BeaconContentKey), + #[error("Subnetwork {0} is not supported")] + UnsupportedSubnetwork(Subnetwork), + #[error("Census already initialized")] + AlreadyInitialized, } /// The maximum number of enrs to return in a response, @@ -42,91 +31,87 @@ pub const ENR_OFFER_LIMIT: usize = 4; /// The census is responsible for maintaining a list of known peers in the network, /// checking their liveness, updating their data radius, iterating through their -/// rfn to find new peers, and providing interested enrs for a given content key. +/// rfn to find new peers, and providing interested enrs for a given content id. pub struct Census { history: Network, state: Network, beacon: Network, - census_rx: mpsc::UnboundedReceiver, + initialized: bool, } impl Census { - pub fn new( - client: HttpClient, - census_rx: mpsc::UnboundedReceiver, - bridge_config: &BridgeConfig, - ) -> Self { + pub fn new(client: HttpClient, bridge_config: &BridgeConfig) -> Self { Self { history: Network::new(client.clone(), Subnetwork::History, bridge_config), state: Network::new(client.clone(), Subnetwork::State, bridge_config), beacon: Network::new(client.clone(), Subnetwork::Beacon, bridge_config), - census_rx, + initialized: false, } } -} -impl Census { - pub async fn init(&mut self) -> Result<(), CensusError> { - // currently, the census is only initialized for the state network - // only initialized networks will yield inside `run()` loop - self.state.init().await; - if self.state.peers.is_empty() { - return Err(CensusError::FailedInitialization); + /// Returns ENRs interested into provided content id. + pub fn get_interested_enrs( + &self, + subnetwork: Subnetwork, + content_id: &[u8; 32], + ) -> Result, CensusError> { + match subnetwork { + Subnetwork::History => self.history.get_interested_enrs(content_id), + Subnetwork::State => self.state.get_interested_enrs(content_id), + Subnetwork::Beacon => self.beacon.get_interested_enrs(content_id), + _ => Err(CensusError::UnsupportedSubnetwork(subnetwork)), } - Ok(()) } - pub async fn run(&mut self) { - loop { - // Randomly selects between what available task is ready - // and executes it. Ensures that the census will continue - // to update while it handles a stream of enr requests. - tokio::select! { - // handle enrs request - Some(request) = self.census_rx.recv() => { - match self.get_interested_enrs(request.content_key) { - Ok(enrs) => { - if let Err(err) = request.resp_tx.send(enrs) { - error!("Error sending enrs response: {err:?}"); - } - } - Err(_) => { - error!("No peers found in census, restarting initialization."); - self.state.init().await; - if let Err(err) = request.resp_tx.send(Vec::new()) { - error!("Error sending enrs response: {err:?}"); - } - } - } - } - Some(Ok(known_enr)) = self.history.peers.next() => { - self.history.process_enr(known_enr.1.0).await; - info!("Updated history census: found peers: {}", self.history.peers.len()); - } - // yield next known state peer and ping for liveness - Some(Ok(known_enr)) = self.state.peers.next() => { - self.state.process_enr(known_enr.1.0).await; - info!("Updated state census: found peers: {}", self.state.peers.len()); - } - Some(Ok(known_enr)) = self.beacon.peers.next() => { - self.beacon.process_enr(known_enr.1.0).await; - info!("Updated beacon census: found peers: {}", self.beacon.peers.len()); - } + /// Initialize subnetworks and starts background service that will keep our view of the network + /// up to date. + /// + /// Returns JoinHandle of the background service. + pub async fn init( + &mut self, + subnetworks: impl IntoIterator, + ) -> Result, CensusError> { + if self.initialized { + return Err(CensusError::AlreadyInitialized); + } + self.initialized = true; + + let subnetworks = HashSet::from_iter(subnetworks); + for subnetwork in &subnetworks { + info!("Initializing {subnetwork} subnetwork"); + match subnetwork { + Subnetwork::History => self.history.init().await?, + Subnetwork::State => self.state.init().await?, + Subnetwork::Beacon => self.beacon.init().await?, + _ => return Err(CensusError::UnsupportedSubnetwork(*subnetwork)), } } + + Ok(self.run_background_service(subnetworks)) } - pub fn get_interested_enrs(&self, content_key: ContentKey) -> Result, CensusError> { - match content_key { - ContentKey::History(content_key) => { - self.history.get_interested_enrs(content_key.content_id()) - } - ContentKey::State(content_key) => { - self.state.get_interested_enrs(content_key.content_id()) - } - ContentKey::Beacon(content_key) => { - self.beacon.get_interested_enrs(content_key.content_id()) + /// Starts background service that is responsible for keeping view of the network up to date. + /// + /// Selects available tasks and runs them. Tasks are provided by enabled subnetworks. + fn run_background_service(&self, subnetworks: HashSet) -> JoinHandle<()> { + let mut history_network = self.history.clone(); + let mut state_network = self.state.clone(); + let mut beacon_network = self.beacon.clone(); + let task = async move { + loop { + tokio::select! { + peer = history_network.peer_to_process(), if subnetworks.contains(&Subnetwork::History) => { + history_network.process_peer(peer).await; + } + peer = state_network.peer_to_process(), if subnetworks.contains(&Subnetwork::State) => { + state_network.process_peer(peer).await; + } + peer = beacon_network.peer_to_process(), if subnetworks.contains(&Subnetwork::Beacon) => { + beacon_network.process_peer(peer).await; + } + } } - } + }; + tokio::spawn(task.instrument(tracing::trace_span!("census").or_current())) } } diff --git a/portal-bridge/src/census/network.rs b/portal-bridge/src/census/network.rs index f9247c511..82939fb1e 100644 --- a/portal-bridge/src/census/network.rs +++ b/portal-bridge/src/census/network.rs @@ -1,19 +1,11 @@ -use alloy_primitives::U256; -use anyhow::{anyhow, bail}; -use delay_map::HashMapDelay; +use anyhow::anyhow; use discv5::enr::NodeId; use ethportal_api::{ - generate_random_remote_enr, - jsonrpsee::http_client::HttpClient, - types::{ - distance::{Distance, Metric, XorMetric}, - network::Subnetwork, - portal::PongInfo, - }, + generate_random_remote_enr, jsonrpsee::http_client::HttpClient, types::network::Subnetwork, BeaconNetworkApiClient, Enr, HistoryNetworkApiClient, StateNetworkApiClient, }; -use rand::seq::IteratorRandom; -use tokio::time::{Duration, Instant}; +use futures::StreamExt; +use tokio::time::Instant; use tracing::{error, info, warn}; use crate::{ @@ -21,15 +13,13 @@ use crate::{ cli::{BridgeConfig, ClientType}, }; -/// Ping delay for liveness check of peers in census -/// One hour was chosen after 2mins was too slow, and can be adjusted -/// in the future based on performance -const LIVENESS_CHECK_DELAY: Duration = Duration::from_secs(3600); +use super::peers::Peers; /// The network struct is responsible for maintaining a list of known peers /// in the given subnetwork. +#[derive(Clone)] pub struct Network { - pub peers: HashMapDelay<[u8; 32], (Enr, Distance)>, + peers: Peers, client: HttpClient, subnetwork: Subnetwork, filter_clients: Vec, @@ -46,7 +36,7 @@ impl Network { } Self { - peers: HashMapDelay::new(LIVENESS_CHECK_DELAY), + peers: Peers::new(), client, subnetwork, filter_clients: bridge_config.filter_clients.to_vec(), @@ -54,14 +44,31 @@ impl Network { } } - // We initialize a network with a random rfn lookup to get an initial view of the network - // and then iterate through the rfn of each peer to find new peers. Since this initialization - // blocks the bridge's gossip feature, there is a tradeoff between the time taken to initialize - // the census and the time taken to start gossiping. In the future, we might consider updating - // the initialization process to be considered complete after it has found ~100% of the network - // peers. However, since the census continues to iterate through the peers after initialization, - // the initialization is just to reach a critical mass of peers so that gossip can begin. - pub async fn init(&mut self) { + // Look up all known interested enrs for a given content id + pub fn get_interested_enrs(&self, content_id: &[u8; 32]) -> Result, CensusError> { + if self.peers.is_empty() { + error!( + "No known peers in {} census, unable to offer.", + self.subnetwork + ); + return Err(CensusError::NoPeers); + } + Ok(self + .peers + .get_interested_enrs(content_id, self.enr_offer_limit)) + } + + /// Initialize the peers. + /// + /// We initialize a network with a random rfn lookup to get an initial view of the network + /// and then iterate through the rfn of each peer to find new peers. Since this initialization + /// blocks the bridge's gossip feature, there is a tradeoff between the time taken to initialize + /// the census and the time taken to start gossiping. In the future, we might consider updating + /// the initialization process to be considered complete after it has found ~100% of the network + /// peers. However, since the census continues to iterate through the peers after + /// initialization, the initialization is just to reach a critical mass of peers so that gossip + /// can begin. + pub async fn init(&self) -> Result<(), CensusError> { match self.filter_clients.is_empty() { true => info!("Initializing {} network census", self.subnetwork), false => info!( @@ -70,11 +77,12 @@ impl Network { ), } let (_, random_enr) = generate_random_remote_enr(); - let Ok(initial_enrs) = self - .recursive_find_nodes(&self.client, random_enr.node_id()) - .await - else { - panic!("Failed to initialize network census"); + let Ok(initial_enrs) = self.recursive_find_nodes(random_enr.node_id()).await else { + error!( + "Failed to initialize {} census, RFN failed", + self.subnetwork + ); + return Err(CensusError::FailedInitialization); }; // if this initialization is too slow, we can consider @@ -84,44 +92,76 @@ impl Network { self.process_enr(enr).await; } if self.peers.is_empty() { - panic!( + error!( "Failed to initialize {} census, couldn't find any peers.", self.subnetwork ); + return Err(CensusError::FailedInitialization); } info!( "Initialized {} census: found peers: {}", self.subnetwork, self.peers.len() ); + Ok(()) + } + + /// Returns next peer to process. + pub async fn peer_to_process(&mut self) -> Option> { + self.peers.next().await + } + + /// Processes the peer. + /// + /// If no peer is found, reinitilizes the network. + pub async fn process_peer(&self, peer: Option>) { + let subnetwork = &self.subnetwork; + match peer { + Some(Ok(enr)) => { + self.process_enr(enr).await; + } + Some(Err(err)) => { + error!("Error getting peer to process for {subnetwork} subnetwork: {err}"); + } + None => { + warn!("No peers pending! Re-initializing {subnetwork} subnetwork"); + if let Err(err) = self.init().await { + error!("Error initializing {subnetwork} subnetwork: {err}"); + } + } + } } /// Only processes an enr (iterating through its rfn) if the enr's /// liveness delay has expired - pub async fn process_enr(&mut self, enr: Enr) { + async fn process_enr(&self, enr: Enr) { // ping for liveliness check if !self.liveness_check(enr.clone()).await { return; } // iterate peers routing table via rfn over various distances for distance in 245..257 { - let Ok(result) = self - .find_nodes(&self.client, enr.clone(), vec![distance]) - .await - else { + let Ok(result) = self.find_nodes(enr.clone(), vec![distance]).await else { warn!("Find nodes request failed for enr: {}", enr); continue; }; for found_enr in result { - let _ = self.liveness_check(found_enr).await; + self.liveness_check(found_enr).await; } } + info!( + "Updated {} census. Available peers: {}", + self.subnetwork, + self.peers.len(), + ); } - // Only perform liveness check on enrs if their deadline is up, - // since the same enr might appear multiple times between the - // routing tables of different peers. - pub async fn liveness_check(&mut self, enr: Enr) -> bool { + /// Performs liveness check. + /// + /// Liveness check will pass if + /// If they are registered but expired, we shouldn't perform liveness checked now as it will be + /// done when they are polled as expired (soon). + pub async fn liveness_check(&self, enr: Enr) -> bool { // skip if client type is filtered let client_type = ClientType::from(&enr); if self.filter_clients.contains(&client_type) { @@ -129,89 +169,54 @@ impl Network { } // if enr is already registered, check if delay map deadline has expired - if let Some(deadline) = self.peers.deadline(&enr.node_id().raw()) { + if let Some(deadline) = self.peers.deadline(&enr) { if Instant::now() < deadline { return false; } } - match self.ping(&self.client, enr.clone()).await { - Ok(pong_info) => { - let data_radius = Distance::from(U256::from(pong_info.data_radius)); - self.peers.insert(enr.node_id().raw(), (enr, data_radius)); - true - } - Err(_) => { - self.peers.remove(&enr.node_id().raw()); - false - } - } + self.ping(enr).await } - // Look up all known interested enrs for a given content id - pub fn get_interested_enrs(&self, content_id: [u8; 32]) -> Result, CensusError> { - if self.peers.is_empty() { - error!( - "No known peers in {} census, unable to offer.", - self.subnetwork - ); - return Err(CensusError::NoPeers); - } - Ok(self - .peers - .iter() - .filter_map(|(node_id, (enr, data_radius))| { - let distance = XorMetric::distance(node_id, &content_id); - if data_radius >= &distance { - Some(enr.clone()) - } else { - None - } - }) - .choose_multiple(&mut rand::thread_rng(), self.enr_offer_limit)) - } - - async fn ping(&self, client: &HttpClient, enr: Enr) -> anyhow::Result { - let result = match self.subnetwork { - Subnetwork::History => HistoryNetworkApiClient::ping(client, enr).await, - Subnetwork::State => StateNetworkApiClient::ping(client, enr).await, - Subnetwork::Beacon => BeaconNetworkApiClient::ping(client, enr).await, - _ => bail!("Unsupported subnetwork: {}", self.subnetwork), + async fn ping(&self, enr: Enr) -> bool { + let future_response = match self.subnetwork { + Subnetwork::History => HistoryNetworkApiClient::ping(&self.client, enr.clone()), + Subnetwork::State => StateNetworkApiClient::ping(&self.client, enr.clone()), + Subnetwork::Beacon => BeaconNetworkApiClient::ping(&self.client, enr.clone()), + _ => unreachable!("Unsupported subnetwork: {}", self.subnetwork), }; - result.map_err(|e| anyhow!(e)) + let response = future_response.await.map_err(|e| anyhow!(e)); + self.peers.process_ping_response(enr, response) } - async fn find_nodes( - &self, - client: &HttpClient, - enr: Enr, - distances: Vec, - ) -> anyhow::Result> { + async fn find_nodes(&self, enr: Enr, distances: Vec) -> anyhow::Result> { let result = match self.subnetwork { Subnetwork::History => { - HistoryNetworkApiClient::find_nodes(client, enr, distances).await + HistoryNetworkApiClient::find_nodes(&self.client, enr, distances).await + } + Subnetwork::State => { + StateNetworkApiClient::find_nodes(&self.client, enr, distances).await } - Subnetwork::State => StateNetworkApiClient::find_nodes(client, enr, distances).await, - Subnetwork::Beacon => BeaconNetworkApiClient::find_nodes(client, enr, distances).await, - _ => bail!("Unsupported subnetwork: {}", self.subnetwork), + Subnetwork::Beacon => { + BeaconNetworkApiClient::find_nodes(&self.client, enr, distances).await + } + _ => unreachable!("Unsupported subnetwork: {}", self.subnetwork), }; result.map_err(|e| anyhow!(e)) } - async fn recursive_find_nodes( - &self, - client: &HttpClient, - node_id: NodeId, - ) -> anyhow::Result> { + async fn recursive_find_nodes(&self, node_id: NodeId) -> anyhow::Result> { let result = match self.subnetwork { Subnetwork::History => { - HistoryNetworkApiClient::recursive_find_nodes(client, node_id).await + HistoryNetworkApiClient::recursive_find_nodes(&self.client, node_id).await + } + Subnetwork::State => { + StateNetworkApiClient::recursive_find_nodes(&self.client, node_id).await } - Subnetwork::State => StateNetworkApiClient::recursive_find_nodes(client, node_id).await, Subnetwork::Beacon => { - BeaconNetworkApiClient::recursive_find_nodes(client, node_id).await + BeaconNetworkApiClient::recursive_find_nodes(&self.client, node_id).await } - _ => bail!("Unsupported subnetwork: {}", self.subnetwork), + _ => unreachable!("Unsupported subnetwork: {}", self.subnetwork), }; result.map_err(|e| anyhow!(e)) } diff --git a/portal-bridge/src/census/peers.rs b/portal-bridge/src/census/peers.rs new file mode 100644 index 000000000..fcd3517de --- /dev/null +++ b/portal-bridge/src/census/peers.rs @@ -0,0 +1,103 @@ +use std::{ + pin::Pin, + sync::{Arc, RwLock}, + task::{Context, Poll}, + time::Duration, +}; + +use delay_map::HashMapDelay; +use ethportal_api::{ + types::{ + distance::{Distance, Metric, XorMetric}, + portal::PongInfo, + }, + Enr, +}; +use futures::Stream; +use rand::seq::IteratorRandom; +use tokio::time::Instant; + +/// Ping delay for liveness check of peers in census +/// One hour was chosen after 2mins was too slow, and can be adjusted +/// in the future based on performance +const LIVENESS_CHECK_DELAY: Duration = Duration::from_secs(3600); + +type PeersHashMapDelay = HashMapDelay<[u8; 32], (Enr, Distance)>; + +#[derive(Clone, Debug)] +pub(super) struct Peers { + pub peers: Arc>, +} + +impl Peers { + pub fn new() -> Self { + Self { + peers: Arc::new(RwLock::new(HashMapDelay::new(LIVENESS_CHECK_DELAY))), + } + } + + pub fn is_empty(&self) -> bool { + self.peers.read().expect("to get peers lock").is_empty() + } + + pub fn len(&self) -> usize { + self.peers.read().expect("to get peers lock").len() + } + + pub fn deadline(&self, enr: &Enr) -> Option { + self.peers + .read() + .expect("to get peers lock") + .deadline(&enr.node_id().raw()) + } + + pub fn process_ping_response(&self, enr: Enr, ping_response: anyhow::Result) -> bool { + let mut peers = self.peers.write().expect("to get peers lock"); + match ping_response { + Ok(pong_info) => { + let data_radius = Distance::from(pong_info.data_radius); + peers.insert(enr.node_id().raw(), (enr, data_radius)); + true + } + Err(_) => { + peers.remove(&enr.node_id().raw()); + false + } + } + } + + /// Selects random `limit` peers that should be interested into content. + pub fn get_interested_enrs(&self, content_id: &[u8; 32], limit: usize) -> Vec { + self.peers + .read() + .expect("to get peers lock") + .iter() + .filter_map(|(node_id, (enr, data_radius))| { + let distance = XorMetric::distance(node_id, content_id); + if data_radius >= &distance { + Some(enr.clone()) + } else { + None + } + }) + .choose_multiple(&mut rand::thread_rng(), limit) + } +} + +impl Stream for Peers { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.peers + .write() + .expect("to get peers lock") + .poll_expired(cx) + .map_ok(|(_node_id, (enr, _distance))| enr) + } +} + +impl Default for Peers { + fn default() -> Self { + Self::new() + } +} diff --git a/portal-bridge/src/main.rs b/portal-bridge/src/main.rs index a26193f5c..8cb1cadfe 100644 --- a/portal-bridge/src/main.rs +++ b/portal-bridge/src/main.rs @@ -1,8 +1,5 @@ use clap::Parser; -use tokio::{ - sync::mpsc, - time::{sleep, Duration}, -}; +use tokio::time::{sleep, Duration}; use tracing::Instrument; use ethportal_api::{ @@ -52,23 +49,15 @@ async fn main() -> Result<(), Box> { .portal_subnetworks .contains(&Subnetwork::State) { - // Initialize the census - let (census_tx, census_rx) = mpsc::unbounded_channel(); - let mut census = Census::new(portal_client.clone(), census_rx, &bridge_config); - // initialize the census to acquire critical threshold view of network before gossiping - census.init().await?; - census_handle = Some(tokio::spawn(async move { - census - .run() - .instrument(tracing::trace_span!("census")) - .await; - })); + // Create and initialize the census to acquire critical view of network before gossiping + let mut census = Census::new(portal_client.clone(), &bridge_config); + census_handle = Some(census.init([Subnetwork::State]).await); let state_bridge = StateBridge::new( bridge_config.mode.clone(), portal_client.clone(), bridge_config.offer_limit, - census_tx, + census, bridge_config.bridge_id, ) .await?;