
Add Watch sync primitive (similar to tokio::sync::watch) #2568

Merged · 16 commits · Oct 1, 2024

Conversation

@peterkrull (Contributor) commented Feb 13, 2024

This PR adds something similar to tokio's watch. It is a single-slot channel similar to Signal, but it can have multiple receivers, each of which receives a clone of the value. It is therefore also similar to a PubSubChannel, but with a single-slot queue. The primary methods of the receivers are:

  • changed: Await a change to the value, marking it as seen.
  • get: Await the value (which does not have to change), marking it as seen.

Both of the above also have try_xxx variants, as well as xxx_and variants, which only receive when the value matches a predicate function. The doc-test comments and regular tests contain examples.

@peterkrull peterkrull marked this pull request as draft February 25, 2024 13:29
@peterkrull peterkrull changed the title Add initial implementation of MultiSignal sync primitive (similar to tokio::sync::watch) Add Watch sync primitive (similar to tokio::sync::watch) Feb 28, 2024
@peterkrull (Contributor, Author) commented Mar 1, 2024

I am considering removing the peek methods for the receivers. Their use case is dubious, and not marking a message as seen can make things hard to reason about for the user. I have also reintroduced the predicate methods, which allow the user to await (or try to receive) a change that also meets a certain criterion, defined by a closure Fn(&T) -> bool, so that the value is not needlessly cloned just to do such a comparison.

@CBJamo (Contributor) commented Sep 23, 2024

This would be helpful for my work as well. I make a lot of channels with a depth of 1. If there's anything I can do to help push this through, let me know.

I agree about the peek functions making reasoning hard. I would remove them, though that isn't a strong preference if someone has a compelling use case.

@peterkrull (Contributor, Author):

I would be happy to get back to this. I have some changes in the version I have been using locally. I have abandoned the peek functions, since I did not find them useful anyway. I have also put in place (for my own sake) an arbitrary limit on the senders, so there can only be one sender for a Watch channel, but I am okay with reverting this. I also have what I call an AnonReceiver, which is just a receiver that cannot use any of the async functions; this saves a Waker, and it is not necessary to unwrap when getting it. I will push my local version soon.

@CBJamo (Contributor) commented Sep 23, 2024

Hmm, I have used depth-1 pubsubs with multiple senders in a couple of cases.

First is having something like a disable-motors pubsub, which can be published to by either an estop button or an i2c task. This could be fairly easily built up from 3 Watch objects and an intermediate task, but as the number of disable sources increases it could get unwieldy.

Second is a way to request a sensor reading: many tasks can ask for a new value, and the task responsible for reading from that sensor reads it and sends it back on another pubsub. The first part of that can be done with a signal, but I'm not a fan of its ergonomics.

Possibly a Watch-but-backwards would be a useful tool as well. The code may well be identical, but it might be valuable to have a logical separation between the many-to-one and one-to-many primitives.

@peterkrull (Contributor, Author):

I mainly intended for this to be a one-to-many channel, since that is what we cannot do with a signal, but there is really no reason why I should impose this hard constraint on others. I will just revert the one-sender limitation.

First is having something like a disable motors pubsub, that can be published to by either an estop button or i2c task. This could be fairly easily built up from 3 Watch objects and an intermediate task. But as the number of disable sources increase it could get unwieldy.

I actually think it is the other way around: with just a couple of different sources of estop, having shared overwrite access is probably fine, but with many more sources I like the approach of having a task specifically for aggregating and processing the different sources. I use this for deciding whether it is safe to arm a drone, where I have a whole bunch of different conditions (implemented using bitflags) that must be cleared first. The task governing the motors then just uses the following to wait for the all-clear:

// Ensure all blockers are lowered before proceeding
rcv_arming_blocker.changed_and(|f| f.is_empty()).await;

@CBJamo (Contributor) commented Sep 23, 2024

That's a good point: as the number of blocker/disarm sources rises, the odds of wanting some logic in there rise with it.

As an aside, that snippet is from Holsatus, right? Very cool project, thanks for writing and sharing it. I would have thought the lack of an FPU would give the 2040 trouble as a flight controller. The 2350 should be an amazing target though.

@peterkrull peterkrull marked this pull request as ready for review September 23, 2024 18:42
@lulf (Member) left a comment

Thanks for putting in the work, I left some comments

embassy-sync/src/watch.rs: two review threads (outdated, resolved)
Comment on lines +184 to +224
fn try_changed(&self, id: &mut u64) -> Option<T> {
self.mutex.lock(|state| {
let s = state.borrow();
match s.current_id > *id {
true => {
*id = s.current_id;
s.data.clone()
}
false => None,
}
})
}

fn poll_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> {
self.mutex.lock(|state| {
let mut s = state.borrow_mut();
match (&s.data, s.current_id > *id) {
(Some(data), true) if f(data) => {
*id = s.current_id;
Poll::Ready(data.clone())
}
_ => {
s.wakers.register(cx.waker());
Poll::Pending
}
}
})
}

fn try_changed_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T> {
self.mutex.lock(|state| {
let s = state.borrow();
match (&s.data, s.current_id > *id) {
(Some(data), true) if f(data) => {
*id = s.current_id;
s.data.clone()
}
_ => None,
}
})
}
Member

There appears to be a lot of code duplication in these implementations. Could you combine several of these into a single internal one on the Watch type that these could use?

This applies to the different send_ and get_ variants as well.

Contributor Author

I can combine all of the poll, try, changed, and get variants into a single monolith:

fn receive_inner(
    &self,
    id: Option<&mut u64>,
    f: Option<&mut dyn Fn(&T) -> bool>,
    cx: Option<&mut Context<'_>>,
    must_change: bool,
) -> Poll<T> {
    self.mutex.lock(|state| {
        let mut s = state.borrow_mut();

        // If we require the value to have changed, but the state ID is not larger
        // than the receiver ID, we register the waker and return pending.
        if must_change && !id.as_ref().is_some_and(|id| s.current_id > **id) {
            cx.map(|cx| s.wakers.register(cx.waker()));
            return Poll::Pending;
        }

        // If the data does not exist, we register the waker and return pending.
        let Some(data) = &s.data else {
            cx.map(|cx| s.wakers.register(cx.waker()));
            return Poll::Pending;
        };

        // If the predicate exists, but it does not match,
        // we register the waker and return pending.
        if f.map_or(false, |f| !f(data)) {
            cx.map(|cx| s.wakers.register(cx.waker()));
            return Poll::Pending;
        }

        // Update the receiver ID, if any, and return value
        id.map(|id| *id = s.current_id);
        return Poll::Ready(data.clone());
    })
}

I guess this would still get optimized well, since user code can only access this functionality through functions where the Options have a known variant, like:

/// Waits for the `Watch` to change and returns the new value, marking it as seen.
pub async fn changed(&mut self) -> T {
    poll_fn(|cx| self.watch.receive_inner(Some(&mut self.at_id), None, Some(cx), true)).await
}

/// Tries to get the new value of the watch without waiting, marking it as seen.
pub fn try_changed(&mut self) -> Option<T> {
    match self.watch.receive_inner(Some(&mut self.at_id), None, None, true) {
        Poll::Ready(data) => Some(data),
        Poll::Pending => None,
    }
}

Member

Hmm, yeah. There was more nuance here than I saw initially; maybe keep the original then.

Contributor Author

In my quick testing, looking at some smaller but similar examples on Godbolt, the monolith seems to optimize just fine. So it is mostly a matter of whether you prefer some code duplication or the monolith. I personally find the duplicated code easier to reason through, and it does not make the code elsewhere look too crazy. I'll leave it up to you; I am fine with either.

@peterkrull (Contributor, Author):

@lulf Do you have any further input on this?

@lulf (Member) left a comment

LGTM, thanks!

@lulf lulf added this pull request to the merge queue Oct 1, 2024
Merged via the queue into embassy-rs:main with commit e6ce810 Oct 1, 2024
6 checks passed