Skip to content

Commit

Permalink
ENH: Add a StableSubscriptionStatus
Browse files Browse the repository at this point in the history
This Status is used for ensuring an ophyd event stays stable for some time.
  • Loading branch information
DominicOram committed Sep 2, 2023
1 parent 5c413cc commit 3961658
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 2 deletions.
29 changes: 29 additions & 0 deletions docs/user_v1/explanations/status.rst
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,35 @@ Note that ``set_finished``, ``subscribe`` and ``clear_sub`` are gone; they are
handled automatically, internally. See
:class:`~ophyd.status.SubscriptionStatus` for additional options.

StableSubscriptionStatus
------------------------

The :class:`~ophyd.status.StableSubscriptionStatus` is a Status object that is
similar to the :class:`~ophyd.status.SubscriptionStatus` but is only marked
finished based on an ophyd event remaining stable for some given time. For
example, this could be used to ensure a temperature remains in a given range
for a set amount of time:

.. code:: python
from ophyd import Device, Component, StableSubscriptionStatus
class MyTempSensor(Device):
...
# The set point and readback of a temperature that
# may fluctuate for a second before it can be considered set
temp_sp = Component(...)
temp_rbv = Component(...)
def set(self, set_value):
def check_value(*, old_value, value, **kwargs):
"Return True when the temperature is in a valid range."
return set_value - 0.01 < value < set_value + 0.01
status = StableSubscriptionStatus(self.temp_rbv, check_value, stability_time=1)
self.temp_sp.set(set_value)
return status
Partial Progress Updates
------------------------

Expand Down
2 changes: 1 addition & 1 deletion docs/user_v1/reference/release_notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Added

* Add support for EPICS area detector pvaDriver cam
* Add status repr to ``WaitTimeoutError`` message

* New ``StableSubscriptionStatus``

Changes
-------
Expand Down
92 changes: 92 additions & 0 deletions ophyd/status.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import threading
import time
from collections import deque
from functools import partial
from logging import LoggerAdapter
from typing import Optional
from warnings import warn

import numpy as np
Expand Down Expand Up @@ -749,6 +751,96 @@ def _handle_failure(self):
return super()._handle_failure()


class StableSubscriptionStatus(SubscriptionStatus):
"""
Status updated via ``ophyd`` events which will wait for the event to be
stable (the callback continuing to return true) until being complete.
If the event becomes unstable and then back to stable this timer will
be reset.
Parameters
----------
device : obj
callback : callable
Callback that takes event information and returns a boolean. Signature
should be ``f(*, old_value, value, **kwargs)``. The arguments
old_value and value will be passed in by keyword, so their order does
not matter
stability_time: float
How long the event should remain stable for the status to be done
event_type : str, optional
Name of event type to check whether the device has finished succesfully
timeout : float, optional
Maximum timeout to wait to mark the request as a failure
settle_time : float, optional
Time to wait after completion until running callbacks
run: bool, optional
Run the callback now
"""
def __init__(
self,
device,
callback,
stability_time,
event_type=None,
timeout=None,
settle_time=None,
run=True,
):
if timeout and stability_time > timeout:
raise ValueError(f"Stability time ({stability_time}) must be less than full status timeout ({timeout})")
self._stability_time = stability_time
self._stable_timer = threading.Timer(self._stability_time, partial(self._finished, success=True))

# Start timeout thread in the background
super().__init__(device, callback, event_type, timeout=timeout, settle_time=settle_time, run=run)

def check_value(self, *args, **kwargs):
"""
Update the status object
"""
try:
success = self.callback(*args, **kwargs)

# If successfull start a timer for completion
if success:
if not self._stable_timer.is_alive():
self._stable_timer.start()
else:
self._stable_timer.cancel()
self._stable_timer = threading.Timer(self._stability_time, partial(self._finished, success=True))

# Do not fail silently
except Exception as e:
self.log.error(e)
raise

def set_finished(self):
"""
Mark as finished successfully.
This method should generally not be called by the *recipient* of this
Status object, but only by the object that created and returned it.
"""
# Cancel timer
self._stable_timer.cancel()
# Run completion
super().set_finished()

def _handle_failure(self):
# This is called whether we fail via the timeout thread or via an
# a call to set_exception.
# Cancel timer
self._stable_timer.cancel()
return super()._handle_failure()


class MoveStatus(DeviceStatus):
"""
Track the state of a movement from some initial to final "position".
Expand Down
82 changes: 81 additions & 1 deletion ophyd/tests/test_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@
import pytest

from ophyd import Device
from ophyd.status import MoveStatus, StatusBase, SubscriptionStatus, UseNewProperty
from ophyd.status import (
MoveStatus,
StableSubscriptionStatus,
StatusBase,
SubscriptionStatus,
UseNewProperty,
)
from ophyd.utils import (
InvalidState,
StatusTimeoutError,
Expand Down Expand Up @@ -136,6 +142,80 @@ def cb(*args, done=False, **kwargs):
assert status.done and status.success


def test_given_stability_time_greater_than_timeout_then_exception_on_initialisation():
# Arbitrary device
d = Device("Tst:Prefix", name="test")

with pytest.raises(ValueError):
StableSubscriptionStatus(d, Mock(), stability_time=2, timeout=1, event_type=d.SUB_ACQ_DONE)


def test_given_callback_stays_stable_then_stable_status_eventual_returns_done():
# Arbitrary device
d = Device("Tst:Prefix", name="test")
# Mock callback
m = Mock()

# Full fake callback signature
def cb(*args, done=False, **kwargs):
# Run mock callback
m()
# Return finished or not
return done

status = StableSubscriptionStatus(d, cb, 0.2, event_type=d.SUB_ACQ_DONE)

# Run callbacks that return complete but status waits until stable
d._run_subs(sub_type=d.SUB_ACQ_DONE, done=True)
time.sleep(0.1) # Wait for callbacks to run.
assert m.called
assert not status.done and not status.success

time.sleep(0.15)
assert status.done and status.success


def test_given_callback_fluctuates_and_stabalises_then_stable_status_eventual_returns_done():
# Arbitrary device
d = Device("Tst:Prefix", name="test")
# Mock callback
m = Mock()

# Full fake callback signature
def cb(*args, done=False, **kwargs):
# Run mock callback
m()
# Return finished or not
return done

status = StableSubscriptionStatus(d, cb, 0.2, event_type=d.SUB_ACQ_DONE)

# First start as looking stable
d._run_subs(sub_type=d.SUB_ACQ_DONE, done=True)
time.sleep(0.1) # Wait for callbacks to run.
assert m.called
assert not status.done and not status.success

# Then become unstable
d._run_subs(sub_type=d.SUB_ACQ_DONE, done=False)
time.sleep(0.1) # Wait for callbacks to run.
assert m.called
assert not status.done and not status.success

# Still not successful
time.sleep(0.15)
assert not status.done and not status.success

# Now test properly stable
d._run_subs(sub_type=d.SUB_ACQ_DONE, done=True)
time.sleep(0.1) # Wait for callbacks to run.
assert m.called
assert not status.done and not status.success

time.sleep(0.15)
assert status.done and status.success


def test_and():
st1 = StatusBase()
st2 = StatusBase()
Expand Down

0 comments on commit 3961658

Please sign in to comment.