diff --git a/prdoc/pr_6205.prdoc b/prdoc/pr_6205.prdoc new file mode 100644 index 000000000000..0874eb468db4 --- /dev/null +++ b/prdoc/pr_6205.prdoc @@ -0,0 +1,8 @@ +title: 'pallet-message-queue: Fix max message size calculation' +doc: +- audience: Runtime Dev + description: |- + The max size of a message should not depend on the weight left in a given execution context. Instead the max message size depends on the service weights configured for the pallet. A message that may does not fit into `on_idle` is not automatically overweight, because it may can be executed successfully in `on_initialize` or in another block in `on_idle` when there is more weight left. +crates: +- name: pallet-message-queue + bump: patch diff --git a/substrate/frame/message-queue/src/lib.rs b/substrate/frame/message-queue/src/lib.rs index 31402f2a9d81..04620fa88d85 100644 --- a/substrate/frame/message-queue/src/lib.rs +++ b/substrate/frame/message-queue/src/lib.rs @@ -868,13 +868,26 @@ impl Pallet { } } - /// The maximal weight that a single message can consume. + /// The maximal weight that a single message ever can consume. /// /// Any message using more than this will be marked as permanently overweight and not /// automatically re-attempted. Returns `None` if the servicing of a message cannot begin. /// `Some(0)` means that only messages with no weight may be served. fn max_message_weight(limit: Weight) -> Option { - limit.checked_sub(&Self::single_msg_overhead()) + let service_weight = T::ServiceWeight::get().unwrap_or_default(); + let on_idle_weight = T::IdleMaxServiceWeight::get().unwrap_or_default(); + + // Whatever weight is set, the one with the biggest one is used as the maximum weight. If a + // message is tried in one context and fails, it will be retried in the other context later. + let max_message_weight = + if service_weight.any_gt(on_idle_weight) { service_weight } else { on_idle_weight }; + + if max_message_weight.is_zero() { + // If no service weight is set, we need to use the given limit as max message weight. + limit.checked_sub(&Self::single_msg_overhead()) + } else { + max_message_weight.checked_sub(&Self::single_msg_overhead()) + } } /// The overhead of servicing a single message. @@ -896,6 +909,8 @@ impl Pallet { fn do_integrity_test() -> Result<(), String> { ensure!(!MaxMessageLenOf::::get().is_zero(), "HeapSize too low"); + let max_block = T::BlockWeights::get().max_block; + if let Some(service) = T::ServiceWeight::get() { if Self::max_message_weight(service).is_none() { return Err(format!( @@ -904,6 +919,31 @@ impl Pallet { Self::single_msg_overhead(), )) } + + if service.any_gt(max_block) { + return Err(format!( + "ServiceWeight {service} is bigger than max block weight {max_block}" + )) + } + } + + if let Some(on_idle) = T::IdleMaxServiceWeight::get() { + if on_idle.any_gt(max_block) { + return Err(format!( + "IdleMaxServiceWeight {on_idle} is bigger than max block weight {max_block}" + )) + } + } + + if let (Some(service_weight), Some(on_idle)) = + (T::ServiceWeight::get(), T::IdleMaxServiceWeight::get()) + { + if !(service_weight.all_gt(on_idle) || + on_idle.all_gt(service_weight) || + service_weight == on_idle) + { + return Err("One of `ServiceWeight` or `IdleMaxServiceWeight` needs to be `all_gt` or both need to be equal.".into()) + } } Ok(()) @@ -1531,7 +1571,7 @@ impl Pallet { let mut weight = WeightMeter::with_limit(weight_limit); // Get the maximum weight that processing a single message may take: - let max_weight = Self::max_message_weight(weight_limit).unwrap_or_else(|| { + let overweight_limit = Self::max_message_weight(weight_limit).unwrap_or_else(|| { if matches!(context, ServiceQueuesContext::OnInitialize) { defensive!("Not enough weight to service a single message."); } @@ -1549,7 +1589,8 @@ impl Pallet { let mut last_no_progress = None; loop { - let (progressed, n) = Self::service_queue(next.clone(), &mut weight, max_weight); + let (progressed, n) = + Self::service_queue(next.clone(), &mut weight, overweight_limit); next = match n { Some(n) => if !progressed { diff --git a/substrate/frame/message-queue/src/mock.rs b/substrate/frame/message-queue/src/mock.rs index d3f719c62356..f1d341d1a5db 100644 --- a/substrate/frame/message-queue/src/mock.rs +++ b/substrate/frame/message-queue/src/mock.rs @@ -42,7 +42,7 @@ impl frame_system::Config for Test { type Block = Block; } parameter_types! { - pub const HeapSize: u32 = 24; + pub const HeapSize: u32 = 40; pub const MaxStale: u32 = 2; pub const ServiceWeight: Option = Some(Weight::from_parts(100, 100)); } diff --git a/substrate/frame/message-queue/src/tests.rs b/substrate/frame/message-queue/src/tests.rs index b75764b67bea..c81e486a40df 100644 --- a/substrate/frame/message-queue/src/tests.rs +++ b/substrate/frame/message-queue/src/tests.rs @@ -177,7 +177,7 @@ fn service_queues_failing_messages_works() { MessageQueue::enqueue_message(msg("stacklimitreached"), Here); MessageQueue::enqueue_message(msg("yield"), Here); // Starts with four pages. - assert_pages(&[0, 1, 2, 3, 4]); + assert_pages(&[0, 1, 2]); assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); assert_last_event::( @@ -209,7 +209,7 @@ fn service_queues_failing_messages_works() { assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); assert_eq!(System::events().len(), 4); // Last page with the `yield` stays in. - assert_pages(&[4]); + assert_pages(&[2]); }); } @@ -313,7 +313,7 @@ fn reap_page_permanent_overweight_works() { // Create 10 pages more than the stale limit. let n = (MaxStale::get() + 10) as usize; for _ in 0..n { - MessageQueue::enqueue_message(msg("weight=2"), Here); + MessageQueue::enqueue_message(msg("weight=200 datadatadata"), Here); } assert_eq!(Pages::::iter().count(), n); assert_eq!(MessageQueue::footprint(Here).pages, n as u32); @@ -334,7 +334,7 @@ fn reap_page_permanent_overweight_works() { break } assert_ok!(MessageQueue::do_reap_page(&Here, i)); - assert_eq!(QueueChanges::take(), vec![(Here, b.message_count - 1, b.size - 8)]); + assert_eq!(QueueChanges::take(), vec![(Here, b.message_count - 1, b.size - 23)]); } // Cannot reap any more pages. @@ -353,20 +353,20 @@ fn reaping_overweight_fails_properly() { build_and_execute::(|| { // page 0 - MessageQueue::enqueue_message(msg("weight=4"), Here); + MessageQueue::enqueue_message(msg("weight=200 datadata"), Here); MessageQueue::enqueue_message(msg("a"), Here); // page 1 - MessageQueue::enqueue_message(msg("weight=4"), Here); + MessageQueue::enqueue_message(msg("weight=200 datadata"), Here); MessageQueue::enqueue_message(msg("b"), Here); // page 2 - MessageQueue::enqueue_message(msg("weight=4"), Here); + MessageQueue::enqueue_message(msg("weight=200 datadata"), Here); MessageQueue::enqueue_message(msg("c"), Here); // page 3 - MessageQueue::enqueue_message(msg("bigbig 1"), Here); + MessageQueue::enqueue_message(msg("bigbig 1 datadata"), Here); // page 4 - MessageQueue::enqueue_message(msg("bigbig 2"), Here); + MessageQueue::enqueue_message(msg("bigbig 2 datadata"), Here); // page 5 - MessageQueue::enqueue_message(msg("bigbig 3"), Here); + MessageQueue::enqueue_message(msg("bigbig 3 datadata"), Here); // Double-check that exactly these pages exist. assert_pages(&[0, 1, 2, 3, 4, 5]); @@ -385,7 +385,7 @@ fn reaping_overweight_fails_properly() { // 3 stale now: can take something 4 pages in history. assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); - assert_eq!(MessagesProcessed::take(), vec![(vmsg("bigbig 1"), Here)]); + assert_eq!(MessagesProcessed::take(), vec![(vmsg("bigbig 1 datadata"), Here)]); // Nothing reapable yet, because we haven't hit the stale limit. for (o, i, _) in Pages::::iter() { @@ -394,7 +394,7 @@ fn reaping_overweight_fails_properly() { assert_pages(&[0, 1, 2, 4, 5]); assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); - assert_eq!(MessagesProcessed::take(), vec![(vmsg("bigbig 2"), Here)]); + assert_eq!(MessagesProcessed::take(), vec![(vmsg("bigbig 2 datadata"), Here)]); assert_pages(&[0, 1, 2, 5]); // First is now reapable as it is too far behind the first ready page (5). @@ -406,7 +406,7 @@ fn reaping_overweight_fails_properly() { assert_pages(&[1, 2, 5]); assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight()); - assert_eq!(MessagesProcessed::take(), vec![(vmsg("bigbig 3"), Here)]); + assert_eq!(MessagesProcessed::take(), vec![(vmsg("bigbig 3 datadata"), Here)]); assert_noop!(MessageQueue::do_reap_page(&Here, 0), Error::::NoPage); assert_noop!(MessageQueue::do_reap_page(&Here, 3), Error::::NoPage); @@ -1062,29 +1062,29 @@ fn footprint_on_swept_works() { fn footprint_num_pages_works() { use MessageOrigin::*; build_and_execute::(|| { - MessageQueue::enqueue_message(msg("weight=2"), Here); - MessageQueue::enqueue_message(msg("weight=3"), Here); + MessageQueue::enqueue_message(msg("weight=200"), Here); + MessageQueue::enqueue_message(msg("weight=300"), Here); - assert_eq!(MessageQueue::footprint(Here), fp(2, 2, 2, 16)); + assert_eq!(MessageQueue::footprint(Here), fp(1, 1, 2, 20)); // Mark the messages as overweight. assert_eq!(MessageQueue::service_queues(1.into_weight()), 0.into_weight()); assert_eq!(System::events().len(), 2); // `ready_pages` decreases but `page` count does not. - assert_eq!(MessageQueue::footprint(Here), fp(2, 0, 2, 16)); + assert_eq!(MessageQueue::footprint(Here), fp(1, 0, 2, 20)); // Now execute the second message. assert_eq!( - ::execute_overweight(3.into_weight(), (Here, 1, 0)) + ::execute_overweight(300.into_weight(), (Here, 0, 1)) .unwrap(), - 3.into_weight() + 300.into_weight() ); - assert_eq!(MessageQueue::footprint(Here), fp(1, 0, 1, 8)); + assert_eq!(MessageQueue::footprint(Here), fp(1, 0, 1, 10)); // And the first one: assert_eq!( - ::execute_overweight(2.into_weight(), (Here, 0, 0)) + ::execute_overweight(200.into_weight(), (Here, 0, 0)) .unwrap(), - 2.into_weight() + 200.into_weight() ); assert_eq!(MessageQueue::footprint(Here), Default::default()); assert_eq!(MessageQueue::footprint(Here), fp(0, 0, 0, 0)); @@ -1104,7 +1104,7 @@ fn execute_overweight_works() { // Enqueue a message let origin = MessageOrigin::Here; - MessageQueue::enqueue_message(msg("weight=6"), origin); + MessageQueue::enqueue_message(msg("weight=200"), origin); // Load the current book let book = BookStateFor::::get(origin); assert_eq!(book.message_count, 1); @@ -1112,10 +1112,10 @@ fn execute_overweight_works() { // Mark the message as permanently overweight. assert_eq!(MessageQueue::service_queues(4.into_weight()), 4.into_weight()); - assert_eq!(QueueChanges::take(), vec![(origin, 1, 8)]); + assert_eq!(QueueChanges::take(), vec![(origin, 1, 10)]); assert_last_event::( Event::OverweightEnqueued { - id: blake2_256(b"weight=6"), + id: blake2_256(b"weight=200"), origin: MessageOrigin::Here, message_index: 0, page_index: 0, @@ -1132,9 +1132,9 @@ fn execute_overweight_works() { assert_eq!(Pages::::iter().count(), 1); assert!(QueueChanges::take().is_empty()); let consumed = - ::execute_overweight(7.into_weight(), (origin, 0, 0)) + ::execute_overweight(200.into_weight(), (origin, 0, 0)) .unwrap(); - assert_eq!(consumed, 6.into_weight()); + assert_eq!(consumed, 200.into_weight()); assert_eq!(QueueChanges::take(), vec![(origin, 0, 0)]); // There is no message left in the book. let book = BookStateFor::::get(origin); @@ -1162,7 +1162,7 @@ fn permanently_overweight_book_unknits() { set_weight("service_queue_base", 1.into_weight()); set_weight("service_page_base_completion", 1.into_weight()); - MessageQueue::enqueue_messages([msg("weight=9")].into_iter(), Here); + MessageQueue::enqueue_messages([msg("weight=200")].into_iter(), Here); // It is the only ready book. assert_ring(&[Here]); @@ -1170,7 +1170,7 @@ fn permanently_overweight_book_unknits() { assert_eq!(MessageQueue::service_queues(8.into_weight()), 4.into_weight()); assert_last_event::( Event::OverweightEnqueued { - id: blake2_256(b"weight=9"), + id: blake2_256(b"weight=200"), origin: Here, message_index: 0, page_index: 0, @@ -1201,19 +1201,19 @@ fn permanently_overweight_book_unknits_multiple() { set_weight("service_page_base_completion", 1.into_weight()); MessageQueue::enqueue_messages( - [msg("weight=1"), msg("weight=9"), msg("weight=9")].into_iter(), + [msg("weight=1"), msg("weight=200"), msg("weight=200")].into_iter(), Here, ); assert_ring(&[Here]); // Process the first message. assert_eq!(MessageQueue::service_queues(4.into_weight()), 4.into_weight()); - assert_eq!(num_overweight_enqueued_events(), 0); + assert_eq!(num_overweight_enqueued_events(), 1); assert_eq!(MessagesProcessed::take().len(), 1); // Book is still ready since it was not marked as overweight yet. assert_ring(&[Here]); - assert_eq!(MessageQueue::service_queues(8.into_weight()), 5.into_weight()); + assert_eq!(MessageQueue::service_queues(8.into_weight()), 4.into_weight()); assert_eq!(num_overweight_enqueued_events(), 2); assert_eq!(MessagesProcessed::take().len(), 0); // Now it is overweight. @@ -1566,12 +1566,12 @@ fn service_queues_suspend_works() { fn execute_overweight_respects_suspension() { build_and_execute::(|| { let origin = MessageOrigin::Here; - MessageQueue::enqueue_message(msg("weight=5"), origin); + MessageQueue::enqueue_message(msg("weight=200"), origin); // Mark the message as permanently overweight. MessageQueue::service_queues(4.into_weight()); assert_last_event::( Event::OverweightEnqueued { - id: blake2_256(b"weight=5"), + id: blake2_256(b"weight=200"), origin, message_index: 0, page_index: 0, @@ -1598,9 +1598,9 @@ fn execute_overweight_respects_suspension() { assert_last_event::( Event::Processed { - id: blake2_256(b"weight=5").into(), + id: blake2_256(b"weight=200").into(), origin, - weight_used: 5.into_weight(), + weight_used: 200.into_weight(), success: true, } .into(), @@ -1768,7 +1768,7 @@ fn recursive_overweight_while_service_is_forbidden() { // Check that the message was permanently overweight. assert_last_event::( Event::OverweightEnqueued { - id: blake2_256(b"weight=10"), + id: blake2_256(b"weight=200"), origin: There, message_index: 0, page_index: 0, @@ -1786,13 +1786,13 @@ fn recursive_overweight_while_service_is_forbidden() { Ok(()) })); - MessageQueue::enqueue_message(msg("weight=10"), There); + MessageQueue::enqueue_message(msg("weight=200"), There); MessageQueue::enqueue_message(msg("callback=0"), Here); // Mark it as permanently overweight. MessageQueue::service_queues(5.into_weight()); assert_ok!(::execute_overweight( - 10.into_weight(), + 200.into_weight(), (There, 0, 0) )); }); @@ -1812,7 +1812,7 @@ fn recursive_reap_page_is_forbidden() { // Create 10 pages more than the stale limit. let n = (MaxStale::get() + 10) as usize; for _ in 0..n { - MessageQueue::enqueue_message(msg("weight=2"), Here); + MessageQueue::enqueue_message(msg("weight=200"), Here); } // Mark all pages as stale since their message is permanently overweight. @@ -1886,6 +1886,11 @@ fn process_enqueued_on_idle_requires_enough_weight() { // Not enough weight to process on idle. Pallet::::on_idle(1, Weight::from_parts(0, 0)); assert_eq!(MessagesProcessed::take(), vec![]); + + assert!(!System::events().into_iter().any(|e| matches!( + e.event, + RuntimeEvent::MessageQueue(Event::::OverweightEnqueued { .. }) + ))); }) } @@ -1923,12 +1928,12 @@ fn execute_overweight_keeps_stack_ov_message() { // We need to create a mocked message that first reports insufficient weight, and then // `StackLimitReached`: IgnoreStackOvError::set(true); - MessageQueue::enqueue_message(msg("stacklimitreached"), Here); + MessageQueue::enqueue_message(msg("weight=200 stacklimitreached"), Here); MessageQueue::service_queues(0.into_weight()); assert_last_event::( Event::OverweightEnqueued { - id: blake2_256(b"stacklimitreached"), + id: blake2_256(b"weight=200 stacklimitreached"), origin: MessageOrigin::Here, message_index: 0, page_index: 0, @@ -1952,7 +1957,7 @@ fn execute_overweight_keeps_stack_ov_message() { ); assert_last_event::( Event::ProcessingFailed { - id: blake2_256(b"stacklimitreached").into(), + id: blake2_256(b"weight=200 stacklimitreached").into(), origin: MessageOrigin::Here, error: ProcessMessageError::StackLimitReached, } @@ -1964,16 +1969,16 @@ fn execute_overweight_keeps_stack_ov_message() { // Now let's process it normally: IgnoreStackOvError::set(true); assert_eq!( - ::execute_overweight(1.into_weight(), (Here, 0, 0)) + ::execute_overweight(200.into_weight(), (Here, 0, 0)) .unwrap(), - 1.into_weight() + 200.into_weight() ); assert_last_event::( Event::Processed { - id: blake2_256(b"stacklimitreached").into(), + id: blake2_256(b"weight=200 stacklimitreached").into(), origin: MessageOrigin::Here, - weight_used: 1.into_weight(), + weight_used: 200.into_weight(), success: true, } .into(),