Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Syncing strategy refactoring (part 3) #5737

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
0515f8f
Use `SyncingAction` everywhere instead of custom enums in different s…
nazar-pc Sep 6, 2024
743161c
Make strategy key opaque instead of having a hardcoded list of variants
nazar-pc Sep 6, 2024
e393d30
Rename `SyncingConfig` to `PolkadotSyncingStrategyConfig`
nazar-pc Sep 6, 2024
6b5acc4
Move `PolkadotSyncingStrategy` into its own module
nazar-pc Sep 6, 2024
a312e09
Replace `SyncingAction::SendWarpProofRequest` and `SyncingStrategy::o…
nazar-pc Sep 6, 2024
72908d3
Replace state sync request/response handling with generic one
nazar-pc Sep 6, 2024
c727dc1
Replace `SyncingAction::SendGenericRequest` with more generic `Syncin…
nazar-pc Sep 6, 2024
6bf9b7f
Move block downloader instantiation outside the network instantiation
nazar-pc Sep 10, 2024
76b2dcf
Move `ChainSync::on_block_response` implementation out of the trait i…
nazar-pc Sep 10, 2024
a1a0863
Pass protocol name down to generic response handler
nazar-pc Sep 10, 2024
d2ab786
Replace `SyncingAction::SendBlockRequest` and `SyncingStrategy::on_bl…
nazar-pc Sep 10, 2024
0ae9dc0
Improve import queue API, removing the need for `Box<dyn Link<B>>` if…
nazar-pc Sep 11, 2024
a26bbfa
Remove syncing engine construction from `build_network` and expose `b…
nazar-pc Sep 11, 2024
53b34d0
Add prdoc
nazar-pc Sep 17, 2024
0ac0ddf
Rename `build_network` to `build_network_advanced` and restore `build…
nazar-pc Sep 20, 2024
8388a81
Make `build_network_advanced` arguments lower-level and not require `…
nazar-pc Sep 20, 2024
243af75
Merge remote-tracking branch 'upstream/master' into syncing-strategy-…
nazar-pc Sep 20, 2024
63dbf0f
Fix toml formatting
nazar-pc Sep 20, 2024
7f4ce2c
Change name of generics for consistency
nazar-pc Oct 22, 2024
87be088
Remove redundant comment (present in places that set `remove_obsolete…
nazar-pc Oct 22, 2024
27ee43c
Merge remote-tracking branch 'upstream/master' into syncing-strategy-…
nazar-pc Oct 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ artifacts
bin/node-template/Cargo.lock
nohup.out
polkadot_argument_parsing
polkadot.*
!docs/sdk/src/polkadot_sdk/polkadot.rs
pwasm-alloc/Cargo.lock
pwasm-libc/Cargo.lock
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 27 additions & 15 deletions cumulus/client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ use sc_consensus::{
BlockImport,
};
use sc_network::{config::SyncMode, service::traits::NetworkService, NetworkBackend};
use sc_network_sync::SyncingService;
use sc_network_sync::{service::network::NetworkServiceProvider, SyncingService};
use sc_network_transactions::TransactionsHandlerController;
use sc_service::{
build_polkadot_syncing_strategy, Configuration, NetworkStarter, SpawnTaskHandle, TaskManager,
WarpSyncConfig,
build_default_syncing_engine, Configuration, DefaultSyncingEngineConfig, NetworkStarter,
SpawnTaskHandle, TaskManager, WarpSyncConfig,
};
use sc_telemetry::{log, TelemetryWorkerHandle};
use sc_utils::mpsc::TracingUnboundedSender;
Expand Down Expand Up @@ -499,15 +499,27 @@ where
parachain_config.prometheus_config.as_ref().map(|config| &config.registry),
);

let syncing_strategy = build_polkadot_syncing_strategy(
parachain_config.protocol_id(),
parachain_config.chain_spec.fork_id(),
&mut net_config,
warp_sync_config,
client.clone(),
&spawn_handle,
parachain_config.prometheus_config.as_ref().map(|config| &config.registry),
)?;
let network_service_provider = NetworkServiceProvider::new();
let (sync_service, block_announce_config) =
build_default_syncing_engine(DefaultSyncingEngineConfig {
role: parachain_config.role,
protocol_id: parachain_config.protocol_id(),
fork_id: None,
net_config: &mut net_config,
block_announce_validator,
network_service_handle: network_service_provider.handle(),
warp_sync_config,
client: client.clone(),
import_queue_service: import_queue.service(),
num_peers_hint: parachain_config.network.default_peers_set.in_peers as usize +
parachain_config.network.default_peers_set.out_peers as usize,
spawn_handle: &spawn_handle,
metrics_registry: parachain_config
.prometheus_config
.as_ref()
.map(|config| &config.registry),
metrics: metrics.clone(),
})?;

sc_service::build_network(sc_service::BuildNetworkParams {
config: parachain_config,
Expand All @@ -516,9 +528,9 @@ where
transaction_pool,
spawn_handle,
import_queue,
block_announce_validator_builder: Some(Box::new(move |_| block_announce_validator)),
syncing_strategy,
block_relay: None,
sync_service,
block_announce_config,
network_service_provider,
metrics,
})
}
Expand Down
40 changes: 27 additions & 13 deletions polkadot/node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration};
use prometheus_endpoint::Registry;
#[cfg(feature = "full-node")]
use sc_service::KeystoreContainer;
use sc_service::{build_polkadot_syncing_strategy, RpcHandlers, SpawnTaskHandle};
use sc_service::{
build_default_syncing_engine, DefaultSyncingEngineConfig, RpcHandlers, SpawnTaskHandle,
};
use sc_telemetry::TelemetryWorker;
#[cfg(feature = "full-node")]
use sc_telemetry::{Telemetry, TelemetryWorkerHandle};
Expand All @@ -98,6 +100,7 @@ pub use sc_client_api::{Backend, CallExecutor};
pub use sc_consensus::{BlockImport, LongestChain};
pub use sc_executor::NativeExecutionDispatch;
use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
use sc_network_sync::service::network::NetworkServiceProvider;
pub use sc_service::{
config::{DatabaseSource, PrometheusConfig},
ChainSpec, Configuration, Error as SubstrateServiceError, PruningMode, Role, TFullBackend,
Expand All @@ -117,6 +120,8 @@ pub use {rococo_runtime, rococo_runtime_constants};
pub use {westend_runtime, westend_runtime_constants};

pub use fake_runtime_api::{GetLastTimestamp, RuntimeApi};
use sc_consensus::ImportQueue;
use sp_consensus::block_validation::DefaultBlockAnnounceValidator;

#[cfg(feature = "full-node")]
pub type FullBackend = sc_service::TFullBackend<Block>;
Expand Down Expand Up @@ -1028,15 +1033,24 @@ pub fn new_full<
})
};

let syncing_strategy = build_polkadot_syncing_strategy(
config.protocol_id(),
config.chain_spec.fork_id(),
&mut net_config,
Some(WarpSyncConfig::WithProvider(warp_sync)),
client.clone(),
&task_manager.spawn_handle(),
config.prometheus_config.as_ref().map(|config| &config.registry),
)?;
let network_service_provider = NetworkServiceProvider::new();
let (sync_service, block_announce_config) =
build_default_syncing_engine(DefaultSyncingEngineConfig {
role: config.role,
protocol_id: config.protocol_id(),
fork_id: None,
net_config: &mut net_config,
block_announce_validator: Box::new(DefaultBlockAnnounceValidator),
network_service_handle: network_service_provider.handle(),
warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)),
client: client.clone(),
import_queue_service: import_queue.service(),
num_peers_hint: config.network.default_peers_set.in_peers as usize +
config.network.default_peers_set.out_peers as usize,
spawn_handle: &task_manager.spawn_handle(),
metrics_registry: config.prometheus_config.as_ref().map(|config| &config.registry),
metrics: metrics.clone(),
})?;

let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
sc_service::build_network(sc_service::BuildNetworkParams {
Expand All @@ -1046,9 +1060,9 @@ pub fn new_full<
transaction_pool: transaction_pool.clone(),
spawn_handle: task_manager.spawn_handle(),
import_queue,
block_announce_validator_builder: None,
syncing_strategy,
block_relay: None,
sync_service,
block_announce_config,
network_service_provider,
metrics,
})?;

Expand Down
39 changes: 26 additions & 13 deletions substrate/bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ use frame_system_rpc_runtime_api::AccountNonceApi;
use futures::prelude::*;
use kitchensink_runtime::RuntimeApi;
use node_primitives::Block;
use polkadot_sdk::sc_service::build_polkadot_syncing_strategy;
use polkadot_sdk::{
sc_network_sync::service::network::NetworkServiceProvider,
sc_service::{build_default_syncing_engine, DefaultSyncingEngineConfig, ImportQueue},
sp_consensus::block_validation::DefaultBlockAnnounceValidator,
};
use sc_client_api::{Backend, BlockBackend};
use sc_consensus_babe::{self, SlotProportion};
use sc_network::{
Expand Down Expand Up @@ -507,15 +511,24 @@ pub fn new_full_base<N: NetworkBackend<Block, <Block as BlockT>::Hash>>(
Vec::default(),
));

let syncing_strategy = build_polkadot_syncing_strategy(
config.protocol_id(),
config.chain_spec.fork_id(),
&mut net_config,
Some(WarpSyncConfig::WithProvider(warp_sync)),
client.clone(),
&task_manager.spawn_handle(),
config.prometheus_config.as_ref().map(|config| &config.registry),
)?;
let network_service_provider = NetworkServiceProvider::new();
let (sync_service, block_announce_config) =
build_default_syncing_engine(DefaultSyncingEngineConfig {
role: config.role,
protocol_id: config.protocol_id(),
fork_id: None,
net_config: &mut net_config,
block_announce_validator: Box::new(DefaultBlockAnnounceValidator),
network_service_handle: network_service_provider.handle(),
warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)),
client: client.clone(),
import_queue_service: import_queue.service(),
num_peers_hint: config.network.default_peers_set.in_peers as usize +
config.network.default_peers_set.out_peers as usize,
spawn_handle: &task_manager.spawn_handle(),
metrics_registry: config.prometheus_config.as_ref().map(|config| &config.registry),
metrics: metrics.clone(),
})?;

let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
sc_service::build_network(sc_service::BuildNetworkParams {
Expand All @@ -525,9 +538,9 @@ pub fn new_full_base<N: NetworkBackend<Block, <Block as BlockT>::Hash>>(
transaction_pool: transaction_pool.clone(),
spawn_handle: task_manager.spawn_handle(),
import_queue,
block_announce_validator_builder: None,
syncing_strategy,
block_relay: None,
sync_service,
block_announce_config,
network_service_provider,
metrics,
})?;

Expand Down
14 changes: 7 additions & 7 deletions substrate/client/consensus/common/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pub trait Verifier<B: BlockT>: Send + Sync {
///
/// The `import_*` methods can be called in order to send elements for the import queue to verify.
pub trait ImportQueueService<B: BlockT>: Send {
/// Import bunch of blocks, every next block must be an ancestor of the previous block in the
/// Import a bunch of blocks, every next block must be an ancestor of the previous block in the
/// list.
fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>);

Expand All @@ -132,21 +132,21 @@ pub trait ImportQueue<B: BlockT>: Send {
/// This method should behave in a way similar to `Future::poll`. It can register the current
/// task and notify later when more actions are ready to be polled. To continue the comparison,
/// it is as if this method always returned `Poll::Pending`.
fn poll_actions(&mut self, cx: &mut futures::task::Context, link: &mut dyn Link<B>);
fn poll_actions(&mut self, cx: &mut futures::task::Context, link: &dyn Link<B>);

/// Start asynchronous runner for import queue.
///
/// Takes an object implementing [`Link`] which allows the import queue to
/// influence the synchronization process.
async fn run(self, link: Box<dyn Link<B>>);
async fn run(self, link: &dyn Link<B>);
}

/// Hooks that the verification queue can use to influence the synchronization
/// algorithm.
pub trait Link<B: BlockT>: Send {
pub trait Link<B: BlockT>: Send + Sync {
/// Batch of blocks imported, with or without error.
fn blocks_processed(
&mut self,
&self,
_imported: usize,
_count: usize,
_results: Vec<(BlockImportResult<B>, B::Hash)>,
Expand All @@ -155,7 +155,7 @@ pub trait Link<B: BlockT>: Send {

/// Justification import result.
fn justification_imported(
&mut self,
&self,
_who: RuntimeOrigin,
_hash: &B::Hash,
_number: NumberFor<B>,
Expand All @@ -164,7 +164,7 @@ pub trait Link<B: BlockT>: Send {
}

/// Request a justification for the given block.
fn request_justification(&mut self, _hash: &B::Hash, _number: NumberFor<B>) {}
fn request_justification(&self, _hash: &B::Hash, _number: NumberFor<B>) {}
}

/// Block import successful result.
Expand Down
29 changes: 15 additions & 14 deletions substrate/client/consensus/common/src/import_queue/basic_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ impl<B: BlockT> ImportQueue<B> for BasicQueue<B> {
}

/// Poll actions from network.
fn poll_actions(&mut self, cx: &mut Context, link: &mut dyn Link<B>) {
fn poll_actions(&mut self, cx: &mut Context, link: &dyn Link<B>) {
if self.result_port.poll_actions(cx, link).is_err() {
log::error!(
target: LOG_TARGET,
Expand All @@ -190,9 +190,9 @@ impl<B: BlockT> ImportQueue<B> for BasicQueue<B> {
///
/// Takes an object implementing [`Link`] which allows the import queue to
/// influence the synchronization process.
async fn run(mut self, mut link: Box<dyn Link<B>>) {
async fn run(mut self, link: &dyn Link<B>) {
loop {
if let Err(_) = self.result_port.next_action(&mut *link).await {
if let Err(_) = self.result_port.next_action(link).await {
log::error!(target: "sync", "poll_actions: Background import task is no longer alive");
return
}
Expand Down Expand Up @@ -223,7 +223,7 @@ mod worker_messages {
async fn block_import_process<B: BlockT>(
mut block_import: BoxBlockImport<B>,
verifier: impl Verifier<B>,
mut result_sender: BufferedLinkSender<B>,
result_sender: BufferedLinkSender<B>,
mut block_import_receiver: TracingUnboundedReceiver<worker_messages::ImportBlocks<B>>,
metrics: Option<Metrics>,
) {
Expand Down Expand Up @@ -501,6 +501,7 @@ mod tests {
import_queue::Verifier,
};
use futures::{executor::block_on, Future};
use parking_lot::Mutex;
use sp_test_primitives::{Block, BlockNumber, Hash, Header};

#[async_trait::async_trait]
Expand Down Expand Up @@ -558,29 +559,29 @@ mod tests {

#[derive(Default)]
struct TestLink {
events: Vec<Event>,
events: Mutex<Vec<Event>>,
}

impl Link<Block> for TestLink {
fn blocks_processed(
&mut self,
&self,
_imported: usize,
_count: usize,
results: Vec<(Result<BlockImportStatus<BlockNumber>, BlockImportError>, Hash)>,
) {
if let Some(hash) = results.into_iter().find_map(|(r, h)| r.ok().map(|_| h)) {
self.events.push(Event::BlockImported(hash));
self.events.lock().push(Event::BlockImported(hash));
}
}

fn justification_imported(
&mut self,
&self,
_who: RuntimeOrigin,
hash: &Hash,
_number: BlockNumber,
_success: bool,
) {
self.events.push(Event::JustificationImported(*hash))
self.events.lock().push(Event::JustificationImported(*hash))
}
}

Expand Down Expand Up @@ -638,7 +639,7 @@ mod tests {
hash
};

let mut link = TestLink::default();
let link = TestLink::default();

// we send a bunch of tasks to the worker
let block1 = import_block(1);
Expand All @@ -653,22 +654,22 @@ mod tests {

// we poll the worker until we have processed 9 events
block_on(futures::future::poll_fn(|cx| {
while link.events.len() < 9 {
while link.events.lock().len() < 9 {
match Future::poll(Pin::new(&mut worker), cx) {
Poll::Pending => {},
Poll::Ready(()) => panic!("import queue worker should not conclude."),
}

result_port.poll_actions(cx, &mut link).unwrap();
result_port.poll_actions(cx, &link).unwrap();
}

Poll::Ready(())
}));

// all justification tasks must be done before any block import work
assert_eq!(
link.events,
vec![
&*link.events.lock(),
&[
Event::JustificationImported(justification1),
Event::JustificationImported(justification2),
Event::JustificationImported(justification3),
Expand Down
Loading