diff --git a/cumulus/client/consensus/aura/src/collators/lookahead.rs b/cumulus/client/consensus/aura/src/collators/lookahead.rs index 8ac43fbd116e..2dbcf5eb58e9 100644 --- a/cumulus/client/consensus/aura/src/collators/lookahead.rs +++ b/cumulus/client/consensus/aura/src/collators/lookahead.rs @@ -36,17 +36,15 @@ use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterfa use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker}; use cumulus_client_consensus_proposer::ProposerInterface; use cumulus_primitives_aura::AuraUnincludedSegmentApi; -use cumulus_primitives_core::{ - ClaimQueueOffset, CollectCollationInfo, PersistedValidationData, DEFAULT_CLAIM_QUEUE_OFFSET, -}; +use cumulus_primitives_core::{ClaimQueueOffset, CollectCollationInfo, PersistedValidationData}; use cumulus_relay_chain_interface::RelayChainInterface; use polkadot_node_primitives::{PoV, SubmitCollationParams}; use polkadot_node_subsystem::messages::CollationGenerationMessage; use polkadot_overseer::Handle as OverseerHandle; use polkadot_primitives::{ - BlockNumber as RBlockNumber, CollatorPair, Hash as RHash, HeadData, Id as ParaId, - OccupiedCoreAssumption, + vstaging::DEFAULT_CLAIM_QUEUE_OFFSET, BlockNumber as RBlockNumber, CollatorPair, Hash as RHash, + HeadData, Id as ParaId, OccupiedCoreAssumption, }; use futures::prelude::*; diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs b/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs index e75b52aeebd3..425151230704 100644 --- a/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs +++ b/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs @@ -20,13 +20,11 @@ use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterfa use cumulus_client_consensus_common::{self as consensus_common, ParachainBlockImportMarker}; use cumulus_client_consensus_proposer::ProposerInterface; use cumulus_primitives_aura::AuraUnincludedSegmentApi; -use cumulus_primitives_core::{ - GetCoreSelectorApi, PersistedValidationData, DEFAULT_CLAIM_QUEUE_OFFSET, -}; +use cumulus_primitives_core::{GetCoreSelectorApi, PersistedValidationData}; use cumulus_relay_chain_interface::RelayChainInterface; use polkadot_primitives::{ - vstaging::{ClaimQueueOffset, CoreSelector}, + vstaging::{ClaimQueueOffset, CoreSelector, DEFAULT_CLAIM_QUEUE_OFFSET}, BlockId, CoreIndex, Hash as RelayHash, Header as RelayHeader, Id as ParaId, OccupiedCoreAssumption, }; diff --git a/cumulus/pallets/parachain-system/src/lib.rs b/cumulus/pallets/parachain-system/src/lib.rs index 98989a852b8d..39fc8321a072 100644 --- a/cumulus/pallets/parachain-system/src/lib.rs +++ b/cumulus/pallets/parachain-system/src/lib.rs @@ -35,12 +35,12 @@ use core::{cmp, marker::PhantomData}; use cumulus_primitives_core::{ relay_chain::{ self, - vstaging::{ClaimQueueOffset, CoreSelector}, + vstaging::{ClaimQueueOffset, CoreSelector, DEFAULT_CLAIM_QUEUE_OFFSET}, }, AbridgedHostConfiguration, ChannelInfo, ChannelStatus, CollationInfo, GetChannelInfo, InboundDownwardMessage, InboundHrmpMessage, ListChannelInfos, MessageSendError, OutboundHrmpMessage, ParaId, PersistedValidationData, UpwardMessage, UpwardMessageSender, - XcmpMessageHandler, XcmpMessageSource, DEFAULT_CLAIM_QUEUE_OFFSET, + XcmpMessageHandler, XcmpMessageSource, }; use cumulus_primitives_parachain_inherent::{MessageQueueChain, ParachainInherentData}; use frame_support::{ diff --git a/cumulus/primitives/core/src/lib.rs b/cumulus/primitives/core/src/lib.rs index dfb574ef3301..f88e663db19e 100644 --- a/cumulus/primitives/core/src/lib.rs +++ b/cumulus/primitives/core/src/lib.rs @@ -333,10 +333,6 @@ pub mod rpsr_digest { } } -/// The default claim queue offset to be used if it's not configured/accessible in the parachain -/// runtime -pub const DEFAULT_CLAIM_QUEUE_OFFSET: u8 = 0; - /// Information about a collation. /// /// This was used in version 1 of the [`CollectCollationInfo`] runtime api. diff --git a/polkadot/primitives/src/vstaging/mod.rs b/polkadot/primitives/src/vstaging/mod.rs index bc687f7e2fbe..5c24efff9923 100644 --- a/polkadot/primitives/src/vstaging/mod.rs +++ b/polkadot/primitives/src/vstaging/mod.rs @@ -39,6 +39,10 @@ use sp_staking::SessionIndex; /// Async backing primitives pub mod async_backing; +/// The default claim queue offset to be used if it's not configured/accessible in the parachain +/// runtime +pub const DEFAULT_CLAIM_QUEUE_OFFSET: u8 = 0; + /// A type representing the version of the candidate descriptor and internal version number. #[derive(PartialEq, Eq, Encode, Decode, Clone, TypeInfo, RuntimeDebug, Copy)] #[cfg_attr(feature = "std", derive(Hash))] @@ -310,24 +314,6 @@ pub enum UMPSignal { pub const UMP_SEPARATOR: Vec = vec![]; impl CandidateCommitments { - /// Returns the core selector and claim queue offset the candidate has committed to, if any. - pub fn selected_core(&self) -> Option<(CoreSelector, ClaimQueueOffset)> { - // We need at least 2 messages for the separator and core selector - if self.upward_messages.len() < 2 { - return None - } - - let separator_pos = - self.upward_messages.iter().rposition(|message| message == &UMP_SEPARATOR)?; - - // Use first commitment - let message = self.upward_messages.get(separator_pos + 1)?; - - match UMPSignal::decode(&mut message.as_slice()).ok()? { - UMPSignal::SelectCore(core_selector, cq_offset) => Some((core_selector, cq_offset)), - } - } - /// Returns the core index determined by `UMPSignal::SelectCore` commitment /// and `assigned_cores`. /// @@ -335,16 +321,47 @@ impl CandidateCommitments { /// assigned cores is empty. /// /// `assigned_cores` must be a sorted vec of all core indices assigned to a parachain. - pub fn committed_core_index(&self, assigned_cores: &[&CoreIndex]) -> Option { + pub fn committed_core_index( + &self, + assigned_cores: &BTreeMap>, + ) -> Result { + let mut signals_iter = + self.upward_messages.iter().skip_while(|message| *message != &UMP_SEPARATOR); + + let (core_selector, cq_offset) = if signals_iter.next().is_some() { + let core_selector_message = + signals_iter.next().ok_or(CandidateReceiptError::NoCoreSelected)?; + // We should have exactly one signal beyond the separator + if signals_iter.next().is_some() { + return Err(CandidateReceiptError::TooManyUMPSignals) + } + + match UMPSignal::decode(&mut core_selector_message.as_slice()) + .map_err(|_| CandidateReceiptError::InvalidSelectedCore)? + { + UMPSignal::SelectCore(core_selector, cq_offset) => (Some(core_selector), cq_offset), + } + } else { + // No separator, we should use the defaults. + (None, ClaimQueueOffset(DEFAULT_CLAIM_QUEUE_OFFSET)) + }; + + let assigned_cores = + assigned_cores.get(&cq_offset.0).ok_or(CandidateReceiptError::NoAssignment)?; + if assigned_cores.is_empty() { - return None + return Err(CandidateReceiptError::NoAssignment) } - self.selected_core().and_then(|(core_selector, _cq_offset)| { - let core_index = - **assigned_cores.get(core_selector.0 as usize % assigned_cores.len())?; - Some(core_index) - }) + let core_selector = core_selector + .or_else(|| if assigned_cores.len() == 1 { Some(CoreSelector(0)) } else { None }) + .ok_or(CandidateReceiptError::NoCoreSelected)?; + + assigned_cores + .iter() + .nth(core_selector.0 as usize % assigned_cores.len()) + .ok_or(CandidateReceiptError::InvalidSelectedCore) + .copied() } } @@ -364,6 +381,9 @@ pub enum CandidateReceiptError { NoCoreSelected, /// Unknown version. UnknownVersion(InternalVersion), + /// The allowed number of `UMPSignal` messages in the queue was exceeded. + /// Currenly only one such message is allowed. + TooManyUMPSignals, } macro_rules! impl_getter { @@ -470,39 +490,11 @@ impl CommittedCandidateReceiptV2 { return Err(CandidateReceiptError::UnknownVersion(self.descriptor.version)), } - if cores_per_para.is_empty() { - return Err(CandidateReceiptError::NoAssignment) - } - - let (offset, core_selected) = - if let Some((_core_selector, cq_offset)) = self.commitments.selected_core() { - (cq_offset.0, true) - } else { - // If no core has been selected then we use offset 0 (top of claim queue) - (0, false) - }; - - // The cores assigned to the parachain at above computed offset. let assigned_cores = cores_per_para .get(&self.descriptor.para_id()) - .ok_or(CandidateReceiptError::NoAssignment)? - .get(&offset) - .ok_or(CandidateReceiptError::NoAssignment)? - .into_iter() - .collect::>(); - - let core_index = if core_selected { - self.commitments - .committed_core_index(assigned_cores.as_slice()) - .ok_or(CandidateReceiptError::NoAssignment)? - } else { - // `SelectCore` commitment is mandatory for elastic scaling parachains. - if assigned_cores.len() > 1 { - return Err(CandidateReceiptError::NoCoreSelected) - } + .ok_or(CandidateReceiptError::NoAssignment)?; - **assigned_cores.get(0).ok_or(CandidateReceiptError::NoAssignment)? - }; + let core_index = self.commitments.committed_core_index(assigned_cores)?; let descriptor_core_index = CoreIndex(self.descriptor.core_index as u32); if core_index != descriptor_core_index { diff --git a/polkadot/runtime/parachains/src/inclusion/mod.rs b/polkadot/runtime/parachains/src/inclusion/mod.rs index ea3a5d3cdda9..6ae6e83baf1f 100644 --- a/polkadot/runtime/parachains/src/inclusion/mod.rs +++ b/polkadot/runtime/parachains/src/inclusion/mod.rs @@ -48,7 +48,7 @@ use polkadot_primitives::{ vstaging::{ BackedCandidate, CandidateDescriptorV2 as CandidateDescriptor, CandidateReceiptV2 as CandidateReceipt, - CommittedCandidateReceiptV2 as CommittedCandidateReceipt, + CommittedCandidateReceiptV2 as CommittedCandidateReceipt, UMP_SEPARATOR, }, well_known_keys, CandidateCommitments, CandidateHash, CoreIndex, GroupIndex, HeadData, Id as ParaId, SignedAvailabilityBitfields, SigningContext, UpwardMessage, ValidatorId, @@ -412,11 +412,6 @@ pub(crate) enum UmpAcceptanceCheckErr { TotalSizeExceeded { total_size: u64, limit: u64 }, /// A para-chain cannot send UMP messages while it is offboarding. IsOffboarding, - /// The allowed number of `UMPSignal` messages in the queue was exceeded. - /// Currenly only one such message is allowed. - TooManyUMPSignals { count: u32 }, - /// The UMP queue contains an invalid `UMPSignal` - NoUmpSignal, } impl fmt::Debug for UmpAcceptanceCheckErr { @@ -445,12 +440,6 @@ impl fmt::Debug for UmpAcceptanceCheckErr { UmpAcceptanceCheckErr::IsOffboarding => { write!(fmt, "upward message rejected because the para is off-boarding") }, - UmpAcceptanceCheckErr::TooManyUMPSignals { count } => { - write!(fmt, "the ump queue has too many `UMPSignal` messages ({} > 1 )", count) - }, - UmpAcceptanceCheckErr::NoUmpSignal => { - write!(fmt, "Required UMP signal not found") - }, } } } @@ -925,25 +914,10 @@ impl Pallet { upward_messages: &[UpwardMessage], ) -> Result<(), UmpAcceptanceCheckErr> { // Filter any pending UMP signals and the separator. - let upward_messages = if let Some(separator_index) = - upward_messages.iter().position(|message| message.is_empty()) - { - let (upward_messages, ump_signals) = upward_messages.split_at(separator_index); - - if ump_signals.len() > 2 { - return Err(UmpAcceptanceCheckErr::TooManyUMPSignals { - count: ump_signals.len() as u32, - }) - } - - if ump_signals.len() == 1 { - return Err(UmpAcceptanceCheckErr::NoUmpSignal) - } - - upward_messages - } else { - upward_messages - }; + let upward_messages = upward_messages + .iter() + .take_while(|message| message != &&UMP_SEPARATOR) + .collect::>(); // Cannot send UMP messages while off-boarding. if paras::Pallet::::is_offboarding(para) { @@ -1000,7 +974,7 @@ impl Pallet { let bounded = upward_messages .iter() // Stop once we hit the `UMPSignal` separator. - .take_while(|message| !message.is_empty()) + .take_while(|message| message != &&UMP_SEPARATOR) .filter_map(|d| { BoundedSlice::try_from(&d[..]) .inspect_err(|_| {