From 26acee2902c519075bd3d5f98d2a1a498ee3d1a9 Mon Sep 17 00:00:00 2001 From: Peter Krull Date: Tue, 13 Feb 2024 14:50:26 +0100 Subject: [PATCH 01/15] Add initial implementation of `MultiSignal` sync primitive --- embassy-sync/src/lib.rs | 1 + embassy-sync/src/multi_signal.rs | 285 +++++++++++++++++++++++++++++++ 2 files changed, 286 insertions(+) create mode 100644 embassy-sync/src/multi_signal.rs diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index d88c76db56..f029855646 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs @@ -12,6 +12,7 @@ mod ring_buffer; pub mod blocking_mutex; pub mod channel; +pub mod multi_signal; pub mod mutex; pub mod pipe; pub mod priority_channel; diff --git a/embassy-sync/src/multi_signal.rs b/embassy-sync/src/multi_signal.rs new file mode 100644 index 0000000000..db858f2695 --- /dev/null +++ b/embassy-sync/src/multi_signal.rs @@ -0,0 +1,285 @@ +//! A synchronization primitive for passing the latest value to **multiple** tasks. +use core::{ + cell::RefCell, + marker::PhantomData, + ops::{Deref, DerefMut}, + pin::Pin, + task::{Context, Poll}, +}; + +use futures_util::Future; + +use crate::{ + blocking_mutex::{raw::RawMutex, Mutex}, + waitqueue::MultiWakerRegistration, +}; + +/// A `MultiSignal` is a single-slot signaling primitive, which can awake `N` separate [`Receiver`]s. +/// +/// Similar to a [`Signal`](crate::signal::Signal), except `MultiSignal` allows for multiple tasks to +/// `.await` the latest value, and all receive it. +/// +/// This is similar to a [`PubSubChannel`](crate::pubsub::PubSubChannel) with a buffer size of 1, except +/// "sending" to it (calling [`MultiSignal::write`]) will immediately overwrite the previous value instead +/// of waiting for the receivers to pop the previous value. +/// +/// `MultiSignal` is useful when a single task is responsible for updating a value or "state", which multiple other +/// tasks are interested in getting notified about changes to the latest value of. It is therefore fine for +/// [`Receiver`]s to "lose" stale values. +/// +/// Anyone with a reference to the MultiSignal can update or peek the value. MultiSignals are generally declared +/// as `static`s and then borrowed as required to either [`MultiSignal::peek`] the value or obtain a [`Receiver`] +/// with [`MultiSignal::receiver`] which has async methods. +/// ``` +/// +/// use futures_executor::block_on; +/// use embassy_sync::multi_signal::MultiSignal; +/// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; +/// +/// let f = async { +/// +/// static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); +/// +/// // Obtain Receivers +/// let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); +/// let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); +/// assert!(SOME_SIGNAL.receiver().is_err()); +/// +/// SOME_SIGNAL.write(10); +/// +/// // Receive the new value +/// assert_eq!(rcv0.changed().await, 10); +/// assert_eq!(rcv1.try_changed(), Some(10)); +/// +/// // No update +/// assert_eq!(rcv0.try_changed(), None); +/// assert_eq!(rcv1.try_changed(), None); +/// +/// SOME_SIGNAL.write(20); +/// +/// // Receive new value with predicate +/// assert_eq!(rcv0.changed_and(|x|x>&10).await, 20); +/// assert_eq!(rcv1.try_changed_and(|x|x>&30), None); +/// +/// // Anyone can peek the current value +/// assert_eq!(rcv0.peek(), 20); +/// assert_eq!(rcv1.peek(), 20); +/// assert_eq!(SOME_SIGNAL.peek(), 20); +/// assert_eq!(SOME_SIGNAL.peek_and(|x|x>&30), None); +/// }; +/// block_on(f); +/// ``` +pub struct MultiSignal<'a, M: RawMutex, T: Clone, const N: usize> { + mutex: Mutex>>, + _phantom: PhantomData<&'a ()>, +} + +struct MultiSignalState { + data: T, + current_id: u64, + wakers: MultiWakerRegistration, + receiver_count: usize, +} + +#[derive(Debug)] +/// An error that can occur when a `MultiSignal` returns a `Result`. +pub enum Error { + /// The maximum number of [`Receiver`](crate::multi_signal::Receiver) has been reached. + MaximumReceiversReached, +} + +impl<'a, M: RawMutex, T: Clone, const N: usize> MultiSignal<'a, M, T, N> { + /// Create a new `MultiSignal` initialized with the given value. + pub const fn new(init: T) -> Self { + Self { + mutex: Mutex::new(RefCell::new(MultiSignalState { + data: init, + current_id: 1, + wakers: MultiWakerRegistration::new(), + receiver_count: 0, + })), + _phantom: PhantomData, + } + } + + /// Get a [`Receiver`] for the `MultiSignal`. + pub fn receiver(&'a self) -> Result, Error> { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + if s.receiver_count < N { + s.receiver_count += 1; + Ok(Receiver(Rcv::new(self))) + } else { + Err(Error::MaximumReceiversReached) + } + }) + } + + /// Update the value of the `MultiSignal`. + pub fn write(&self, data: T) { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + s.data = data; + s.current_id += 1; + s.wakers.wake(); + }) + } + + /// Peek the current value of the `MultiSignal`. + pub fn peek(&self) -> T { + self.mutex.lock(|state| state.borrow().data.clone()) + } + + /// Peek the current value of the `MultiSignal` and check if it satisfies the predicate `f`. + pub fn peek_and(&self, f: fn(&T) -> bool) -> Option { + self.mutex.lock(|state| { + let s = state.borrow(); + if f(&s.data) { + Some(s.data.clone()) + } else { + None + } + }) + } + + /// Get the ID of the current value of the `MultiSignal`. + /// This method is mostly for testing purposes. + #[allow(dead_code)] + fn get_id(&self) -> u64 { + self.mutex.lock(|state| state.borrow().current_id) + } + + /// Poll the `MultiSignal` with an optional context. + fn get_with_context(&'a self, waker: &mut Rcv<'a, M, T, N>, cx: Option<&mut Context>) -> Poll { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + match (s.current_id > waker.at_id, waker.predicate) { + (true, None) => { + waker.at_id = s.current_id; + Poll::Ready(s.data.clone()) + } + (true, Some(f)) if f(&s.data) => { + waker.at_id = s.current_id; + Poll::Ready(s.data.clone()) + } + _ => { + if let Some(cx) = cx { + s.wakers.register(cx.waker()); + } + Poll::Pending + } + } + }) + } +} + +/// A receiver is able to `.await` a changed `MultiSignal` value. +pub struct Rcv<'a, M: RawMutex, T: Clone, const N: usize> { + multi_sig: &'a MultiSignal<'a, M, T, N>, + predicate: Option bool>, + at_id: u64, +} + +// f: Option bool> +impl<'a, M: RawMutex, T: Clone, const N: usize> Rcv<'a, M, T, N> { + /// Create a new `Receiver` with a reference the given `MultiSignal`. + fn new(multi_sig: &'a MultiSignal<'a, M, T, N>) -> Self { + Self { + multi_sig, + predicate: None, + at_id: 0, + } + } + + /// Wait for a change to the value of the corresponding `MultiSignal`. + pub fn changed<'s>(&'s mut self) -> ReceiverFuture<'s, 'a, M, T, N> { + self.predicate = None; + ReceiverFuture { subscriber: self } + } + + /// Wait for a change to the value of the corresponding `MultiSignal` which matches the predicate `f`. + pub fn changed_and<'s>(&'s mut self, f: fn(&T) -> bool) -> ReceiverFuture<'s, 'a, M, T, N> { + self.predicate = Some(f); + ReceiverFuture { subscriber: self } + } + + /// Try to get a changed value of the corresponding `MultiSignal`. + pub fn try_changed(&mut self) -> Option { + self.multi_sig.mutex.lock(|state| { + let s = state.borrow(); + match s.current_id > self.at_id { + true => { + self.at_id = s.current_id; + Some(s.data.clone()) + } + false => None, + } + }) + } + + /// Try to get a changed value of the corresponding `MultiSignal` which matches the predicate `f`. + pub fn try_changed_and(&mut self, f: fn(&T) -> bool) -> Option { + self.multi_sig.mutex.lock(|state| { + let s = state.borrow(); + match s.current_id > self.at_id && f(&s.data) { + true => { + self.at_id = s.current_id; + Some(s.data.clone()) + } + false => None, + } + }) + } + + /// Peek the current value of the corresponding `MultiSignal`. + pub fn peek(&self) -> T { + self.multi_sig.peek() + } + + /// Peek the current value of the corresponding `MultiSignal` and check if it satisfies the predicate `f`. + pub fn peek_and(&self, f: fn(&T) -> bool) -> Option { + self.multi_sig.peek_and(f) + } + + /// Check if the value of the corresponding `MultiSignal` has changed. + pub fn has_changed(&mut self) -> bool { + self.multi_sig + .mutex + .lock(|state| state.borrow().current_id > self.at_id) + } +} + +/// A `Receiver` is able to `.await` a change to the corresponding [`MultiSignal`] value. +pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, M, T, N>); + +impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> { + type Target = Rcv<'a, M, T, N>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// Future for the `Receiver` wait action +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct ReceiverFuture<'s, 'a, M: RawMutex, T: Clone, const N: usize> { + subscriber: &'s mut Rcv<'a, M, T, N>, +} + +impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Future for ReceiverFuture<'s, 'a, M, T, N> { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.subscriber + .multi_sig + .get_with_context(&mut self.subscriber, Some(cx)) + } +} + +impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Unpin for ReceiverFuture<'s, 'a, M, T, N> {} From 410c2d440afa2a500ef1398b5b48e746f77815bd Mon Sep 17 00:00:00 2001 From: Peter Krull Date: Tue, 13 Feb 2024 15:37:07 +0100 Subject: [PATCH 02/15] Change import formatting --- embassy-sync/src/multi_signal.rs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/embassy-sync/src/multi_signal.rs b/embassy-sync/src/multi_signal.rs index db858f2695..bff7ad0486 100644 --- a/embassy-sync/src/multi_signal.rs +++ b/embassy-sync/src/multi_signal.rs @@ -1,18 +1,15 @@ //! A synchronization primitive for passing the latest value to **multiple** tasks. -use core::{ - cell::RefCell, - marker::PhantomData, - ops::{Deref, DerefMut}, - pin::Pin, - task::{Context, Poll}, -}; +use core::cell::RefCell; +use core::marker::PhantomData; +use core::ops::{Deref, DerefMut}; +use core::pin::Pin; +use core::task::{Context, Poll}; use futures_util::Future; -use crate::{ - blocking_mutex::{raw::RawMutex, Mutex}, - waitqueue::MultiWakerRegistration, -}; +use crate::blocking_mutex::raw::RawMutex; +use crate::blocking_mutex::Mutex; +use crate::waitqueue::MultiWakerRegistration; /// A `MultiSignal` is a single-slot signaling primitive, which can awake `N` separate [`Receiver`]s. /// From 37f1c9ac27b0542fdf404392e9bb265fa8ec41d3 Mon Sep 17 00:00:00 2001 From: Peter Krull Date: Tue, 13 Feb 2024 23:14:16 +0100 Subject: [PATCH 03/15] Removed unused lifetime, change most fn -> FnMut --- embassy-sync/src/multi_signal.rs | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/embassy-sync/src/multi_signal.rs b/embassy-sync/src/multi_signal.rs index bff7ad0486..5f724c76b3 100644 --- a/embassy-sync/src/multi_signal.rs +++ b/embassy-sync/src/multi_signal.rs @@ -1,6 +1,5 @@ //! A synchronization primitive for passing the latest value to **multiple** tasks. use core::cell::RefCell; -use core::marker::PhantomData; use core::ops::{Deref, DerefMut}; use core::pin::Pin; use core::task::{Context, Poll}; @@ -66,9 +65,8 @@ use crate::waitqueue::MultiWakerRegistration; /// }; /// block_on(f); /// ``` -pub struct MultiSignal<'a, M: RawMutex, T: Clone, const N: usize> { +pub struct MultiSignal { mutex: Mutex>>, - _phantom: PhantomData<&'a ()>, } struct MultiSignalState { @@ -85,7 +83,7 @@ pub enum Error { MaximumReceiversReached, } -impl<'a, M: RawMutex, T: Clone, const N: usize> MultiSignal<'a, M, T, N> { +impl<'a, M: RawMutex, T: Clone, const N: usize> MultiSignal { /// Create a new `MultiSignal` initialized with the given value. pub const fn new(init: T) -> Self { Self { @@ -95,7 +93,6 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> MultiSignal<'a, M, T, N> { wakers: MultiWakerRegistration::new(), receiver_count: 0, })), - _phantom: PhantomData, } } @@ -128,7 +125,7 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> MultiSignal<'a, M, T, N> { } /// Peek the current value of the `MultiSignal` and check if it satisfies the predicate `f`. - pub fn peek_and(&self, f: fn(&T) -> bool) -> Option { + pub fn peek_and(&self, mut f: impl FnMut(&T) -> bool) -> Option { self.mutex.lock(|state| { let s = state.borrow(); if f(&s.data) { @@ -147,16 +144,16 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> MultiSignal<'a, M, T, N> { } /// Poll the `MultiSignal` with an optional context. - fn get_with_context(&'a self, waker: &mut Rcv<'a, M, T, N>, cx: Option<&mut Context>) -> Poll { + fn get_with_context(&self, rcv: &mut Rcv<'a, M, T, N>, cx: Option<&mut Context>) -> Poll { self.mutex.lock(|state| { let mut s = state.borrow_mut(); - match (s.current_id > waker.at_id, waker.predicate) { + match (s.current_id > rcv.at_id, rcv.predicate) { (true, None) => { - waker.at_id = s.current_id; + rcv.at_id = s.current_id; Poll::Ready(s.data.clone()) } (true, Some(f)) if f(&s.data) => { - waker.at_id = s.current_id; + rcv.at_id = s.current_id; Poll::Ready(s.data.clone()) } _ => { @@ -172,7 +169,7 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> MultiSignal<'a, M, T, N> { /// A receiver is able to `.await` a changed `MultiSignal` value. pub struct Rcv<'a, M: RawMutex, T: Clone, const N: usize> { - multi_sig: &'a MultiSignal<'a, M, T, N>, + multi_sig: &'a MultiSignal, predicate: Option bool>, at_id: u64, } @@ -180,7 +177,7 @@ pub struct Rcv<'a, M: RawMutex, T: Clone, const N: usize> { // f: Option bool> impl<'a, M: RawMutex, T: Clone, const N: usize> Rcv<'a, M, T, N> { /// Create a new `Receiver` with a reference the given `MultiSignal`. - fn new(multi_sig: &'a MultiSignal<'a, M, T, N>) -> Self { + fn new(multi_sig: &'a MultiSignal) -> Self { Self { multi_sig, predicate: None, @@ -195,6 +192,7 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> Rcv<'a, M, T, N> { } /// Wait for a change to the value of the corresponding `MultiSignal` which matches the predicate `f`. + // TODO: How do we make this work with a FnMut closure? pub fn changed_and<'s>(&'s mut self, f: fn(&T) -> bool) -> ReceiverFuture<'s, 'a, M, T, N> { self.predicate = Some(f); ReceiverFuture { subscriber: self } @@ -215,7 +213,7 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> Rcv<'a, M, T, N> { } /// Try to get a changed value of the corresponding `MultiSignal` which matches the predicate `f`. - pub fn try_changed_and(&mut self, f: fn(&T) -> bool) -> Option { + pub fn try_changed_and(&mut self, mut f: impl FnMut(&T) -> bool) -> Option { self.multi_sig.mutex.lock(|state| { let s = state.borrow(); match s.current_id > self.at_id && f(&s.data) { @@ -234,7 +232,7 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> Rcv<'a, M, T, N> { } /// Peek the current value of the corresponding `MultiSignal` and check if it satisfies the predicate `f`. - pub fn peek_and(&self, f: fn(&T) -> bool) -> Option { + pub fn peek_and(&self, f: impl FnMut(&T) -> bool) -> Option { self.multi_sig.peek_and(f) } From 24a4379832d387754d407b77ff7aac5e55401eb3 Mon Sep 17 00:00:00 2001 From: Peter Krull Date: Wed, 14 Feb 2024 01:23:11 +0100 Subject: [PATCH 04/15] Got closures to work in async, added bunch of tests --- embassy-sync/src/multi_signal.rs | 340 ++++++++++++++++++++++++++----- 1 file changed, 292 insertions(+), 48 deletions(-) diff --git a/embassy-sync/src/multi_signal.rs b/embassy-sync/src/multi_signal.rs index 5f724c76b3..1481dc8f89 100644 --- a/embassy-sync/src/multi_signal.rs +++ b/embassy-sync/src/multi_signal.rs @@ -97,7 +97,7 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> MultiSignal { } /// Get a [`Receiver`] for the `MultiSignal`. - pub fn receiver(&'a self) -> Result, Error> { + pub fn receiver<'s>(&'a self) -> Result, Error> { self.mutex.lock(|state| { let mut s = state.borrow_mut(); if s.receiver_count < N { @@ -142,60 +142,36 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> MultiSignal { fn get_id(&self) -> u64 { self.mutex.lock(|state| state.borrow().current_id) } - - /// Poll the `MultiSignal` with an optional context. - fn get_with_context(&self, rcv: &mut Rcv<'a, M, T, N>, cx: Option<&mut Context>) -> Poll { - self.mutex.lock(|state| { - let mut s = state.borrow_mut(); - match (s.current_id > rcv.at_id, rcv.predicate) { - (true, None) => { - rcv.at_id = s.current_id; - Poll::Ready(s.data.clone()) - } - (true, Some(f)) if f(&s.data) => { - rcv.at_id = s.current_id; - Poll::Ready(s.data.clone()) - } - _ => { - if let Some(cx) = cx { - s.wakers.register(cx.waker()); - } - Poll::Pending - } - } - }) - } } /// A receiver is able to `.await` a changed `MultiSignal` value. pub struct Rcv<'a, M: RawMutex, T: Clone, const N: usize> { multi_sig: &'a MultiSignal, - predicate: Option bool>, at_id: u64, } -// f: Option bool> -impl<'a, M: RawMutex, T: Clone, const N: usize> Rcv<'a, M, T, N> { +impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Rcv<'a, M, T, N> { /// Create a new `Receiver` with a reference the given `MultiSignal`. fn new(multi_sig: &'a MultiSignal) -> Self { - Self { - multi_sig, - predicate: None, - at_id: 0, - } + Self { multi_sig, at_id: 0 } } /// Wait for a change to the value of the corresponding `MultiSignal`. - pub fn changed<'s>(&'s mut self) -> ReceiverFuture<'s, 'a, M, T, N> { - self.predicate = None; - ReceiverFuture { subscriber: self } + pub async fn changed(&mut self) -> T { + ReceiverWaitFuture { subscriber: self }.await } /// Wait for a change to the value of the corresponding `MultiSignal` which matches the predicate `f`. // TODO: How do we make this work with a FnMut closure? - pub fn changed_and<'s>(&'s mut self, f: fn(&T) -> bool) -> ReceiverFuture<'s, 'a, M, T, N> { - self.predicate = Some(f); - ReceiverFuture { subscriber: self } + pub async fn changed_and(&mut self, f: F) -> T + where + F: FnMut(&T) -> bool, + { + ReceiverPredFuture { + subscriber: self, + predicate: f, + } + .await } /// Try to get a changed value of the corresponding `MultiSignal`. @@ -213,7 +189,10 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> Rcv<'a, M, T, N> { } /// Try to get a changed value of the corresponding `MultiSignal` which matches the predicate `f`. - pub fn try_changed_and(&mut self, mut f: impl FnMut(&T) -> bool) -> Option { + pub fn try_changed_and(&mut self, mut f: F) -> Option + where + F: FnMut(&T) -> bool, + { self.multi_sig.mutex.lock(|state| { let s = state.borrow(); match s.current_id > self.at_id && f(&s.data) { @@ -232,7 +211,10 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> Rcv<'a, M, T, N> { } /// Peek the current value of the corresponding `MultiSignal` and check if it satisfies the predicate `f`. - pub fn peek_and(&self, f: impl FnMut(&T) -> bool) -> Option { + pub fn peek_and(&self, f: F) -> Option + where + F: FnMut(&T) -> bool, + { self.multi_sig.peek_and(f) } @@ -247,7 +229,7 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> Rcv<'a, M, T, N> { /// A `Receiver` is able to `.await` a change to the corresponding [`MultiSignal`] value. pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, M, T, N>); -impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> { +impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> { type Target = Rcv<'a, M, T, N>; fn deref(&self) -> &Self::Target { @@ -255,7 +237,7 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> } } -impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> { +impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 } @@ -263,18 +245,280 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, /// Future for the `Receiver` wait action #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct ReceiverFuture<'s, 'a, M: RawMutex, T: Clone, const N: usize> { +pub struct ReceiverWaitFuture<'s, 'a, M: RawMutex, T: Clone, const N: usize> { subscriber: &'s mut Rcv<'a, M, T, N>, } -impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Future for ReceiverFuture<'s, 'a, M, T, N> { +impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Unpin for ReceiverWaitFuture<'s, 'a, M, T, N> {} +impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Future for ReceiverWaitFuture<'s, 'a, M, T, N> { type Output = T; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.subscriber - .multi_sig - .get_with_context(&mut self.subscriber, Some(cx)) + self.get_with_context(Some(cx)) + } +} + +impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> ReceiverWaitFuture<'s, 'a, M, T, N> { + /// Poll the `MultiSignal` with an optional context. + fn get_with_context(&mut self, cx: Option<&mut Context>) -> Poll { + self.subscriber.multi_sig.mutex.lock(|state| { + let mut s = state.borrow_mut(); + match s.current_id > self.subscriber.at_id { + true => { + self.subscriber.at_id = s.current_id; + Poll::Ready(s.data.clone()) + } + _ => { + if let Some(cx) = cx { + s.wakers.register(cx.waker()); + } + Poll::Pending + } + } + }) + } +} + +/// Future for the `Receiver` wait action, with the ability to filter the value with a predicate. +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct ReceiverPredFuture<'s, 'a, M: RawMutex, T: Clone, F: FnMut(&'a T) -> bool, const N: usize> { + subscriber: &'s mut Rcv<'a, M, T, N>, + predicate: F, +} + +impl<'s, 'a, M: RawMutex, T: Clone, F: FnMut(&T) -> bool, const N: usize> Unpin for ReceiverPredFuture<'s, 'a, M, T, F, N> {} +impl<'s, 'a, M: RawMutex, T: Clone, F: FnMut(&T) -> bool, const N: usize> Future for ReceiverPredFuture<'s, 'a, M, T, F, N>{ + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.get_with_context_pred(Some(cx)) + } +} + +impl<'s, 'a, M: RawMutex, T: Clone, F: FnMut(&T) -> bool, const N: usize> ReceiverPredFuture<'s, 'a, M, T, F, N> { + /// Poll the `MultiSignal` with an optional context. + fn get_with_context_pred(&mut self, cx: Option<&mut Context>) -> Poll { + self.subscriber.multi_sig.mutex.lock(|state| { + let mut s = state.borrow_mut(); + match s.current_id > self.subscriber.at_id { + true if (self.predicate)(&s.data) => { + self.subscriber.at_id = s.current_id; + Poll::Ready(s.data.clone()) + } + _ => { + if let Some(cx) = cx { + s.wakers.register(cx.waker()); + } + Poll::Pending + } + } + }) } } -impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Unpin for ReceiverFuture<'s, 'a, M, T, N> {} +#[cfg(test)] +mod tests { + use super::*; + use crate::blocking_mutex::raw::CriticalSectionRawMutex; + use futures_executor::block_on; + + #[test] + fn multiple_writes() { + let f = async { + static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); + + // Obtain Receivers + let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); + let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); + + SOME_SIGNAL.write(10); + + // Receive the new value + assert_eq!(rcv0.changed().await, 10); + assert_eq!(rcv1.changed().await, 10); + + // No update + assert_eq!(rcv0.try_changed(), None); + assert_eq!(rcv1.try_changed(), None); + + SOME_SIGNAL.write(20); + + assert_eq!(rcv0.changed().await, 20); + assert_eq!(rcv1.changed().await, 20); + }; + block_on(f); + } + + #[test] + fn max_receivers() { + let f = async { + static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); + + // Obtain Receivers + let _ = SOME_SIGNAL.receiver().unwrap(); + let _ = SOME_SIGNAL.receiver().unwrap(); + assert!(SOME_SIGNAL.receiver().is_err()); + }; + block_on(f); + } + + // Really weird edge case, but it's possible to have a receiver that never gets a value. + #[test] + fn receive_initial() { + let f = async { + static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); + + // Obtain Receivers + let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); + let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); + + assert_eq!(rcv0.try_changed(), Some(0)); + assert_eq!(rcv1.try_changed(), Some(0)); + + assert_eq!(rcv0.try_changed(), None); + assert_eq!(rcv1.try_changed(), None); + }; + block_on(f); + } + + #[test] + fn count_ids() { + let f = async { + static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); + + // Obtain Receivers + let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); + let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); + + SOME_SIGNAL.write(10); + + assert_eq!(rcv0.changed().await, 10); + assert_eq!(rcv1.changed().await, 10); + + assert_eq!(rcv0.try_changed(), None); + assert_eq!(rcv1.try_changed(), None); + + SOME_SIGNAL.write(20); + SOME_SIGNAL.write(20); + SOME_SIGNAL.write(20); + + assert_eq!(rcv0.changed().await, 20); + assert_eq!(rcv1.changed().await, 20); + + assert_eq!(rcv0.try_changed(), None); + assert_eq!(rcv1.try_changed(), None); + + assert_eq!(SOME_SIGNAL.get_id(), 5); + }; + block_on(f); + } + + #[test] + fn peek_still_await() { + let f = async { + static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); + + // Obtain Receivers + let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); + let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); + + SOME_SIGNAL.write(10); + + assert_eq!(rcv0.peek(), 10); + assert_eq!(rcv1.peek(), 10); + + assert_eq!(rcv0.changed().await, 10); + assert_eq!(rcv1.changed().await, 10); + }; + block_on(f); + } + + #[test] + fn predicate() { + let f = async { + static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); + + // Obtain Receivers + let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); + let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); + + SOME_SIGNAL.write(20); + + assert_eq!(rcv0.changed_and(|x| x > &10).await, 20); + assert_eq!(rcv1.try_changed_and(|x| x > &30), None); + }; + block_on(f); + } + + #[test] + fn mutable_predicate() { + let f = async { + static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); + + // Obtain Receivers + let mut rcv = SOME_SIGNAL.receiver().unwrap(); + + SOME_SIGNAL.write(10); + + let mut largest = 0; + let mut predicate = |x: &u8| { + if *x > largest { + largest = *x; + } + true + }; + + assert_eq!(rcv.changed_and(&mut predicate).await, 10); + + SOME_SIGNAL.write(20); + + assert_eq!(rcv.changed_and(&mut predicate).await, 20); + + SOME_SIGNAL.write(5); + + assert_eq!(rcv.changed_and(&mut predicate).await, 5); + + assert_eq!(largest, 20) + }; + block_on(f); + } + + #[test] + fn peek_and() { + let f = async { + static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); + + // Obtain Receivers + let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); + let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); + + SOME_SIGNAL.write(20); + + assert_eq!(rcv0.peek_and(|x| x > &10), Some(20)); + assert_eq!(rcv1.peek_and(|x| x > &30), None); + + assert_eq!(rcv0.changed().await, 20); + assert_eq!(rcv1.changed().await, 20); + }; + block_on(f); + } + + #[test] + fn peek_with_static() { + let f = async { + static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); + + // Obtain Receivers + let rcv0 = SOME_SIGNAL.receiver().unwrap(); + let rcv1 = SOME_SIGNAL.receiver().unwrap(); + + SOME_SIGNAL.write(20); + + assert_eq!(rcv0.peek(), 20); + assert_eq!(rcv1.peek(), 20); + assert_eq!(SOME_SIGNAL.peek(), 20); + assert_eq!(SOME_SIGNAL.peek_and(|x| x > &30), None); + }; + block_on(f); + } +} From 2f58d1968a7310335a0dac4d947c6972a7707ed5 Mon Sep 17 00:00:00 2001 From: Peter Krull Date: Wed, 14 Feb 2024 01:27:48 +0100 Subject: [PATCH 05/15] Updated formatting --- embassy-sync/src/multi_signal.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/embassy-sync/src/multi_signal.rs b/embassy-sync/src/multi_signal.rs index 1481dc8f89..ff9f72f2e0 100644 --- a/embassy-sync/src/multi_signal.rs +++ b/embassy-sync/src/multi_signal.rs @@ -162,7 +162,6 @@ impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Rcv<'a, M, T, N> { } /// Wait for a change to the value of the corresponding `MultiSignal` which matches the predicate `f`. - // TODO: How do we make this work with a FnMut closure? pub async fn changed_and(&mut self, f: F) -> T where F: FnMut(&T) -> bool, @@ -286,8 +285,13 @@ pub struct ReceiverPredFuture<'s, 'a, M: RawMutex, T: Clone, F: FnMut(&'a T) -> predicate: F, } -impl<'s, 'a, M: RawMutex, T: Clone, F: FnMut(&T) -> bool, const N: usize> Unpin for ReceiverPredFuture<'s, 'a, M, T, F, N> {} -impl<'s, 'a, M: RawMutex, T: Clone, F: FnMut(&T) -> bool, const N: usize> Future for ReceiverPredFuture<'s, 'a, M, T, F, N>{ +impl<'s, 'a, M: RawMutex, T: Clone, F: FnMut(&T) -> bool, const N: usize> Unpin + for ReceiverPredFuture<'s, 'a, M, T, F, N> +{ +} +impl<'s, 'a, M: RawMutex, T: Clone, F: FnMut(&T) -> bool, const N: usize> Future + for ReceiverPredFuture<'s, 'a, M, T, F, N> +{ type Output = T; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -318,9 +322,10 @@ impl<'s, 'a, M: RawMutex, T: Clone, F: FnMut(&T) -> bool, const N: usize> Receiv #[cfg(test)] mod tests { + use futures_executor::block_on; + use super::*; use crate::blocking_mutex::raw::CriticalSectionRawMutex; - use futures_executor::block_on; #[test] fn multiple_writes() { From 6defb4fed98432dee948634f3b2001cb4ea7ec5b Mon Sep 17 00:00:00 2001 From: Peter Krull Date: Wed, 28 Feb 2024 20:59:38 +0100 Subject: [PATCH 06/15] Changed name to `Watch`, added `DynReceiver`, `get`-method and more reworks. --- embassy-sync/src/lib.rs | 2 +- embassy-sync/src/multi_signal.rs | 529 ------------------------------- embassy-sync/src/watch.rs | 515 ++++++++++++++++++++++++++++++ 3 files changed, 516 insertions(+), 530 deletions(-) delete mode 100644 embassy-sync/src/multi_signal.rs create mode 100644 embassy-sync/src/watch.rs diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index f029855646..8a69541a56 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs @@ -12,11 +12,11 @@ mod ring_buffer; pub mod blocking_mutex; pub mod channel; -pub mod multi_signal; pub mod mutex; pub mod pipe; pub mod priority_channel; pub mod pubsub; pub mod signal; pub mod waitqueue; +pub mod watch; pub mod zerocopy_channel; diff --git a/embassy-sync/src/multi_signal.rs b/embassy-sync/src/multi_signal.rs deleted file mode 100644 index ff9f72f2e0..0000000000 --- a/embassy-sync/src/multi_signal.rs +++ /dev/null @@ -1,529 +0,0 @@ -//! A synchronization primitive for passing the latest value to **multiple** tasks. -use core::cell::RefCell; -use core::ops::{Deref, DerefMut}; -use core::pin::Pin; -use core::task::{Context, Poll}; - -use futures_util::Future; - -use crate::blocking_mutex::raw::RawMutex; -use crate::blocking_mutex::Mutex; -use crate::waitqueue::MultiWakerRegistration; - -/// A `MultiSignal` is a single-slot signaling primitive, which can awake `N` separate [`Receiver`]s. -/// -/// Similar to a [`Signal`](crate::signal::Signal), except `MultiSignal` allows for multiple tasks to -/// `.await` the latest value, and all receive it. -/// -/// This is similar to a [`PubSubChannel`](crate::pubsub::PubSubChannel) with a buffer size of 1, except -/// "sending" to it (calling [`MultiSignal::write`]) will immediately overwrite the previous value instead -/// of waiting for the receivers to pop the previous value. -/// -/// `MultiSignal` is useful when a single task is responsible for updating a value or "state", which multiple other -/// tasks are interested in getting notified about changes to the latest value of. It is therefore fine for -/// [`Receiver`]s to "lose" stale values. -/// -/// Anyone with a reference to the MultiSignal can update or peek the value. MultiSignals are generally declared -/// as `static`s and then borrowed as required to either [`MultiSignal::peek`] the value or obtain a [`Receiver`] -/// with [`MultiSignal::receiver`] which has async methods. -/// ``` -/// -/// use futures_executor::block_on; -/// use embassy_sync::multi_signal::MultiSignal; -/// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; -/// -/// let f = async { -/// -/// static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); -/// -/// // Obtain Receivers -/// let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); -/// let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); -/// assert!(SOME_SIGNAL.receiver().is_err()); -/// -/// SOME_SIGNAL.write(10); -/// -/// // Receive the new value -/// assert_eq!(rcv0.changed().await, 10); -/// assert_eq!(rcv1.try_changed(), Some(10)); -/// -/// // No update -/// assert_eq!(rcv0.try_changed(), None); -/// assert_eq!(rcv1.try_changed(), None); -/// -/// SOME_SIGNAL.write(20); -/// -/// // Receive new value with predicate -/// assert_eq!(rcv0.changed_and(|x|x>&10).await, 20); -/// assert_eq!(rcv1.try_changed_and(|x|x>&30), None); -/// -/// // Anyone can peek the current value -/// assert_eq!(rcv0.peek(), 20); -/// assert_eq!(rcv1.peek(), 20); -/// assert_eq!(SOME_SIGNAL.peek(), 20); -/// assert_eq!(SOME_SIGNAL.peek_and(|x|x>&30), None); -/// }; -/// block_on(f); -/// ``` -pub struct MultiSignal { - mutex: Mutex>>, -} - -struct MultiSignalState { - data: T, - current_id: u64, - wakers: MultiWakerRegistration, - receiver_count: usize, -} - -#[derive(Debug)] -/// An error that can occur when a `MultiSignal` returns a `Result`. -pub enum Error { - /// The maximum number of [`Receiver`](crate::multi_signal::Receiver) has been reached. - MaximumReceiversReached, -} - -impl<'a, M: RawMutex, T: Clone, const N: usize> MultiSignal { - /// Create a new `MultiSignal` initialized with the given value. - pub const fn new(init: T) -> Self { - Self { - mutex: Mutex::new(RefCell::new(MultiSignalState { - data: init, - current_id: 1, - wakers: MultiWakerRegistration::new(), - receiver_count: 0, - })), - } - } - - /// Get a [`Receiver`] for the `MultiSignal`. - pub fn receiver<'s>(&'a self) -> Result, Error> { - self.mutex.lock(|state| { - let mut s = state.borrow_mut(); - if s.receiver_count < N { - s.receiver_count += 1; - Ok(Receiver(Rcv::new(self))) - } else { - Err(Error::MaximumReceiversReached) - } - }) - } - - /// Update the value of the `MultiSignal`. - pub fn write(&self, data: T) { - self.mutex.lock(|state| { - let mut s = state.borrow_mut(); - s.data = data; - s.current_id += 1; - s.wakers.wake(); - }) - } - - /// Peek the current value of the `MultiSignal`. - pub fn peek(&self) -> T { - self.mutex.lock(|state| state.borrow().data.clone()) - } - - /// Peek the current value of the `MultiSignal` and check if it satisfies the predicate `f`. - pub fn peek_and(&self, mut f: impl FnMut(&T) -> bool) -> Option { - self.mutex.lock(|state| { - let s = state.borrow(); - if f(&s.data) { - Some(s.data.clone()) - } else { - None - } - }) - } - - /// Get the ID of the current value of the `MultiSignal`. - /// This method is mostly for testing purposes. - #[allow(dead_code)] - fn get_id(&self) -> u64 { - self.mutex.lock(|state| state.borrow().current_id) - } -} - -/// A receiver is able to `.await` a changed `MultiSignal` value. -pub struct Rcv<'a, M: RawMutex, T: Clone, const N: usize> { - multi_sig: &'a MultiSignal, - at_id: u64, -} - -impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Rcv<'a, M, T, N> { - /// Create a new `Receiver` with a reference the given `MultiSignal`. - fn new(multi_sig: &'a MultiSignal) -> Self { - Self { multi_sig, at_id: 0 } - } - - /// Wait for a change to the value of the corresponding `MultiSignal`. - pub async fn changed(&mut self) -> T { - ReceiverWaitFuture { subscriber: self }.await - } - - /// Wait for a change to the value of the corresponding `MultiSignal` which matches the predicate `f`. - pub async fn changed_and(&mut self, f: F) -> T - where - F: FnMut(&T) -> bool, - { - ReceiverPredFuture { - subscriber: self, - predicate: f, - } - .await - } - - /// Try to get a changed value of the corresponding `MultiSignal`. - pub fn try_changed(&mut self) -> Option { - self.multi_sig.mutex.lock(|state| { - let s = state.borrow(); - match s.current_id > self.at_id { - true => { - self.at_id = s.current_id; - Some(s.data.clone()) - } - false => None, - } - }) - } - - /// Try to get a changed value of the corresponding `MultiSignal` which matches the predicate `f`. - pub fn try_changed_and(&mut self, mut f: F) -> Option - where - F: FnMut(&T) -> bool, - { - self.multi_sig.mutex.lock(|state| { - let s = state.borrow(); - match s.current_id > self.at_id && f(&s.data) { - true => { - self.at_id = s.current_id; - Some(s.data.clone()) - } - false => None, - } - }) - } - - /// Peek the current value of the corresponding `MultiSignal`. - pub fn peek(&self) -> T { - self.multi_sig.peek() - } - - /// Peek the current value of the corresponding `MultiSignal` and check if it satisfies the predicate `f`. - pub fn peek_and(&self, f: F) -> Option - where - F: FnMut(&T) -> bool, - { - self.multi_sig.peek_and(f) - } - - /// Check if the value of the corresponding `MultiSignal` has changed. - pub fn has_changed(&mut self) -> bool { - self.multi_sig - .mutex - .lock(|state| state.borrow().current_id > self.at_id) - } -} - -/// A `Receiver` is able to `.await` a change to the corresponding [`MultiSignal`] value. -pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, M, T, N>); - -impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> { - type Target = Rcv<'a, M, T, N>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/// Future for the `Receiver` wait action -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct ReceiverWaitFuture<'s, 'a, M: RawMutex, T: Clone, const N: usize> { - subscriber: &'s mut Rcv<'a, M, T, N>, -} - -impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Unpin for ReceiverWaitFuture<'s, 'a, M, T, N> {} -impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Future for ReceiverWaitFuture<'s, 'a, M, T, N> { - type Output = T; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.get_with_context(Some(cx)) - } -} - -impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> ReceiverWaitFuture<'s, 'a, M, T, N> { - /// Poll the `MultiSignal` with an optional context. - fn get_with_context(&mut self, cx: Option<&mut Context>) -> Poll { - self.subscriber.multi_sig.mutex.lock(|state| { - let mut s = state.borrow_mut(); - match s.current_id > self.subscriber.at_id { - true => { - self.subscriber.at_id = s.current_id; - Poll::Ready(s.data.clone()) - } - _ => { - if let Some(cx) = cx { - s.wakers.register(cx.waker()); - } - Poll::Pending - } - } - }) - } -} - -/// Future for the `Receiver` wait action, with the ability to filter the value with a predicate. -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct ReceiverPredFuture<'s, 'a, M: RawMutex, T: Clone, F: FnMut(&'a T) -> bool, const N: usize> { - subscriber: &'s mut Rcv<'a, M, T, N>, - predicate: F, -} - -impl<'s, 'a, M: RawMutex, T: Clone, F: FnMut(&T) -> bool, const N: usize> Unpin - for ReceiverPredFuture<'s, 'a, M, T, F, N> -{ -} -impl<'s, 'a, M: RawMutex, T: Clone, F: FnMut(&T) -> bool, const N: usize> Future - for ReceiverPredFuture<'s, 'a, M, T, F, N> -{ - type Output = T; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.get_with_context_pred(Some(cx)) - } -} - -impl<'s, 'a, M: RawMutex, T: Clone, F: FnMut(&T) -> bool, const N: usize> ReceiverPredFuture<'s, 'a, M, T, F, N> { - /// Poll the `MultiSignal` with an optional context. - fn get_with_context_pred(&mut self, cx: Option<&mut Context>) -> Poll { - self.subscriber.multi_sig.mutex.lock(|state| { - let mut s = state.borrow_mut(); - match s.current_id > self.subscriber.at_id { - true if (self.predicate)(&s.data) => { - self.subscriber.at_id = s.current_id; - Poll::Ready(s.data.clone()) - } - _ => { - if let Some(cx) = cx { - s.wakers.register(cx.waker()); - } - Poll::Pending - } - } - }) - } -} - -#[cfg(test)] -mod tests { - use futures_executor::block_on; - - use super::*; - use crate::blocking_mutex::raw::CriticalSectionRawMutex; - - #[test] - fn multiple_writes() { - let f = async { - static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); - - // Obtain Receivers - let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); - let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); - - SOME_SIGNAL.write(10); - - // Receive the new value - assert_eq!(rcv0.changed().await, 10); - assert_eq!(rcv1.changed().await, 10); - - // No update - assert_eq!(rcv0.try_changed(), None); - assert_eq!(rcv1.try_changed(), None); - - SOME_SIGNAL.write(20); - - assert_eq!(rcv0.changed().await, 20); - assert_eq!(rcv1.changed().await, 20); - }; - block_on(f); - } - - #[test] - fn max_receivers() { - let f = async { - static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); - - // Obtain Receivers - let _ = SOME_SIGNAL.receiver().unwrap(); - let _ = SOME_SIGNAL.receiver().unwrap(); - assert!(SOME_SIGNAL.receiver().is_err()); - }; - block_on(f); - } - - // Really weird edge case, but it's possible to have a receiver that never gets a value. - #[test] - fn receive_initial() { - let f = async { - static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); - - // Obtain Receivers - let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); - let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); - - assert_eq!(rcv0.try_changed(), Some(0)); - assert_eq!(rcv1.try_changed(), Some(0)); - - assert_eq!(rcv0.try_changed(), None); - assert_eq!(rcv1.try_changed(), None); - }; - block_on(f); - } - - #[test] - fn count_ids() { - let f = async { - static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); - - // Obtain Receivers - let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); - let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); - - SOME_SIGNAL.write(10); - - assert_eq!(rcv0.changed().await, 10); - assert_eq!(rcv1.changed().await, 10); - - assert_eq!(rcv0.try_changed(), None); - assert_eq!(rcv1.try_changed(), None); - - SOME_SIGNAL.write(20); - SOME_SIGNAL.write(20); - SOME_SIGNAL.write(20); - - assert_eq!(rcv0.changed().await, 20); - assert_eq!(rcv1.changed().await, 20); - - assert_eq!(rcv0.try_changed(), None); - assert_eq!(rcv1.try_changed(), None); - - assert_eq!(SOME_SIGNAL.get_id(), 5); - }; - block_on(f); - } - - #[test] - fn peek_still_await() { - let f = async { - static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); - - // Obtain Receivers - let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); - let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); - - SOME_SIGNAL.write(10); - - assert_eq!(rcv0.peek(), 10); - assert_eq!(rcv1.peek(), 10); - - assert_eq!(rcv0.changed().await, 10); - assert_eq!(rcv1.changed().await, 10); - }; - block_on(f); - } - - #[test] - fn predicate() { - let f = async { - static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); - - // Obtain Receivers - let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); - let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); - - SOME_SIGNAL.write(20); - - assert_eq!(rcv0.changed_and(|x| x > &10).await, 20); - assert_eq!(rcv1.try_changed_and(|x| x > &30), None); - }; - block_on(f); - } - - #[test] - fn mutable_predicate() { - let f = async { - static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); - - // Obtain Receivers - let mut rcv = SOME_SIGNAL.receiver().unwrap(); - - SOME_SIGNAL.write(10); - - let mut largest = 0; - let mut predicate = |x: &u8| { - if *x > largest { - largest = *x; - } - true - }; - - assert_eq!(rcv.changed_and(&mut predicate).await, 10); - - SOME_SIGNAL.write(20); - - assert_eq!(rcv.changed_and(&mut predicate).await, 20); - - SOME_SIGNAL.write(5); - - assert_eq!(rcv.changed_and(&mut predicate).await, 5); - - assert_eq!(largest, 20) - }; - block_on(f); - } - - #[test] - fn peek_and() { - let f = async { - static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); - - // Obtain Receivers - let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); - let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); - - SOME_SIGNAL.write(20); - - assert_eq!(rcv0.peek_and(|x| x > &10), Some(20)); - assert_eq!(rcv1.peek_and(|x| x > &30), None); - - assert_eq!(rcv0.changed().await, 20); - assert_eq!(rcv1.changed().await, 20); - }; - block_on(f); - } - - #[test] - fn peek_with_static() { - let f = async { - static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); - - // Obtain Receivers - let rcv0 = SOME_SIGNAL.receiver().unwrap(); - let rcv1 = SOME_SIGNAL.receiver().unwrap(); - - SOME_SIGNAL.write(20); - - assert_eq!(rcv0.peek(), 20); - assert_eq!(rcv1.peek(), 20); - assert_eq!(SOME_SIGNAL.peek(), 20); - assert_eq!(SOME_SIGNAL.peek_and(|x| x > &30), None); - }; - block_on(f); - } -} diff --git a/embassy-sync/src/watch.rs b/embassy-sync/src/watch.rs new file mode 100644 index 0000000000..7e5e927413 --- /dev/null +++ b/embassy-sync/src/watch.rs @@ -0,0 +1,515 @@ +//! A synchronization primitive for passing the latest value to **multiple** tasks. + +use core::cell::RefCell; +use core::future::poll_fn; +use core::marker::PhantomData; +use core::ops::{Deref, DerefMut}; +use core::task::{Context, Poll}; + +use crate::blocking_mutex::raw::RawMutex; +use crate::blocking_mutex::Mutex; +use crate::waitqueue::MultiWakerRegistration; + +/// A `Watch` is a single-slot signaling primitive, which can awake `N` up to separate [`Receiver`]s. +/// +/// Similar to a [`Signal`](crate::signal::Signal), except `Watch` allows for multiple tasks to +/// `.await` the latest value, and all receive it. +/// +/// This is similar to a [`PubSubChannel`](crate::pubsub::PubSubChannel) with a buffer size of 1, except +/// "sending" to it (calling [`Watch::write`]) will immediately overwrite the previous value instead +/// of waiting for the receivers to pop the previous value. +/// +/// `Watch` is useful when a single task is responsible for updating a value or "state", which multiple other +/// tasks are interested in getting notified about changes to the latest value of. It is therefore fine for +/// [`Receiver`]s to "lose" stale values. +/// +/// Anyone with a reference to the Watch can update or peek the value. Watches are generally declared +/// as `static`s and then borrowed as required to either [`Watch::peek`] the value or obtain a [`Receiver`] +/// with [`Watch::receiver`] which has async methods. +/// ``` +/// +/// use futures_executor::block_on; +/// use embassy_sync::watch::Watch; +/// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; +/// +/// let f = async { +/// +/// static WATCH: Watch = Watch::new(); +/// +/// // Obtain Receivers +/// let mut rcv0 = WATCH.receiver().unwrap(); +/// let mut rcv1 = WATCH.receiver().unwrap(); +/// assert!(WATCH.receiver().is_err()); +/// +/// assert_eq!(rcv1.try_changed(), None); +/// +/// WATCH.write(10); +/// assert_eq!(WATCH.try_peek(), Some(10)); +/// +/// +/// // Receive the new value +/// assert_eq!(rcv0.changed().await, 10); +/// assert_eq!(rcv1.try_changed(), Some(10)); +/// +/// // No update +/// assert_eq!(rcv0.try_changed(), None); +/// assert_eq!(rcv1.try_changed(), None); +/// +/// WATCH.write(20); +/// +/// // Defference `between` peek `get`. +/// assert_eq!(rcv0.peek().await, 20); +/// assert_eq!(rcv1.get().await, 20); +/// +/// assert_eq!(rcv0.try_changed(), Some(20)); +/// assert_eq!(rcv1.try_changed(), None); +/// +/// }; +/// block_on(f); +/// ``` +pub struct Watch { + mutex: Mutex>>, +} + +struct WatchState { + data: Option, + current_id: u64, + wakers: MultiWakerRegistration, + receiver_count: usize, +} + +/// A trait representing the 'inner' behavior of the `Watch`. +pub trait WatchBehavior { + /// Poll the `Watch` for the current value, **without** making it as seen. + fn inner_poll_peek(&self, cx: &mut Context<'_>) -> Poll; + + /// Tries to peek the value of the `Watch`, **without** marking it as seen. + fn inner_try_peek(&self) -> Option; + + /// Poll the `Watch` for the current value, making it as seen. + fn inner_poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll; + + /// Tries to get the value of the `Watch`, marking it as seen. + fn inner_try_get(&self, id: &mut u64) -> Option; + + /// Poll the `Watch` for a changed value, marking it as seen. + fn inner_poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll; + + /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen. + fn inner_try_changed(&self, id: &mut u64) -> Option; + + /// Checks if the `Watch` is been initialized with a value. + fn inner_contains_value(&self) -> bool; +} + +impl WatchBehavior for Watch { + fn inner_poll_peek(&self, cx: &mut Context<'_>) -> Poll { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + match &s.data { + Some(data) => Poll::Ready(data.clone()), + None => { + s.wakers.register(cx.waker()); + Poll::Pending + } + } + }) + } + + fn inner_try_peek(&self) -> Option { + self.mutex.lock(|state| state.borrow().data.clone()) + } + + fn inner_poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + match &s.data { + Some(data) => { + *id = s.current_id; + Poll::Ready(data.clone()) + } + None => { + s.wakers.register(cx.waker()); + Poll::Pending + } + } + }) + } + + fn inner_try_get(&self, id: &mut u64) -> Option { + self.mutex.lock(|state| { + let s = state.borrow(); + *id = s.current_id; + state.borrow().data.clone() + }) + } + + fn inner_poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + match (&s.data, s.current_id > *id) { + (Some(data), true) => { + *id = s.current_id; + Poll::Ready(data.clone()) + } + _ => { + s.wakers.register(cx.waker()); + Poll::Pending + } + } + }) + } + + fn inner_try_changed(&self, id: &mut u64) -> Option { + self.mutex.lock(|state| { + let s = state.borrow(); + match s.current_id > *id { + true => { + *id = s.current_id; + state.borrow().data.clone() + } + false => None, + } + }) + } + + fn inner_contains_value(&self) -> bool { + self.mutex.lock(|state| state.borrow().data.is_some()) + } +} + +#[derive(Debug)] +/// An error that can occur when a `Watch` returns a `Result::Err(_)`. +pub enum Error { + /// The maximum number of [`Receiver`](crate::watch::Receiver)/[`DynReceiver`](crate::watch::DynReceiver) has been reached. + MaximumReceiversReached, +} + +impl<'a, M: RawMutex, T: Clone, const N: usize> Watch { + /// Create a new `Watch` channel. + pub const fn new() -> Self { + Self { + mutex: Mutex::new(RefCell::new(WatchState { + data: None, + current_id: 0, + wakers: MultiWakerRegistration::new(), + receiver_count: 0, + })), + } + } + + /// Write a new value to the `Watch`. + pub fn write(&self, val: T) { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + s.data = Some(val); + s.current_id += 1; + s.wakers.wake(); + }) + } + + /// Create a new [`Receiver`] for the `Watch`. + pub fn receiver(&self) -> Result, Error> { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + if s.receiver_count < N { + s.receiver_count += 1; + Ok(Receiver(Rcv::new(self))) + } else { + Err(Error::MaximumReceiversReached) + } + }) + } + + /// Create a new [`DynReceiver`] for the `Watch`. + pub fn dyn_receiver(&self) -> Result, Error> { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + if s.receiver_count < N { + s.receiver_count += 1; + Ok(DynReceiver(Rcv::new(self))) + } else { + Err(Error::MaximumReceiversReached) + } + }) + } + + /// Tries to retrieve the value of the `Watch`. + pub fn try_peek(&self) -> Option { + self.inner_try_peek() + } + + /// Returns true if the `Watch` contains a value. + pub fn contains_value(&self) -> bool { + self.inner_contains_value() + } + + /// Clears the value of the `Watch`. This will cause calls to [`Rcv::get`] and [`Rcv::peek`] to be pending. + pub fn clear(&self) { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + s.data = None; + }) + } +} + +/// A receiver can `.await` a change in the `Watch` value. +pub struct Rcv<'a, T: Clone, W: WatchBehavior + ?Sized> { + watch: &'a W, + at_id: u64, + _phantom: PhantomData, +} + +impl<'a, T: Clone, W: WatchBehavior + ?Sized> Rcv<'a, T, W> { + /// Creates a new `Receiver` with a reference to the `Watch`. + fn new(watch: &'a W) -> Self { + Self { + watch, + at_id: 0, + _phantom: PhantomData, + } + } + + /// Returns the current value of the `Watch` if it is initialized, **without** marking it as seen. + pub async fn peek(&self) -> T { + poll_fn(|cx| self.watch.inner_poll_peek(cx)).await + } + + /// Tries to peek the current value of the `Watch` without waiting, and **without** marking it as seen. + pub fn try_peek(&self) -> Option { + self.watch.inner_try_peek() + } + + /// Returns the current value of the `Watch` if it is initialized, marking it as seen. + pub async fn get(&mut self) -> T { + poll_fn(|cx| self.watch.inner_poll_get(&mut self.at_id, cx)).await + } + + /// Tries to get the current value of the `Watch` without waiting, marking it as seen. + pub fn try_get(&mut self) -> Option { + self.watch.inner_try_get(&mut self.at_id) + } + + /// Waits for the `Watch` to change and returns the new value, marking it as seen. + pub async fn changed(&mut self) -> T { + poll_fn(|cx| self.watch.inner_poll_changed(&mut self.at_id, cx)).await + } + + /// Tries to get the new value of the watch without waiting, marking it as seen. + pub fn try_changed(&mut self) -> Option { + self.watch.inner_try_changed(&mut self.at_id) + } + + /// Checks if the `Watch` contains a value. If this returns true, + /// then awaiting [`Rcv::get`] and [`Rcv::peek`] will return immediately. + pub fn contains_value(&self) -> bool { + self.watch.inner_contains_value() + } +} + +/// A receiver of a `Watch` channel. +pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch>); + +/// A receiver which holds a **reference** to a `Watch` channel. +/// +/// This is an alternative to [`Receiver`] with a simpler type definition, at the expense of +/// some runtime performance due to dynamic dispatch. +pub struct DynReceiver<'a, T: Clone>(Rcv<'a, T, dyn WatchBehavior + 'a>); + +impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> { + type Target = Rcv<'a, T, Watch>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl<'a, T: Clone> Deref for DynReceiver<'a, T> { + type Target = Rcv<'a, T, dyn WatchBehavior + 'a>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +#[cfg(test)] +mod tests { + use futures_executor::block_on; + + use super::*; + use crate::blocking_mutex::raw::CriticalSectionRawMutex; + + #[test] + fn multiple_writes() { + let f = async { + static WATCH: Watch = Watch::new(); + + // Obtain Receivers + let mut rcv0 = WATCH.receiver().unwrap(); + let mut rcv1 = WATCH.dyn_receiver().unwrap(); + + WATCH.write(10); + + // Receive the new value + assert_eq!(rcv0.changed().await, 10); + assert_eq!(rcv1.changed().await, 10); + + // No update + assert_eq!(rcv0.try_changed(), None); + assert_eq!(rcv1.try_changed(), None); + + WATCH.write(20); + + assert_eq!(rcv0.changed().await, 20); + assert_eq!(rcv1.changed().await, 20); + }; + block_on(f); + } + + #[test] + fn max_receivers() { + let f = async { + static WATCH: Watch = Watch::new(); + + // Obtain Receivers + let _ = WATCH.receiver().unwrap(); + let _ = WATCH.receiver().unwrap(); + assert!(WATCH.receiver().is_err()); + }; + block_on(f); + } + + #[test] + fn receive_initial() { + let f = async { + static WATCH: Watch = Watch::new(); + + // Obtain Receivers + let mut rcv0 = WATCH.receiver().unwrap(); + let mut rcv1 = WATCH.receiver().unwrap(); + + assert_eq!(rcv0.contains_value(), false); + + assert_eq!(rcv0.try_changed(), None); + assert_eq!(rcv1.try_changed(), None); + + WATCH.write(0); + + assert_eq!(rcv0.contains_value(), true); + + assert_eq!(rcv0.try_changed(), Some(0)); + assert_eq!(rcv1.try_changed(), Some(0)); + }; + block_on(f); + } + + #[test] + fn peek_get_changed() { + let f = async { + static WATCH: Watch = Watch::new(); + + // Obtain Receivers + let mut rcv0 = WATCH.receiver().unwrap(); + + WATCH.write(10); + + // Ensure peek does not mark as seen + assert_eq!(rcv0.peek().await, 10); + assert_eq!(rcv0.try_changed(), Some(10)); + assert_eq!(rcv0.try_changed(), None); + assert_eq!(rcv0.peek().await, 10); + + WATCH.write(20); + + // Ensure get does mark as seen + assert_eq!(rcv0.get().await, 20); + assert_eq!(rcv0.try_changed(), None); + assert_eq!(rcv0.try_get(), Some(20)); + }; + block_on(f); + } + + #[test] + fn count_ids() { + let f = async { + static WATCH: Watch = Watch::new(); + + // Obtain Receivers + let mut rcv0 = WATCH.receiver().unwrap(); + let mut rcv1 = WATCH.receiver().unwrap(); + + let get_id = || WATCH.mutex.lock(|state| state.borrow().current_id); + + WATCH.write(10); + + assert_eq!(rcv0.changed().await, 10); + assert_eq!(rcv1.changed().await, 10); + + assert_eq!(rcv0.try_changed(), None); + assert_eq!(rcv1.try_changed(), None); + + WATCH.write(20); + WATCH.write(20); + WATCH.write(20); + + assert_eq!(rcv0.changed().await, 20); + assert_eq!(rcv1.changed().await, 20); + + assert_eq!(rcv0.try_changed(), None); + assert_eq!(rcv1.try_changed(), None); + + assert_eq!(get_id(), 4); + }; + block_on(f); + } + + #[test] + fn peek_still_await() { + let f = async { + static WATCH: Watch = Watch::new(); + + // Obtain Receivers + let mut rcv0 = WATCH.receiver().unwrap(); + let mut rcv1 = WATCH.receiver().unwrap(); + + WATCH.write(10); + + assert_eq!(rcv0.peek().await, 10); + assert_eq!(rcv1.try_peek(), Some(10)); + + assert_eq!(rcv0.changed().await, 10); + assert_eq!(rcv1.changed().await, 10); + }; + block_on(f); + } + + #[test] + fn peek_with_static() { + let f = async { + static WATCH: Watch = Watch::new(); + + // Obtain Receivers + let rcv0 = WATCH.receiver().unwrap(); + let rcv1 = WATCH.receiver().unwrap(); + + WATCH.write(20); + + assert_eq!(rcv0.peek().await, 20); + assert_eq!(rcv1.peek().await, 20); + assert_eq!(WATCH.try_peek(), Some(20)); + }; + block_on(f); + } +} From ae2f10992149279884ea564b00eb18c8bf1f464e Mon Sep 17 00:00:00 2001 From: Peter Krull Date: Thu, 29 Feb 2024 16:45:44 +0100 Subject: [PATCH 07/15] Added sender types, support for dropping receivers, converting to dyn-types, revised tests. --- embassy-sync/src/watch.rs | 521 +++++++++++++++++++++++++++----------- 1 file changed, 374 insertions(+), 147 deletions(-) diff --git a/embassy-sync/src/watch.rs b/embassy-sync/src/watch.rs index 7e5e927413..3e22b1e7b9 100644 --- a/embassy-sync/src/watch.rs +++ b/embassy-sync/src/watch.rs @@ -1,4 +1,4 @@ -//! A synchronization primitive for passing the latest value to **multiple** tasks. +//! A synchronization primitive for passing the latest value to **multiple** receivers. use core::cell::RefCell; use core::future::poll_fn; @@ -10,22 +10,17 @@ use crate::blocking_mutex::raw::RawMutex; use crate::blocking_mutex::Mutex; use crate::waitqueue::MultiWakerRegistration; -/// A `Watch` is a single-slot signaling primitive, which can awake `N` up to separate [`Receiver`]s. +/// The `Watch` is a single-slot signaling primitive that allows multiple receivers to concurrently await +/// changes to the value. Unlike a [`Signal`](crate::signal::Signal), `Watch` supports multiple receivers, +/// and unlike a [`PubSubChannel`](crate::pubsub::PubSubChannel), `Watch` immediately overwrites the previous +/// value when a new one is sent, without waiting for all receivers to read the previous value. /// -/// Similar to a [`Signal`](crate::signal::Signal), except `Watch` allows for multiple tasks to -/// `.await` the latest value, and all receive it. +/// This makes `Watch` particularly useful when a single task updates a value or "state", and multiple other tasks +/// need to be notified about changes to this value asynchronously. Receivers may "lose" stale values, as they are +/// always provided with the latest value. /// -/// This is similar to a [`PubSubChannel`](crate::pubsub::PubSubChannel) with a buffer size of 1, except -/// "sending" to it (calling [`Watch::write`]) will immediately overwrite the previous value instead -/// of waiting for the receivers to pop the previous value. -/// -/// `Watch` is useful when a single task is responsible for updating a value or "state", which multiple other -/// tasks are interested in getting notified about changes to the latest value of. It is therefore fine for -/// [`Receiver`]s to "lose" stale values. -/// -/// Anyone with a reference to the Watch can update or peek the value. Watches are generally declared -/// as `static`s and then borrowed as required to either [`Watch::peek`] the value or obtain a [`Receiver`] -/// with [`Watch::receiver`] which has async methods. +/// Typically, `Watch` instances are declared as `static`, and a [`Sender`] and [`Receiver`] +/// (or [`DynSender`] and/or [`DynReceiver`]) are obtained and passed to the relevant parts of the program. /// ``` /// /// use futures_executor::block_on; @@ -36,18 +31,18 @@ use crate::waitqueue::MultiWakerRegistration; /// /// static WATCH: Watch = Watch::new(); /// -/// // Obtain Receivers +/// // Obtain receivers and sender /// let mut rcv0 = WATCH.receiver().unwrap(); -/// let mut rcv1 = WATCH.receiver().unwrap(); -/// assert!(WATCH.receiver().is_err()); +/// let mut rcv1 = WATCH.dyn_receiver().unwrap(); +/// let mut snd = WATCH.sender(); /// +/// // No more receivers, and no update +/// assert!(WATCH.receiver().is_err()); /// assert_eq!(rcv1.try_changed(), None); /// -/// WATCH.write(10); -/// assert_eq!(WATCH.try_peek(), Some(10)); -/// +/// snd.send(10); /// -/// // Receive the new value +/// // Receive the new value (async or try) /// assert_eq!(rcv0.changed().await, 10); /// assert_eq!(rcv1.try_changed(), Some(10)); /// @@ -55,13 +50,14 @@ use crate::waitqueue::MultiWakerRegistration; /// assert_eq!(rcv0.try_changed(), None); /// assert_eq!(rcv1.try_changed(), None); /// -/// WATCH.write(20); +/// snd.send(20); /// -/// // Defference `between` peek `get`. +/// // Peek does not mark the value as seen /// assert_eq!(rcv0.peek().await, 20); -/// assert_eq!(rcv1.get().await, 20); -/// /// assert_eq!(rcv0.try_changed(), Some(20)); +/// +/// // Get marks the value as seen +/// assert_eq!(rcv1.get().await, 20); /// assert_eq!(rcv1.try_changed(), None); /// /// }; @@ -80,30 +76,57 @@ struct WatchState { /// A trait representing the 'inner' behavior of the `Watch`. pub trait WatchBehavior { + /// Sends a new value to the `Watch`. + fn send(&self, val: T); + + /// Clears the value of the `Watch`. + fn clear(&self); + /// Poll the `Watch` for the current value, **without** making it as seen. - fn inner_poll_peek(&self, cx: &mut Context<'_>) -> Poll; + fn poll_peek(&self, cx: &mut Context<'_>) -> Poll; /// Tries to peek the value of the `Watch`, **without** marking it as seen. - fn inner_try_peek(&self) -> Option; + fn try_peek(&self) -> Option; /// Poll the `Watch` for the current value, making it as seen. - fn inner_poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll; + fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll; /// Tries to get the value of the `Watch`, marking it as seen. - fn inner_try_get(&self, id: &mut u64) -> Option; + fn try_get(&self, id: &mut u64) -> Option; /// Poll the `Watch` for a changed value, marking it as seen. - fn inner_poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll; + fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll; /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen. - fn inner_try_changed(&self, id: &mut u64) -> Option; + fn try_changed(&self, id: &mut u64) -> Option; /// Checks if the `Watch` is been initialized with a value. - fn inner_contains_value(&self) -> bool; + fn contains_value(&self) -> bool; + + /// Used when a receiver is dropped to decrement the receiver count. + /// + /// ## This method should not be called by the user. + fn drop_receiver(&self); } impl WatchBehavior for Watch { - fn inner_poll_peek(&self, cx: &mut Context<'_>) -> Poll { + fn send(&self, val: T) { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + s.data = Some(val); + s.current_id += 1; + s.wakers.wake(); + }) + } + + fn clear(&self) { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + s.data = None; + }) + } + + fn poll_peek(&self, cx: &mut Context<'_>) -> Poll { self.mutex.lock(|state| { let mut s = state.borrow_mut(); match &s.data { @@ -116,11 +139,11 @@ impl WatchBehavior for Watch }) } - fn inner_try_peek(&self) -> Option { + fn try_peek(&self) -> Option { self.mutex.lock(|state| state.borrow().data.clone()) } - fn inner_poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll { + fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll { self.mutex.lock(|state| { let mut s = state.borrow_mut(); match &s.data { @@ -136,7 +159,7 @@ impl WatchBehavior for Watch }) } - fn inner_try_get(&self, id: &mut u64) -> Option { + fn try_get(&self, id: &mut u64) -> Option { self.mutex.lock(|state| { let s = state.borrow(); *id = s.current_id; @@ -144,7 +167,7 @@ impl WatchBehavior for Watch }) } - fn inner_poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll { + fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll { self.mutex.lock(|state| { let mut s = state.borrow_mut(); match (&s.data, s.current_id > *id) { @@ -160,7 +183,7 @@ impl WatchBehavior for Watch }) } - fn inner_try_changed(&self, id: &mut u64) -> Option { + fn try_changed(&self, id: &mut u64) -> Option { self.mutex.lock(|state| { let s = state.borrow(); match s.current_id > *id { @@ -173,9 +196,16 @@ impl WatchBehavior for Watch }) } - fn inner_contains_value(&self) -> bool { + fn contains_value(&self) -> bool { self.mutex.lock(|state| state.borrow().data.is_some()) } + + fn drop_receiver(&self) { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + s.receiver_count -= 1; + }) + } } #[derive(Debug)] @@ -198,14 +228,14 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> Watch { } } - /// Write a new value to the `Watch`. - pub fn write(&self, val: T) { - self.mutex.lock(|state| { - let mut s = state.borrow_mut(); - s.data = Some(val); - s.current_id += 1; - s.wakers.wake(); - }) + /// Create a new [`Receiver`] for the `Watch`. + pub fn sender(&self) -> Sender<'_, M, T, N> { + Sender(Snd::new(self)) + } + + /// Create a new [`DynReceiver`] for the `Watch`. + pub fn dyn_sender(&self) -> DynSender<'_, T> { + DynSender(Snd::new(self)) } /// Create a new [`Receiver`] for the `Watch`. @@ -214,7 +244,7 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> Watch { let mut s = state.borrow_mut(); if s.receiver_count < N { s.receiver_count += 1; - Ok(Receiver(Rcv::new(self))) + Ok(Receiver(Rcv::new(self, 0))) } else { Err(Error::MaximumReceiversReached) } @@ -227,29 +257,121 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> Watch { let mut s = state.borrow_mut(); if s.receiver_count < N { s.receiver_count += 1; - Ok(DynReceiver(Rcv::new(self))) + Ok(DynReceiver(Rcv::new(self, 0))) } else { Err(Error::MaximumReceiversReached) } }) } +} + +/// A receiver can `.await` a change in the `Watch` value. +pub struct Snd<'a, T: Clone, W: WatchBehavior + ?Sized> { + watch: &'a W, + _phantom: PhantomData, +} + +impl<'a, T: Clone, W: WatchBehavior + ?Sized> Clone for Snd<'a, T, W> { + fn clone(&self) -> Self { + Self { + watch: self.watch, + _phantom: PhantomData, + } + } +} + +impl<'a, T: Clone, W: WatchBehavior + ?Sized> Snd<'a, T, W> { + /// Creates a new `Receiver` with a reference to the `Watch`. + fn new(watch: &'a W) -> Self { + Self { + watch, + _phantom: PhantomData, + } + } + + /// Sends a new value to the `Watch`. + pub fn send(&self, val: T) { + self.watch.send(val) + } + + /// Clears the value of the `Watch`. + /// This will cause calls to [`Rcv::get`] and [`Rcv::peek`] to be pending. + pub fn clear(&self) { + self.watch.clear() + } /// Tries to retrieve the value of the `Watch`. pub fn try_peek(&self) -> Option { - self.inner_try_peek() + self.watch.try_peek() } /// Returns true if the `Watch` contains a value. pub fn contains_value(&self) -> bool { - self.inner_contains_value() + self.watch.contains_value() } +} - /// Clears the value of the `Watch`. This will cause calls to [`Rcv::get`] and [`Rcv::peek`] to be pending. - pub fn clear(&self) { - self.mutex.lock(|state| { - let mut s = state.borrow_mut(); - s.data = None; - }) +/// A sender of a `Watch` channel. +/// +/// For a simpler type definition, consider [`DynSender`] at the expense of +/// some runtime performance due to dynamic dispatch. +pub struct Sender<'a, M: RawMutex, T: Clone, const N: usize>(Snd<'a, T, Watch>); + +impl<'a, M: RawMutex, T: Clone, const N: usize> Clone for Sender<'a, M, T, N> { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl<'a, M: RawMutex, T: Clone, const N: usize> Sender<'a, M, T, N> { + /// Converts the `Sender` into a [`DynSender`]. + pub fn as_dyn(self) -> DynSender<'a, T> { + DynSender(Snd::new(self.watch)) + } +} + +impl<'a, M: RawMutex, T: Clone, const N: usize> Into> for Sender<'a, M, T, N> { + fn into(self) -> DynSender<'a, T> { + self.as_dyn() + } +} + +impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Sender<'a, M, T, N> { + type Target = Snd<'a, T, Watch>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Sender<'a, M, T, N> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// A sender which holds a **dynamic** reference to a `Watch` channel. +/// +/// This is an alternative to [`Sender`] with a simpler type definition, +pub struct DynSender<'a, T: Clone>(Snd<'a, T, dyn WatchBehavior + 'a>); + +impl<'a, T: Clone> Clone for DynSender<'a, T> { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl<'a, T: Clone> Deref for DynSender<'a, T> { + type Target = Snd<'a, T, dyn WatchBehavior + 'a>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, T: Clone> DerefMut for DynSender<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 } } @@ -262,59 +384,83 @@ pub struct Rcv<'a, T: Clone, W: WatchBehavior + ?Sized> { impl<'a, T: Clone, W: WatchBehavior + ?Sized> Rcv<'a, T, W> { /// Creates a new `Receiver` with a reference to the `Watch`. - fn new(watch: &'a W) -> Self { + fn new(watch: &'a W, at_id: u64) -> Self { Self { watch, - at_id: 0, + at_id, _phantom: PhantomData, } } - /// Returns the current value of the `Watch` if it is initialized, **without** marking it as seen. + /// Returns the current value of the `Watch` once it is initialized, **without** marking it as seen. + /// + /// **Note**: Futures do nothing unless you `.await` or poll them. pub async fn peek(&self) -> T { - poll_fn(|cx| self.watch.inner_poll_peek(cx)).await + poll_fn(|cx| self.watch.poll_peek(cx)).await } /// Tries to peek the current value of the `Watch` without waiting, and **without** marking it as seen. pub fn try_peek(&self) -> Option { - self.watch.inner_try_peek() + self.watch.try_peek() } - /// Returns the current value of the `Watch` if it is initialized, marking it as seen. + /// Returns the current value of the `Watch` once it is initialized, marking it as seen. + /// + /// **Note**: Futures do nothing unless you `.await` or poll them. pub async fn get(&mut self) -> T { - poll_fn(|cx| self.watch.inner_poll_get(&mut self.at_id, cx)).await + poll_fn(|cx| self.watch.poll_get(&mut self.at_id, cx)).await } /// Tries to get the current value of the `Watch` without waiting, marking it as seen. pub fn try_get(&mut self) -> Option { - self.watch.inner_try_get(&mut self.at_id) + self.watch.try_get(&mut self.at_id) } /// Waits for the `Watch` to change and returns the new value, marking it as seen. + /// + /// **Note**: Futures do nothing unless you `.await` or poll them. pub async fn changed(&mut self) -> T { - poll_fn(|cx| self.watch.inner_poll_changed(&mut self.at_id, cx)).await + poll_fn(|cx| self.watch.poll_changed(&mut self.at_id, cx)).await } /// Tries to get the new value of the watch without waiting, marking it as seen. pub fn try_changed(&mut self) -> Option { - self.watch.inner_try_changed(&mut self.at_id) + self.watch.try_changed(&mut self.at_id) } /// Checks if the `Watch` contains a value. If this returns true, /// then awaiting [`Rcv::get`] and [`Rcv::peek`] will return immediately. pub fn contains_value(&self) -> bool { - self.watch.inner_contains_value() + self.watch.contains_value() + } +} + +impl<'a, T: Clone, W: WatchBehavior + ?Sized> Drop for Rcv<'a, T, W> { + fn drop(&mut self) { + self.watch.drop_receiver(); } } /// A receiver of a `Watch` channel. pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch>); -/// A receiver which holds a **reference** to a `Watch` channel. -/// -/// This is an alternative to [`Receiver`] with a simpler type definition, at the expense of -/// some runtime performance due to dynamic dispatch. -pub struct DynReceiver<'a, T: Clone>(Rcv<'a, T, dyn WatchBehavior + 'a>); +impl<'a, M: RawMutex, T: Clone, const N: usize> Receiver<'a, M, T, N> { + /// Converts the `Receiver` into a [`DynReceiver`]. + pub fn as_dyn(self) -> DynReceiver<'a, T> { + // We need to increment the receiver count since the original + // receiver is being dropped, which decrements the count. + self.watch.mutex.lock(|state| { + state.borrow_mut().receiver_count += 1; + }); + DynReceiver(Rcv::new(self.0.watch, self.at_id)) + } +} + +impl<'a, M: RawMutex, T: Clone, const N: usize> Into> for Receiver<'a, M, T, N> { + fn into(self) -> DynReceiver<'a, T> { + self.as_dyn() + } +} impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> { type Target = Rcv<'a, T, Watch>; @@ -330,6 +476,12 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, } } +/// A receiver which holds a **dynamic** reference to a `Watch` channel. +/// +/// This is an alternative to [`Receiver`] with a simpler type definition, at the expense of +/// some runtime performance due to dynamic dispatch. +pub struct DynReceiver<'a, T: Clone>(Rcv<'a, T, dyn WatchBehavior + 'a>); + impl<'a, T: Clone> Deref for DynReceiver<'a, T> { type Target = Rcv<'a, T, dyn WatchBehavior + 'a>; @@ -348,167 +500,242 @@ impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> { mod tests { use futures_executor::block_on; - use super::*; + use super::Watch; use crate::blocking_mutex::raw::CriticalSectionRawMutex; #[test] - fn multiple_writes() { + fn multiple_sends() { let f = async { - static WATCH: Watch = Watch::new(); + static WATCH: Watch = Watch::new(); - // Obtain Receivers - let mut rcv0 = WATCH.receiver().unwrap(); - let mut rcv1 = WATCH.dyn_receiver().unwrap(); + // Obtain receiver and sender + let mut rcv = WATCH.receiver().unwrap(); + let snd = WATCH.sender(); - WATCH.write(10); + // Not initialized + assert_eq!(rcv.try_changed(), None); // Receive the new value - assert_eq!(rcv0.changed().await, 10); - assert_eq!(rcv1.changed().await, 10); + snd.send(10); + assert_eq!(rcv.changed().await, 10); + + // Receive another value + snd.send(20); + assert_eq!(rcv.try_changed(), Some(20)); // No update - assert_eq!(rcv0.try_changed(), None); - assert_eq!(rcv1.try_changed(), None); + assert_eq!(rcv.try_changed(), None); + }; + block_on(f); + } - WATCH.write(20); + #[test] + fn receive_after_create() { + let f = async { + static WATCH: Watch = Watch::new(); + + // Obtain sender and send value + let snd = WATCH.sender(); + snd.send(10); - assert_eq!(rcv0.changed().await, 20); - assert_eq!(rcv1.changed().await, 20); + // Obtain receiver and receive value + let mut rcv = WATCH.receiver().unwrap(); + assert_eq!(rcv.try_changed(), Some(10)); }; block_on(f); } #[test] - fn max_receivers() { + fn max_receivers_drop() { let f = async { static WATCH: Watch = Watch::new(); - // Obtain Receivers - let _ = WATCH.receiver().unwrap(); - let _ = WATCH.receiver().unwrap(); - assert!(WATCH.receiver().is_err()); + // Try to create 3 receivers (only 2 can exist at once) + let rcv0 = WATCH.receiver(); + let rcv1 = WATCH.receiver(); + let rcv2 = WATCH.receiver(); + + // Ensure the first two are successful and the third is not + assert!(rcv0.is_ok()); + assert!(rcv1.is_ok()); + assert!(rcv2.is_err()); + + // Drop the first receiver + drop(rcv0); + + // Create another receiver and ensure it is successful + let rcv3 = WATCH.receiver(); + assert!(rcv3.is_ok()); }; block_on(f); } #[test] - fn receive_initial() { + fn multiple_receivers() { let f = async { static WATCH: Watch = Watch::new(); - // Obtain Receivers + // Obtain receivers and sender let mut rcv0 = WATCH.receiver().unwrap(); let mut rcv1 = WATCH.receiver().unwrap(); + let snd = WATCH.sender(); - assert_eq!(rcv0.contains_value(), false); - + // No update for both assert_eq!(rcv0.try_changed(), None); assert_eq!(rcv1.try_changed(), None); - WATCH.write(0); - - assert_eq!(rcv0.contains_value(), true); + // Send a new value + snd.send(0); + // Both receivers receive the new value assert_eq!(rcv0.try_changed(), Some(0)); assert_eq!(rcv1.try_changed(), Some(0)); }; block_on(f); } + #[test] + fn clone_senders() { + let f = async { + // Obtain different ways to send + static WATCH: Watch = Watch::new(); + let snd0 = WATCH.sender(); + let snd1 = snd0.clone(); + + // Obtain Receiver + let mut rcv = WATCH.receiver().unwrap().as_dyn(); + + // Send a value from first sender + snd0.send(10); + assert_eq!(rcv.try_changed(), Some(10)); + + // Send a value from second sender + snd1.send(20); + assert_eq!(rcv.try_changed(), Some(20)); + }; + block_on(f); + } + #[test] fn peek_get_changed() { let f = async { static WATCH: Watch = Watch::new(); - // Obtain Receivers - let mut rcv0 = WATCH.receiver().unwrap(); + // Obtain receiver and sender + let mut rcv = WATCH.receiver().unwrap(); + let snd = WATCH.sender(); - WATCH.write(10); + // Send a value + snd.send(10); // Ensure peek does not mark as seen - assert_eq!(rcv0.peek().await, 10); - assert_eq!(rcv0.try_changed(), Some(10)); - assert_eq!(rcv0.try_changed(), None); - assert_eq!(rcv0.peek().await, 10); + assert_eq!(rcv.peek().await, 10); + assert_eq!(rcv.try_changed(), Some(10)); + assert_eq!(rcv.try_changed(), None); + assert_eq!(rcv.try_peek(), Some(10)); - WATCH.write(20); + // Send a value + snd.send(20); // Ensure get does mark as seen - assert_eq!(rcv0.get().await, 20); - assert_eq!(rcv0.try_changed(), None); - assert_eq!(rcv0.try_get(), Some(20)); + assert_eq!(rcv.get().await, 20); + assert_eq!(rcv.try_changed(), None); + assert_eq!(rcv.try_get(), Some(20)); }; block_on(f); } #[test] - fn count_ids() { + fn use_dynamics() { let f = async { static WATCH: Watch = Watch::new(); - // Obtain Receivers - let mut rcv0 = WATCH.receiver().unwrap(); - let mut rcv1 = WATCH.receiver().unwrap(); + // Obtain receiver and sender + let mut dyn_rcv = WATCH.dyn_receiver().unwrap(); + let dyn_snd = WATCH.dyn_sender(); - let get_id = || WATCH.mutex.lock(|state| state.borrow().current_id); + // Send a value + dyn_snd.send(10); - WATCH.write(10); - - assert_eq!(rcv0.changed().await, 10); - assert_eq!(rcv1.changed().await, 10); + // Ensure the dynamic receiver receives the value + assert_eq!(dyn_rcv.try_changed(), Some(10)); + assert_eq!(dyn_rcv.try_changed(), None); + }; + block_on(f); + } - assert_eq!(rcv0.try_changed(), None); - assert_eq!(rcv1.try_changed(), None); + #[test] + fn convert_to_dyn() { + let f = async { + static WATCH: Watch = Watch::new(); - WATCH.write(20); - WATCH.write(20); - WATCH.write(20); + // Obtain receiver and sender + let rcv = WATCH.receiver().unwrap(); + let snd = WATCH.sender(); - assert_eq!(rcv0.changed().await, 20); - assert_eq!(rcv1.changed().await, 20); + // Convert to dynamic + let mut dyn_rcv = rcv.as_dyn(); + let dyn_snd = snd.as_dyn(); - assert_eq!(rcv0.try_changed(), None); - assert_eq!(rcv1.try_changed(), None); + // Send a value + dyn_snd.send(10); - assert_eq!(get_id(), 4); + // Ensure the dynamic receiver receives the value + assert_eq!(dyn_rcv.try_changed(), Some(10)); + assert_eq!(dyn_rcv.try_changed(), None); }; block_on(f); } #[test] - fn peek_still_await() { + fn dynamic_receiver_count() { let f = async { static WATCH: Watch = Watch::new(); - // Obtain Receivers - let mut rcv0 = WATCH.receiver().unwrap(); - let mut rcv1 = WATCH.receiver().unwrap(); + // Obtain receiver and sender + let rcv0 = WATCH.receiver(); + let rcv1 = WATCH.receiver(); + let rcv2 = WATCH.receiver(); - WATCH.write(10); + // Ensure the first two are successful and the third is not + assert!(rcv0.is_ok()); + assert!(rcv1.is_ok()); + assert!(rcv2.is_err()); - assert_eq!(rcv0.peek().await, 10); - assert_eq!(rcv1.try_peek(), Some(10)); + // Convert to dynamic + let dyn_rcv0 = rcv0.unwrap().as_dyn(); - assert_eq!(rcv0.changed().await, 10); - assert_eq!(rcv1.changed().await, 10); + // Drop the (now dynamic) receiver + drop(dyn_rcv0); + + // Create another receiver and ensure it is successful + let rcv3 = WATCH.receiver(); + let rcv4 = WATCH.receiver(); + assert!(rcv3.is_ok()); + assert!(rcv4.is_err()); }; block_on(f); } #[test] - fn peek_with_static() { + fn contains_value() { let f = async { static WATCH: Watch = Watch::new(); - // Obtain Receivers - let rcv0 = WATCH.receiver().unwrap(); - let rcv1 = WATCH.receiver().unwrap(); + // Obtain receiver and sender + let rcv = WATCH.receiver().unwrap(); + let snd = WATCH.sender(); + + // check if the watch contains a value + assert_eq!(rcv.contains_value(), false); + assert_eq!(snd.contains_value(), false); - WATCH.write(20); + // Send a value + snd.send(10); - assert_eq!(rcv0.peek().await, 20); - assert_eq!(rcv1.peek().await, 20); - assert_eq!(WATCH.try_peek(), Some(20)); + // check if the watch contains a value + assert_eq!(rcv.contains_value(), true); + assert_eq!(snd.contains_value(), true); }; block_on(f); } From 3208e0fec4717ff1580d1cc0cf93e18cbba2db91 Mon Sep 17 00:00:00 2001 From: Peter Krull Date: Thu, 29 Feb 2024 16:56:52 +0100 Subject: [PATCH 08/15] Use Option instead of Result for receiver creation since it is the only way it can fail. --- embassy-sync/src/watch.rs | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/embassy-sync/src/watch.rs b/embassy-sync/src/watch.rs index 3e22b1e7b9..2bba93915a 100644 --- a/embassy-sync/src/watch.rs +++ b/embassy-sync/src/watch.rs @@ -208,14 +208,7 @@ impl WatchBehavior for Watch } } -#[derive(Debug)] -/// An error that can occur when a `Watch` returns a `Result::Err(_)`. -pub enum Error { - /// The maximum number of [`Receiver`](crate::watch::Receiver)/[`DynReceiver`](crate::watch::DynReceiver) has been reached. - MaximumReceiversReached, -} - -impl<'a, M: RawMutex, T: Clone, const N: usize> Watch { +impl Watch { /// Create a new `Watch` channel. pub const fn new() -> Self { Self { @@ -238,28 +231,30 @@ impl<'a, M: RawMutex, T: Clone, const N: usize> Watch { DynSender(Snd::new(self)) } - /// Create a new [`Receiver`] for the `Watch`. - pub fn receiver(&self) -> Result, Error> { + /// Try to create a new [`Receiver`] for the `Watch`. If the + /// maximum number of receivers has been reached, `None` is returned. + pub fn receiver(&self) -> Option> { self.mutex.lock(|state| { let mut s = state.borrow_mut(); if s.receiver_count < N { s.receiver_count += 1; - Ok(Receiver(Rcv::new(self, 0))) + Some(Receiver(Rcv::new(self, 0))) } else { - Err(Error::MaximumReceiversReached) + None } }) } - /// Create a new [`DynReceiver`] for the `Watch`. - pub fn dyn_receiver(&self) -> Result, Error> { + /// Try to create a new [`DynReceiver`] for the `Watch`. If the + /// maximum number of receivers has been reached, `None` is returned. + pub fn dyn_receiver(&self) -> Option> { self.mutex.lock(|state| { let mut s = state.borrow_mut(); if s.receiver_count < N { s.receiver_count += 1; - Ok(DynReceiver(Rcv::new(self, 0))) + Some(DynReceiver(Rcv::new(self, 0))) } else { - Err(Error::MaximumReceiversReached) + None } }) } From c08e75057a4282a0dfee13a6e181a2077944c1b0 Mon Sep 17 00:00:00 2001 From: Peter Krull Date: Thu, 29 Feb 2024 16:58:21 +0100 Subject: [PATCH 09/15] Update tests to reflect changes in previous commit --- embassy-sync/src/watch.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/embassy-sync/src/watch.rs b/embassy-sync/src/watch.rs index 2bba93915a..1301eb817c 100644 --- a/embassy-sync/src/watch.rs +++ b/embassy-sync/src/watch.rs @@ -551,16 +551,16 @@ mod tests { let rcv2 = WATCH.receiver(); // Ensure the first two are successful and the third is not - assert!(rcv0.is_ok()); - assert!(rcv1.is_ok()); - assert!(rcv2.is_err()); + assert!(rcv0.is_some()); + assert!(rcv1.is_some()); + assert!(rcv2.is_none()); // Drop the first receiver drop(rcv0); // Create another receiver and ensure it is successful let rcv3 = WATCH.receiver(); - assert!(rcv3.is_ok()); + assert!(rcv3.is_some()); }; block_on(f); } @@ -693,9 +693,9 @@ mod tests { let rcv2 = WATCH.receiver(); // Ensure the first two are successful and the third is not - assert!(rcv0.is_ok()); - assert!(rcv1.is_ok()); - assert!(rcv2.is_err()); + assert!(rcv0.is_some()); + assert!(rcv1.is_some()); + assert!(rcv2.is_none()); // Convert to dynamic let dyn_rcv0 = rcv0.unwrap().as_dyn(); @@ -706,8 +706,8 @@ mod tests { // Create another receiver and ensure it is successful let rcv3 = WATCH.receiver(); let rcv4 = WATCH.receiver(); - assert!(rcv3.is_ok()); - assert!(rcv4.is_err()); + assert!(rcv3.is_some()); + assert!(rcv4.is_none()); }; block_on(f); } From df282aa23d00b2f3116081be2b07ba0c9f810fc3 Mon Sep 17 00:00:00 2001 From: Peter Krull Date: Thu, 29 Feb 2024 16:59:58 +0100 Subject: [PATCH 10/15] Forgot to update doc comment --- embassy-sync/src/watch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/embassy-sync/src/watch.rs b/embassy-sync/src/watch.rs index 1301eb817c..01d82def4f 100644 --- a/embassy-sync/src/watch.rs +++ b/embassy-sync/src/watch.rs @@ -37,7 +37,7 @@ use crate::waitqueue::MultiWakerRegistration; /// let mut snd = WATCH.sender(); /// /// // No more receivers, and no update -/// assert!(WATCH.receiver().is_err()); +/// assert!(WATCH.receiver().is_none()); /// assert_eq!(rcv1.try_changed(), None); /// /// snd.send(10); From 311ab07a9af0029060813779038220481d1bf1c5 Mon Sep 17 00:00:00 2001 From: Peter Krull Date: Sat, 2 Mar 2024 00:14:11 +0100 Subject: [PATCH 11/15] Reintroduce predicate methods. Add ability for sender to modify value in-place. --- embassy-sync/src/watch.rs | 267 +++++++++++++++++++++++++++++++++++++- 1 file changed, 260 insertions(+), 7 deletions(-) diff --git a/embassy-sync/src/watch.rs b/embassy-sync/src/watch.rs index 01d82def4f..520696f7d5 100644 --- a/embassy-sync/src/watch.rs +++ b/embassy-sync/src/watch.rs @@ -88,21 +88,48 @@ pub trait WatchBehavior { /// Tries to peek the value of the `Watch`, **without** marking it as seen. fn try_peek(&self) -> Option; + /// Poll the `Watch` for the value if it matches the predicate function + /// `f`, **without** making it as seen. + fn poll_peek_and(&self, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll; + + /// Tries to peek the value of the `Watch` if it matches the predicate function `f`, **without** marking it as seen. + fn try_peek_and(&self, f: &mut dyn Fn(&T) -> bool) -> Option; + /// Poll the `Watch` for the current value, making it as seen. fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll; /// Tries to get the value of the `Watch`, marking it as seen. fn try_get(&self, id: &mut u64) -> Option; + /// Poll the `Watch` for the value if it matches the predicate function + /// `f`, making it as seen. + fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll; + + /// Tries to get the value of the `Watch` if it matches the predicate function + /// `f`, marking it as seen. + fn try_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option; + /// Poll the `Watch` for a changed value, marking it as seen. fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll; /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen. fn try_changed(&self, id: &mut u64) -> Option; + /// Poll the `Watch` for a changed value that matches the predicate function + /// `f`, marking it as seen. + fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll; + + /// Tries to retrieve the value of the `Watch` if it has changed and matches the + /// predicate function `f`, marking it as seen. + fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option; + /// Checks if the `Watch` is been initialized with a value. fn contains_value(&self) -> bool; + /// Modify the value of the `Watch` using a closure. Returns `false` if the + /// `Watch` does not already contain a value. + fn modify(&self, f: &mut dyn Fn(&mut Option)); + /// Used when a receiver is dropped to decrement the receiver count. /// /// ## This method should not be called by the user. @@ -143,6 +170,29 @@ impl WatchBehavior for Watch self.mutex.lock(|state| state.borrow().data.clone()) } + fn poll_peek_and(&self, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + match s.data { + Some(ref data) if f(data) => Poll::Ready(data.clone()), + _ => { + s.wakers.register(cx.waker()); + Poll::Pending + } + } + }) + } + + fn try_peek_and(&self, f: &mut dyn Fn(&T) -> bool) -> Option { + self.mutex.lock(|state| { + let s = state.borrow(); + match s.data { + Some(ref data) if f(data) => Some(data.clone()), + _ => None, + } + }) + } + fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll { self.mutex.lock(|state| { let mut s = state.borrow_mut(); @@ -167,6 +217,35 @@ impl WatchBehavior for Watch }) } + fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + match s.data { + Some(ref data) if f(data) => { + *id = s.current_id; + Poll::Ready(data.clone()) + } + _ => { + s.wakers.register(cx.waker()); + Poll::Pending + } + } + }) + } + + fn try_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option { + self.mutex.lock(|state| { + let s = state.borrow(); + match s.data { + Some(ref data) if f(data) => { + *id = s.current_id; + Some(data.clone()) + } + _ => None, + } + }) + } + fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll { self.mutex.lock(|state| { let mut s = state.borrow_mut(); @@ -189,13 +268,42 @@ impl WatchBehavior for Watch match s.current_id > *id { true => { *id = s.current_id; - state.borrow().data.clone() + s.data.clone() } false => None, } }) } + fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + match (&s.data, s.current_id > *id) { + (Some(data), true) if f(data) => { + *id = s.current_id; + Poll::Ready(data.clone()) + } + _ => { + s.wakers.register(cx.waker()); + Poll::Pending + } + } + }) + } + + fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option { + self.mutex.lock(|state| { + let s = state.borrow(); + match (&s.data, s.current_id > *id) { + (Some(data), true) if f(data) => { + *id = s.current_id; + s.data.clone() + } + _ => None, + } + }) + } + fn contains_value(&self) -> bool { self.mutex.lock(|state| state.borrow().data.is_some()) } @@ -206,6 +314,15 @@ impl WatchBehavior for Watch s.receiver_count -= 1; }) } + + fn modify(&self, f: &mut dyn Fn(&mut Option)) { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + f(&mut s.data); + s.current_id += 1; + s.wakers.wake(); + }) + } } impl Watch { @@ -300,10 +417,27 @@ impl<'a, T: Clone, W: WatchBehavior + ?Sized> Snd<'a, T, W> { self.watch.try_peek() } + /// Tries to peek the current value of the `Watch` if it matches the predicate + /// function `f`. + pub fn try_peek_and(&self, mut f: F) -> Option + where + F: Fn(&T) -> bool, + { + self.watch.try_peek_and(&mut f) + } + /// Returns true if the `Watch` contains a value. pub fn contains_value(&self) -> bool { self.watch.contains_value() } + + /// Modify the value of the `Watch` using a closure. + pub fn modify(&self, mut f: F) + where + F: Fn(&mut Option), + { + self.watch.modify(&mut f) + } } /// A sender of a `Watch` channel. @@ -399,6 +533,26 @@ impl<'a, T: Clone, W: WatchBehavior + ?Sized> Rcv<'a, T, W> { self.watch.try_peek() } + /// Returns the current value of the `Watch` if it matches the predicate function `f`, + /// or waits for it to match, **without** marking it as seen. + /// + /// **Note**: Futures do nothing unless you `.await` or poll them. + pub async fn peek_and(&self, mut f: F) -> T + where + F: Fn(&T) -> bool, + { + poll_fn(|cx| self.watch.poll_peek_and(&mut f, cx)).await + } + + /// Tries to peek the current value of the `Watch` if it matches the predicate + /// function `f` without waiting, and **without** marking it as seen. + pub fn try_peek_and(&self, mut f: F) -> Option + where + F: Fn(&T) -> bool, + { + self.watch.try_peek_and(&mut f) + } + /// Returns the current value of the `Watch` once it is initialized, marking it as seen. /// /// **Note**: Futures do nothing unless you `.await` or poll them. @@ -411,6 +565,26 @@ impl<'a, T: Clone, W: WatchBehavior + ?Sized> Rcv<'a, T, W> { self.watch.try_get(&mut self.at_id) } + /// Returns the value of the `Watch` if it matches the predicate function `f`, + /// or waits for it to match, marking it as seen. + /// + /// **Note**: Futures do nothing unless you `.await` or poll them. + pub async fn get_and(&mut self, mut f: F) -> T + where + F: Fn(&T) -> bool, + { + poll_fn(|cx| self.watch.poll_get_and(&mut self.at_id, &mut f, cx)).await + } + + /// Tries to get the current value of the `Watch` if it matches the predicate + /// function `f` without waiting, marking it as seen. + pub fn try_get_and(&mut self, mut f: F) -> Option + where + F: Fn(&T) -> bool, + { + self.watch.try_get_and(&mut self.at_id, &mut f) + } + /// Waits for the `Watch` to change and returns the new value, marking it as seen. /// /// **Note**: Futures do nothing unless you `.await` or poll them. @@ -423,6 +597,26 @@ impl<'a, T: Clone, W: WatchBehavior + ?Sized> Rcv<'a, T, W> { self.watch.try_changed(&mut self.at_id) } + /// Waits for the `Watch` to change to a value which satisfies the predicate + /// function `f` and returns the new value, marking it as seen. + /// + /// **Note**: Futures do nothing unless you `.await` or poll them. + pub async fn changed_and(&mut self, mut f: F) -> T + where + F: Fn(&T) -> bool, + { + poll_fn(|cx| self.watch.poll_changed_and(&mut self.at_id, &mut f, cx)).await + } + + /// Tries to get the new value of the watch which satisfies the predicate + /// function `f` and returns the new value without waiting, marking it as seen. + pub fn try_changed_and(&mut self, mut f: F) -> Option + where + F: Fn(&T) -> bool, + { + self.watch.try_changed_and(&mut self.at_id, &mut f) + } + /// Checks if the `Watch` contains a value. If this returns true, /// then awaiting [`Rcv::get`] and [`Rcv::peek`] will return immediately. pub fn contains_value(&self) -> bool { @@ -442,12 +636,9 @@ pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch< impl<'a, M: RawMutex, T: Clone, const N: usize> Receiver<'a, M, T, N> { /// Converts the `Receiver` into a [`DynReceiver`]. pub fn as_dyn(self) -> DynReceiver<'a, T> { - // We need to increment the receiver count since the original - // receiver is being dropped, which decrements the count. - self.watch.mutex.lock(|state| { - state.borrow_mut().receiver_count += 1; - }); - DynReceiver(Rcv::new(self.0.watch, self.at_id)) + let rcv = DynReceiver(Rcv::new(self.0.watch, self.at_id)); + core::mem::forget(self); // Ensures the destructor is not called + rcv } } @@ -524,6 +715,68 @@ mod tests { block_on(f); } + #[test] + fn sender_modify() { + let f = async { + static WATCH: Watch = Watch::new(); + + // Obtain receiver and sender + let mut rcv = WATCH.receiver().unwrap(); + let snd = WATCH.sender(); + + // Receive the new value + snd.send(10); + assert_eq!(rcv.try_changed(), Some(10)); + + // Modify the value inplace + snd.modify(|opt|{ + if let Some(inner) = opt { + *inner += 5; + } + }); + + // Get the modified value + assert_eq!(rcv.try_changed(), Some(15)); + assert_eq!(rcv.try_changed(), None); + }; + block_on(f); + } + + #[test] + fn predicate_fn() { + let f = async { + static WATCH: Watch = Watch::new(); + + // Obtain receiver and sender + let mut rcv = WATCH.receiver().unwrap(); + let snd = WATCH.sender(); + + snd.send(10); + assert_eq!(rcv.try_peek_and(|x| x > &5), Some(10)); + assert_eq!(rcv.try_peek_and(|x| x < &5), None); + assert!(rcv.try_changed().is_some()); + + snd.send(15); + assert_eq!(rcv.try_get_and(|x| x > &5), Some(15)); + assert_eq!(rcv.try_get_and(|x| x < &5), None); + assert!(rcv.try_changed().is_none()); + + snd.send(20); + assert_eq!(rcv.try_changed_and(|x| x > &5), Some(20)); + assert_eq!(rcv.try_changed_and(|x| x > &5), None); + + snd.send(25); + assert_eq!(rcv.try_changed_and(|x| x < &5), None); + assert_eq!(rcv.try_changed(), Some(25)); + + snd.send(30); + assert_eq!(rcv.changed_and(|x| x > &5).await, 30); + assert_eq!(rcv.peek_and(|x| x > &5).await, 30); + assert_eq!(rcv.get_and(|x| x > &5).await, 30); + }; + block_on(f); + } + #[test] fn receive_after_create() { let f = async { From e02a987bafd4f0fcf9d80e7c4f6e1504b8b02cec Mon Sep 17 00:00:00 2001 From: Peter Krull Date: Sat, 2 Mar 2024 00:16:17 +0100 Subject: [PATCH 12/15] This one is for cargo fmt --- embassy-sync/src/watch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/embassy-sync/src/watch.rs b/embassy-sync/src/watch.rs index 520696f7d5..298c09d436 100644 --- a/embassy-sync/src/watch.rs +++ b/embassy-sync/src/watch.rs @@ -729,7 +729,7 @@ mod tests { assert_eq!(rcv.try_changed(), Some(10)); // Modify the value inplace - snd.modify(|opt|{ + snd.modify(|opt| { if let Some(inner) = opt { *inner += 5; } From a669611d7c8fb17666f35086ea20c476b6029854 Mon Sep 17 00:00:00 2001 From: Peter Krull Date: Mon, 23 Sep 2024 20:09:35 +0200 Subject: [PATCH 13/15] Discontinue peek, add AnonReceiver --- embassy-sync/src/watch.rs | 440 +++++++++++++++++++++++++------------- 1 file changed, 289 insertions(+), 151 deletions(-) diff --git a/embassy-sync/src/watch.rs b/embassy-sync/src/watch.rs index 298c09d436..1b4a8b5895 100644 --- a/embassy-sync/src/watch.rs +++ b/embassy-sync/src/watch.rs @@ -20,7 +20,9 @@ use crate::waitqueue::MultiWakerRegistration; /// always provided with the latest value. /// /// Typically, `Watch` instances are declared as `static`, and a [`Sender`] and [`Receiver`] -/// (or [`DynSender`] and/or [`DynReceiver`]) are obtained and passed to the relevant parts of the program. +/// (or [`DynSender`] and/or [`DynReceiver`]) are obtained where relevant. An [`AnonReceiver`] +/// and [`DynAnonReceiver`] are also available, which do not increase the receiver count for the +/// channel, and unwrapping is therefore not required, but it is not possible to `.await` the channel. /// ``` /// /// use futures_executor::block_on; @@ -41,25 +43,25 @@ use crate::waitqueue::MultiWakerRegistration; /// assert_eq!(rcv1.try_changed(), None); /// /// snd.send(10); -/// +/// /// // Receive the new value (async or try) /// assert_eq!(rcv0.changed().await, 10); /// assert_eq!(rcv1.try_changed(), Some(10)); -/// +/// /// // No update /// assert_eq!(rcv0.try_changed(), None); /// assert_eq!(rcv1.try_changed(), None); /// /// snd.send(20); /// -/// // Peek does not mark the value as seen -/// assert_eq!(rcv0.peek().await, 20); -/// assert_eq!(rcv0.try_changed(), Some(20)); -/// -/// // Get marks the value as seen +/// // Using `get` marks the value as seen /// assert_eq!(rcv1.get().await, 20); /// assert_eq!(rcv1.try_changed(), None); /// +/// // But `get` also returns when unchanged +/// assert_eq!(rcv1.get().await, 20); +/// assert_eq!(rcv1.get().await, 20); +/// /// }; /// block_on(f); /// ``` @@ -82,24 +84,11 @@ pub trait WatchBehavior { /// Clears the value of the `Watch`. fn clear(&self); - /// Poll the `Watch` for the current value, **without** making it as seen. - fn poll_peek(&self, cx: &mut Context<'_>) -> Poll; - - /// Tries to peek the value of the `Watch`, **without** marking it as seen. - fn try_peek(&self) -> Option; - - /// Poll the `Watch` for the value if it matches the predicate function - /// `f`, **without** making it as seen. - fn poll_peek_and(&self, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll; - - /// Tries to peek the value of the `Watch` if it matches the predicate function `f`, **without** marking it as seen. - fn try_peek_and(&self, f: &mut dyn Fn(&T) -> bool) -> Option; - /// Poll the `Watch` for the current value, making it as seen. fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll; - /// Tries to get the value of the `Watch`, marking it as seen. - fn try_get(&self, id: &mut u64) -> Option; + /// Tries to get the value of the `Watch`, marking it as seen, if an id is given. + fn try_get(&self, id: Option<&mut u64>) -> Option; /// Poll the `Watch` for the value if it matches the predicate function /// `f`, making it as seen. @@ -107,9 +96,9 @@ pub trait WatchBehavior { /// Tries to get the value of the `Watch` if it matches the predicate function /// `f`, marking it as seen. - fn try_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option; + fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option; - /// Poll the `Watch` for a changed value, marking it as seen. + /// Poll the `Watch` for a changed value, marking it as seen, if an id is given. fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll; /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen. @@ -128,7 +117,11 @@ pub trait WatchBehavior { /// Modify the value of the `Watch` using a closure. Returns `false` if the /// `Watch` does not already contain a value. - fn modify(&self, f: &mut dyn Fn(&mut Option)); + fn send_modify(&self, f: &mut dyn Fn(&mut Option)); + + /// Modify the value of the `Watch` using a closure. Returns `false` if the + /// `Watch` does not already contain a value. + fn send_if_modified(&self, f: &mut dyn Fn(&mut Option) -> bool); /// Used when a receiver is dropped to decrement the receiver count. /// @@ -153,46 +146,6 @@ impl WatchBehavior for Watch }) } - fn poll_peek(&self, cx: &mut Context<'_>) -> Poll { - self.mutex.lock(|state| { - let mut s = state.borrow_mut(); - match &s.data { - Some(data) => Poll::Ready(data.clone()), - None => { - s.wakers.register(cx.waker()); - Poll::Pending - } - } - }) - } - - fn try_peek(&self) -> Option { - self.mutex.lock(|state| state.borrow().data.clone()) - } - - fn poll_peek_and(&self, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll { - self.mutex.lock(|state| { - let mut s = state.borrow_mut(); - match s.data { - Some(ref data) if f(data) => Poll::Ready(data.clone()), - _ => { - s.wakers.register(cx.waker()); - Poll::Pending - } - } - }) - } - - fn try_peek_and(&self, f: &mut dyn Fn(&T) -> bool) -> Option { - self.mutex.lock(|state| { - let s = state.borrow(); - match s.data { - Some(ref data) if f(data) => Some(data.clone()), - _ => None, - } - }) - } - fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll { self.mutex.lock(|state| { let mut s = state.borrow_mut(); @@ -209,11 +162,13 @@ impl WatchBehavior for Watch }) } - fn try_get(&self, id: &mut u64) -> Option { + fn try_get(&self, id: Option<&mut u64>) -> Option { self.mutex.lock(|state| { let s = state.borrow(); - *id = s.current_id; - state.borrow().data.clone() + if let Some(id) = id { + *id = s.current_id; + } + s.data.clone() }) } @@ -233,12 +188,14 @@ impl WatchBehavior for Watch }) } - fn try_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option { + fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option { self.mutex.lock(|state| { let s = state.borrow(); match s.data { Some(ref data) if f(data) => { - *id = s.current_id; + if let Some(id) = id { + *id = s.current_id; + } Some(data.clone()) } _ => None, @@ -315,7 +272,7 @@ impl WatchBehavior for Watch }) } - fn modify(&self, f: &mut dyn Fn(&mut Option)) { + fn send_modify(&self, f: &mut dyn Fn(&mut Option)) { self.mutex.lock(|state| { let mut s = state.borrow_mut(); f(&mut s.data); @@ -323,6 +280,16 @@ impl WatchBehavior for Watch s.wakers.wake(); }) } + + fn send_if_modified(&self, f: &mut dyn Fn(&mut Option) -> bool) { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + if f(&mut s.data) { + s.current_id += 1; + s.wakers.wake(); + } + }) + } } impl Watch { @@ -375,6 +342,60 @@ impl Watch { } }) } + + /// Try to create a new [`AnonReceiver`] for the `Watch`. + pub fn anon_receiver(&self) -> AnonReceiver<'_, M, T, N> { + AnonReceiver(AnonRcv::new(self, 0)) + } + + /// Try to create a new [`DynAnonReceiver`] for the `Watch`. + pub fn dyn_anon_receiver(&self) -> DynAnonReceiver<'_, T> { + DynAnonReceiver(AnonRcv::new(self, 0)) + } + + /// Returns the message ID of the latest message sent to the `Watch`. + /// + /// This counter is monotonic, and is incremented every time a new message is sent. + pub fn get_msg_id(&self) -> u64 { + self.mutex.lock(|state| state.borrow().current_id) + } + + /// Waits for the `Watch` to be initialized with a value using a busy-wait mechanism. + /// + /// This is useful for initialization code where receivers may only be interested in + /// awaiting the value once in the lifetime of the program. It is therefore a temporaryily + /// CPU-inefficient operation, while being more memory efficient than using a `Receiver`. + /// + /// **Note** Be careful about using this within an InterruptExecutor, as it will starve + /// tasks in lower-priority executors. + pub async fn spin_get(&self) -> T { + poll_fn(|cx| { + self.mutex.lock(|state| { + let s = state.borrow(); + match &s.data { + Some(data) => Poll::Ready(data.clone()), + None => { + cx.waker().wake_by_ref(); + Poll::Pending + } + } + }) + }) + .await + } + + /// Tries to get the value of the `Watch`. + pub fn try_get(&self) -> Option { + WatchBehavior::try_get(self, None) + } + + /// Tries to get the value of the `Watch` if it matches the predicate function `f`. + pub fn try_get_and(&self, mut f: F) -> Option + where + F: Fn(&T) -> bool, + { + WatchBehavior::try_get_and(self, None, &mut f) + } } /// A receiver can `.await` a change in the `Watch` value. @@ -407,23 +428,23 @@ impl<'a, T: Clone, W: WatchBehavior + ?Sized> Snd<'a, T, W> { } /// Clears the value of the `Watch`. - /// This will cause calls to [`Rcv::get`] and [`Rcv::peek`] to be pending. + /// This will cause calls to [`Rcv::get`] to be pending. pub fn clear(&self) { self.watch.clear() } /// Tries to retrieve the value of the `Watch`. - pub fn try_peek(&self) -> Option { - self.watch.try_peek() + pub fn try_get(&self) -> Option { + self.watch.try_get(None) } /// Tries to peek the current value of the `Watch` if it matches the predicate /// function `f`. - pub fn try_peek_and(&self, mut f: F) -> Option + pub fn try_get_and(&self, mut f: F) -> Option where F: Fn(&T) -> bool, { - self.watch.try_peek_and(&mut f) + self.watch.try_get_and(None, &mut f) } /// Returns true if the `Watch` contains a value. @@ -432,11 +453,20 @@ impl<'a, T: Clone, W: WatchBehavior + ?Sized> Snd<'a, T, W> { } /// Modify the value of the `Watch` using a closure. - pub fn modify(&self, mut f: F) + pub fn send_modify(&self, mut f: F) where F: Fn(&mut Option), { - self.watch.modify(&mut f) + self.watch.send_modify(&mut f) + } + + /// Modify the value of the `Watch` using a closure. The closure must return + /// `true` if the value was modified, which notifies all receivers. + pub fn send_if_modified(&self, mut f: F) + where + F: Fn(&mut Option) -> bool, + { + self.watch.send_if_modified(&mut f) } } @@ -521,38 +551,6 @@ impl<'a, T: Clone, W: WatchBehavior + ?Sized> Rcv<'a, T, W> { } } - /// Returns the current value of the `Watch` once it is initialized, **without** marking it as seen. - /// - /// **Note**: Futures do nothing unless you `.await` or poll them. - pub async fn peek(&self) -> T { - poll_fn(|cx| self.watch.poll_peek(cx)).await - } - - /// Tries to peek the current value of the `Watch` without waiting, and **without** marking it as seen. - pub fn try_peek(&self) -> Option { - self.watch.try_peek() - } - - /// Returns the current value of the `Watch` if it matches the predicate function `f`, - /// or waits for it to match, **without** marking it as seen. - /// - /// **Note**: Futures do nothing unless you `.await` or poll them. - pub async fn peek_and(&self, mut f: F) -> T - where - F: Fn(&T) -> bool, - { - poll_fn(|cx| self.watch.poll_peek_and(&mut f, cx)).await - } - - /// Tries to peek the current value of the `Watch` if it matches the predicate - /// function `f` without waiting, and **without** marking it as seen. - pub fn try_peek_and(&self, mut f: F) -> Option - where - F: Fn(&T) -> bool, - { - self.watch.try_peek_and(&mut f) - } - /// Returns the current value of the `Watch` once it is initialized, marking it as seen. /// /// **Note**: Futures do nothing unless you `.await` or poll them. @@ -562,7 +560,7 @@ impl<'a, T: Clone, W: WatchBehavior + ?Sized> Rcv<'a, T, W> { /// Tries to get the current value of the `Watch` without waiting, marking it as seen. pub fn try_get(&mut self) -> Option { - self.watch.try_get(&mut self.at_id) + self.watch.try_get(Some(&mut self.at_id)) } /// Returns the value of the `Watch` if it matches the predicate function `f`, @@ -582,7 +580,7 @@ impl<'a, T: Clone, W: WatchBehavior + ?Sized> Rcv<'a, T, W> { where F: Fn(&T) -> bool, { - self.watch.try_get_and(&mut self.at_id, &mut f) + self.watch.try_get_and(Some(&mut self.at_id), &mut f) } /// Waits for the `Watch` to change and returns the new value, marking it as seen. @@ -618,7 +616,7 @@ impl<'a, T: Clone, W: WatchBehavior + ?Sized> Rcv<'a, T, W> { } /// Checks if the `Watch` contains a value. If this returns true, - /// then awaiting [`Rcv::get`] and [`Rcv::peek`] will return immediately. + /// then awaiting [`Rcv::get`] will return immediately. pub fn contains_value(&self) -> bool { self.watch.contains_value() } @@ -630,6 +628,58 @@ impl<'a, T: Clone, W: WatchBehavior + ?Sized> Drop for Rcv<'a, T, W> { } } +/// A anonymous receiver can NOT `.await` a change in the `Watch` value. +pub struct AnonRcv<'a, T: Clone, W: WatchBehavior + ?Sized> { + watch: &'a W, + at_id: u64, + _phantom: PhantomData, +} + +impl<'a, T: Clone, W: WatchBehavior + ?Sized> AnonRcv<'a, T, W> { + /// Creates a new `Receiver` with a reference to the `Watch`. + fn new(watch: &'a W, at_id: u64) -> Self { + Self { + watch, + at_id, + _phantom: PhantomData, + } + } + + /// Tries to get the current value of the `Watch` without waiting, marking it as seen. + pub fn try_get(&mut self) -> Option { + self.watch.try_get(Some(&mut self.at_id)) + } + + /// Tries to get the current value of the `Watch` if it matches the predicate + /// function `f` without waiting, marking it as seen. + pub fn try_get_and(&mut self, mut f: F) -> Option + where + F: Fn(&T) -> bool, + { + self.watch.try_get_and(Some(&mut self.at_id), &mut f) + } + + /// Tries to get the new value of the watch without waiting, marking it as seen. + pub fn try_changed(&mut self) -> Option { + self.watch.try_changed(&mut self.at_id) + } + + /// Tries to get the new value of the watch which satisfies the predicate + /// function `f` and returns the new value without waiting, marking it as seen. + pub fn try_changed_and(&mut self, mut f: F) -> Option + where + F: Fn(&T) -> bool, + { + self.watch.try_changed_and(&mut self.at_id, &mut f) + } + + /// Checks if the `Watch` contains a value. If this returns true, + /// then awaiting [`Rcv::get`] will return immediately. + pub fn contains_value(&self) -> bool { + self.watch.contains_value() + } +} + /// A receiver of a `Watch` channel. pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch>); @@ -682,6 +732,58 @@ impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> { } } +/// A receiver of a `Watch` channel that cannot `.await` values. +pub struct AnonReceiver<'a, M: RawMutex, T: Clone, const N: usize>(AnonRcv<'a, T, Watch>); + +impl<'a, M: RawMutex, T: Clone, const N: usize> AnonReceiver<'a, M, T, N> { + /// Converts the `Receiver` into a [`DynReceiver`]. + pub fn as_dyn(self) -> DynAnonReceiver<'a, T> { + let rcv = DynAnonReceiver(AnonRcv::new(self.0.watch, self.at_id)); + core::mem::forget(self); // Ensures the destructor is not called + rcv + } +} + +impl<'a, M: RawMutex, T: Clone, const N: usize> Into> for AnonReceiver<'a, M, T, N> { + fn into(self) -> DynAnonReceiver<'a, T> { + self.as_dyn() + } +} + +impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for AnonReceiver<'a, M, T, N> { + type Target = AnonRcv<'a, T, Watch>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for AnonReceiver<'a, M, T, N> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// A receiver that cannot `.await` value, which holds a **dynamic** reference to a `Watch` channel. +/// +/// This is an alternative to [`AnonReceiver`] with a simpler type definition, at the expense of +/// some runtime performance due to dynamic dispatch. +pub struct DynAnonReceiver<'a, T: Clone>(AnonRcv<'a, T, dyn WatchBehavior + 'a>); + +impl<'a, T: Clone> Deref for DynAnonReceiver<'a, T> { + type Target = AnonRcv<'a, T, dyn WatchBehavior + 'a>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, T: Clone> DerefMut for DynAnonReceiver<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + #[cfg(test)] mod tests { use futures_executor::block_on; @@ -715,6 +817,72 @@ mod tests { block_on(f); } + #[test] + fn all_try_get() { + let f = async { + static WATCH: Watch = Watch::new(); + + // Obtain receiver and sender + let mut rcv = WATCH.receiver().unwrap(); + let snd = WATCH.sender(); + + // Not initialized + assert_eq!(WATCH.try_get(), None); + assert_eq!(rcv.try_get(), None); + assert_eq!(snd.try_get(), None); + + // Receive the new value + snd.send(10); + assert_eq!(WATCH.try_get(), Some(10)); + assert_eq!(rcv.try_get(), Some(10)); + assert_eq!(snd.try_get(), Some(10)); + + assert_eq!(WATCH.try_get_and(|x| x > &5), Some(10)); + assert_eq!(rcv.try_get_and(|x| x > &5), Some(10)); + assert_eq!(snd.try_get_and(|x| x > &5), Some(10)); + + assert_eq!(WATCH.try_get_and(|x| x < &5), None); + assert_eq!(rcv.try_get_and(|x| x < &5), None); + assert_eq!(snd.try_get_and(|x| x < &5), None); + }; + block_on(f); + } + + #[test] + fn once_lock_like() { + let f = async { + static CONFIG0: u8 = 10; + static CONFIG1: u8 = 20; + + static WATCH: Watch = Watch::new(); + + // Obtain receiver and sender + let mut rcv = WATCH.receiver().unwrap(); + let snd = WATCH.sender(); + + // Not initialized + assert_eq!(rcv.try_changed(), None); + + // Receive the new value + snd.send(&CONFIG0); + let rcv0 = rcv.changed().await; + assert_eq!(rcv0, &10); + + // Receive another value + snd.send(&CONFIG1); + let rcv1 = rcv.try_changed(); + assert_eq!(rcv1, Some(&20)); + + // No update + assert_eq!(rcv.try_changed(), None); + + // Ensure similarity with original static + assert_eq!(rcv0, &CONFIG0); + assert_eq!(rcv1, Some(&CONFIG1)); + }; + block_on(f); + } + #[test] fn sender_modify() { let f = async { @@ -729,7 +897,7 @@ mod tests { assert_eq!(rcv.try_changed(), Some(10)); // Modify the value inplace - snd.modify(|opt| { + snd.send_modify(|opt| { if let Some(inner) = opt { *inner += 5; } @@ -751,11 +919,6 @@ mod tests { let mut rcv = WATCH.receiver().unwrap(); let snd = WATCH.sender(); - snd.send(10); - assert_eq!(rcv.try_peek_and(|x| x > &5), Some(10)); - assert_eq!(rcv.try_peek_and(|x| x < &5), None); - assert!(rcv.try_changed().is_some()); - snd.send(15); assert_eq!(rcv.try_get_and(|x| x > &5), Some(15)); assert_eq!(rcv.try_get_and(|x| x < &5), None); @@ -771,7 +934,6 @@ mod tests { snd.send(30); assert_eq!(rcv.changed_and(|x| x > &5).await, 30); - assert_eq!(rcv.peek_and(|x| x > &5).await, 30); assert_eq!(rcv.get_and(|x| x > &5).await, 30); }; block_on(f); @@ -825,7 +987,7 @@ mod tests { // Obtain receivers and sender let mut rcv0 = WATCH.receiver().unwrap(); - let mut rcv1 = WATCH.receiver().unwrap(); + let mut rcv1 = WATCH.anon_receiver(); let snd = WATCH.sender(); // No update for both @@ -864,41 +1026,13 @@ mod tests { block_on(f); } - #[test] - fn peek_get_changed() { - let f = async { - static WATCH: Watch = Watch::new(); - - // Obtain receiver and sender - let mut rcv = WATCH.receiver().unwrap(); - let snd = WATCH.sender(); - - // Send a value - snd.send(10); - - // Ensure peek does not mark as seen - assert_eq!(rcv.peek().await, 10); - assert_eq!(rcv.try_changed(), Some(10)); - assert_eq!(rcv.try_changed(), None); - assert_eq!(rcv.try_peek(), Some(10)); - - // Send a value - snd.send(20); - - // Ensure get does mark as seen - assert_eq!(rcv.get().await, 20); - assert_eq!(rcv.try_changed(), None); - assert_eq!(rcv.try_get(), Some(20)); - }; - block_on(f); - } - #[test] fn use_dynamics() { let f = async { static WATCH: Watch = Watch::new(); // Obtain receiver and sender + let mut anon_rcv = WATCH.dyn_anon_receiver(); let mut dyn_rcv = WATCH.dyn_receiver().unwrap(); let dyn_snd = WATCH.dyn_sender(); @@ -906,6 +1040,7 @@ mod tests { dyn_snd.send(10); // Ensure the dynamic receiver receives the value + assert_eq!(anon_rcv.try_changed(), Some(10)); assert_eq!(dyn_rcv.try_changed(), Some(10)); assert_eq!(dyn_rcv.try_changed(), None); }; @@ -918,10 +1053,12 @@ mod tests { static WATCH: Watch = Watch::new(); // Obtain receiver and sender + let anon_rcv = WATCH.anon_receiver(); let rcv = WATCH.receiver().unwrap(); let snd = WATCH.sender(); // Convert to dynamic + let mut dyn_anon_rcv = anon_rcv.as_dyn(); let mut dyn_rcv = rcv.as_dyn(); let dyn_snd = snd.as_dyn(); @@ -929,6 +1066,7 @@ mod tests { dyn_snd.send(10); // Ensure the dynamic receiver receives the value + assert_eq!(dyn_anon_rcv.try_changed(), Some(10)); assert_eq!(dyn_rcv.try_changed(), Some(10)); assert_eq!(dyn_rcv.try_changed(), None); }; From 999807f226623669a9cfc8ca218d3c81f0c04a77 Mon Sep 17 00:00:00 2001 From: Peter Krull Date: Mon, 23 Sep 2024 20:29:50 +0200 Subject: [PATCH 14/15] Added SealedWatchBehavior to limit access to core functions --- embassy-sync/src/watch.rs | 137 ++++++++++++++++++++------------------ 1 file changed, 71 insertions(+), 66 deletions(-) diff --git a/embassy-sync/src/watch.rs b/embassy-sync/src/watch.rs index 1b4a8b5895..4b7ffa5fcd 100644 --- a/embassy-sync/src/watch.rs +++ b/embassy-sync/src/watch.rs @@ -76,28 +76,14 @@ struct WatchState { receiver_count: usize, } -/// A trait representing the 'inner' behavior of the `Watch`. -pub trait WatchBehavior { - /// Sends a new value to the `Watch`. - fn send(&self, val: T); - - /// Clears the value of the `Watch`. - fn clear(&self); - +trait SealedWatchBehavior { /// Poll the `Watch` for the current value, making it as seen. fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll; - /// Tries to get the value of the `Watch`, marking it as seen, if an id is given. - fn try_get(&self, id: Option<&mut u64>) -> Option; - /// Poll the `Watch` for the value if it matches the predicate function /// `f`, making it as seen. fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll; - /// Tries to get the value of the `Watch` if it matches the predicate function - /// `f`, marking it as seen. - fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option; - /// Poll the `Watch` for a changed value, marking it as seen, if an id is given. fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll; @@ -112,8 +98,16 @@ pub trait WatchBehavior { /// predicate function `f`, marking it as seen. fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option; - /// Checks if the `Watch` is been initialized with a value. - fn contains_value(&self) -> bool; + /// Used when a receiver is dropped to decrement the receiver count. + /// + /// ## This method should not be called by the user. + fn drop_receiver(&self); + + /// Clears the value of the `Watch`. + fn clear(&self); + + /// Sends a new value to the `Watch`. + fn send(&self, val: T); /// Modify the value of the `Watch` using a closure. Returns `false` if the /// `Watch` does not already contain a value. @@ -122,30 +116,23 @@ pub trait WatchBehavior { /// Modify the value of the `Watch` using a closure. Returns `false` if the /// `Watch` does not already contain a value. fn send_if_modified(&self, f: &mut dyn Fn(&mut Option) -> bool); - - /// Used when a receiver is dropped to decrement the receiver count. - /// - /// ## This method should not be called by the user. - fn drop_receiver(&self); } -impl WatchBehavior for Watch { - fn send(&self, val: T) { - self.mutex.lock(|state| { - let mut s = state.borrow_mut(); - s.data = Some(val); - s.current_id += 1; - s.wakers.wake(); - }) - } +/// A trait representing the 'inner' behavior of the `Watch`. +#[allow(private_bounds)] +pub trait WatchBehavior: SealedWatchBehavior { + /// Tries to get the value of the `Watch`, marking it as seen, if an id is given. + fn try_get(&self, id: Option<&mut u64>) -> Option; - fn clear(&self) { - self.mutex.lock(|state| { - let mut s = state.borrow_mut(); - s.data = None; - }) - } + /// Tries to get the value of the `Watch` if it matches the predicate function + /// `f`, marking it as seen. + fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option; + + /// Checks if the `Watch` is been initialized with a value. + fn contains_value(&self) -> bool; +} +impl SealedWatchBehavior for Watch { fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll { self.mutex.lock(|state| { let mut s = state.borrow_mut(); @@ -162,16 +149,6 @@ impl WatchBehavior for Watch }) } - fn try_get(&self, id: Option<&mut u64>) -> Option { - self.mutex.lock(|state| { - let s = state.borrow(); - if let Some(id) = id { - *id = s.current_id; - } - s.data.clone() - }) - } - fn poll_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll { self.mutex.lock(|state| { let mut s = state.borrow_mut(); @@ -188,21 +165,6 @@ impl WatchBehavior for Watch }) } - fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option { - self.mutex.lock(|state| { - let s = state.borrow(); - match s.data { - Some(ref data) if f(data) => { - if let Some(id) = id { - *id = s.current_id; - } - Some(data.clone()) - } - _ => None, - } - }) - } - fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll { self.mutex.lock(|state| { let mut s = state.borrow_mut(); @@ -261,10 +223,6 @@ impl WatchBehavior for Watch }) } - fn contains_value(&self) -> bool { - self.mutex.lock(|state| state.borrow().data.is_some()) - } - fn drop_receiver(&self) { self.mutex.lock(|state| { let mut s = state.borrow_mut(); @@ -272,6 +230,22 @@ impl WatchBehavior for Watch }) } + fn clear(&self) { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + s.data = None; + }) + } + + fn send(&self, val: T) { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + s.data = Some(val); + s.current_id += 1; + s.wakers.wake(); + }) + } + fn send_modify(&self, f: &mut dyn Fn(&mut Option)) { self.mutex.lock(|state| { let mut s = state.borrow_mut(); @@ -292,6 +266,37 @@ impl WatchBehavior for Watch } } +impl WatchBehavior for Watch { + fn try_get(&self, id: Option<&mut u64>) -> Option { + self.mutex.lock(|state| { + let s = state.borrow(); + if let Some(id) = id { + *id = s.current_id; + } + s.data.clone() + }) + } + + fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option { + self.mutex.lock(|state| { + let s = state.borrow(); + match s.data { + Some(ref data) if f(data) => { + if let Some(id) = id { + *id = s.current_id; + } + Some(data.clone()) + } + _ => None, + } + }) + } + + fn contains_value(&self) -> bool { + self.mutex.lock(|state| state.borrow().data.is_some()) + } +} + impl Watch { /// Create a new `Watch` channel. pub const fn new() -> Self { From 5e1912a2d3adea920039dae3622643f34289290b Mon Sep 17 00:00:00 2001 From: Peter Krull Date: Tue, 24 Sep 2024 12:37:32 +0200 Subject: [PATCH 15/15] Reverse generics order, remove spin_get --- embassy-sync/src/watch.rs | 28 ++-------------------------- 1 file changed, 2 insertions(+), 26 deletions(-) diff --git a/embassy-sync/src/watch.rs b/embassy-sync/src/watch.rs index 4b7ffa5fcd..336e64ba9d 100644 --- a/embassy-sync/src/watch.rs +++ b/embassy-sync/src/watch.rs @@ -66,10 +66,10 @@ use crate::waitqueue::MultiWakerRegistration; /// block_on(f); /// ``` pub struct Watch { - mutex: Mutex>>, + mutex: Mutex>>, } -struct WatchState { +struct WatchState { data: Option, current_id: u64, wakers: MultiWakerRegistration, @@ -365,30 +365,6 @@ impl Watch { self.mutex.lock(|state| state.borrow().current_id) } - /// Waits for the `Watch` to be initialized with a value using a busy-wait mechanism. - /// - /// This is useful for initialization code where receivers may only be interested in - /// awaiting the value once in the lifetime of the program. It is therefore a temporaryily - /// CPU-inefficient operation, while being more memory efficient than using a `Receiver`. - /// - /// **Note** Be careful about using this within an InterruptExecutor, as it will starve - /// tasks in lower-priority executors. - pub async fn spin_get(&self) -> T { - poll_fn(|cx| { - self.mutex.lock(|state| { - let s = state.borrow(); - match &s.data { - Some(data) => Poll::Ready(data.clone()), - None => { - cx.waker().wake_by_ref(); - Poll::Pending - } - } - }) - }) - .await - } - /// Tries to get the value of the `Watch`. pub fn try_get(&self) -> Option { WatchBehavior::try_get(self, None)