From 5367703c9812680a563fb904b8c9d187da310bd3 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Tue, 1 Aug 2023 12:44:50 -0400 Subject: [PATCH] Fix spin_once_until_future_complete to quit when the future finishes. (#1143) * Fix spin_once_until_future_complete to quit when the future finishes. This makes it match the function name much more closely. While we are in here, also fix a bug where the multi-threaded version would not quit immediately after the future completes. And also add tests for both of these situations. Signed-off-by: Chris Lalancette --- rclpy/rclpy/executors.py | 22 +++++++++++--- rclpy/test/test_executor.py | 57 +++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 4 deletions(-) diff --git a/rclpy/rclpy/executors.py b/rclpy/rclpy/executors.py index 0a50bd7ec..6f318791c 100644 --- a/rclpy/rclpy/executors.py +++ b/rclpy/rclpy/executors.py @@ -745,24 +745,37 @@ class SingleThreadedExecutor(Executor): def __init__(self, *, context: Optional[Context] = None) -> None: super().__init__(context=context) - def spin_once(self, timeout_sec: Optional[float] = None) -> None: + def _spin_once_impl( + self, + timeout_sec: Optional[float] = None, + wait_condition: Callable[[], bool] = lambda: False + ) -> None: try: - handler, entity, node = self.wait_for_ready_callbacks(timeout_sec=timeout_sec) + handler, entity, node = self.wait_for_ready_callbacks( + timeout_sec, None, wait_condition) except ShutdownException: pass except TimeoutException: pass + except ConditionReachedException: + pass else: handler() if handler.exception() is not None: raise handler.exception() + handler.result() # raise any exceptions + + def spin_once(self, timeout_sec: Optional[float] = None) -> None: + self._spin_once_impl(timeout_sec) + def spin_once_until_future_complete( self, future: Future, timeout_sec: Optional[float] = None ) -> None: - self.spin_once(timeout_sec) + future.add_done_callback(lambda x: self.wake()) + self._spin_once_impl(timeout_sec, future.done) class MultiThreadedExecutor(Executor): @@ -823,7 +836,7 @@ def _spin_once_impl( for future in self._futures[:]: if future.done(): self._futures.remove(future) - future.result() # re-raise any exceptions + future.result() # raise any exceptions def spin_once(self, timeout_sec: Optional[float] = None) -> None: self._spin_once_impl(timeout_sec) @@ -833,4 +846,5 @@ def spin_once_until_future_complete( future: Future, timeout_sec: Optional[float] = None ) -> None: + future.add_done_callback(lambda x: self.wake()) self._spin_once_impl(timeout_sec, future.done) diff --git a/rclpy/test/test_executor.py b/rclpy/test/test_executor.py index db03b02ae..efffe8f3c 100644 --- a/rclpy/test/test_executor.py +++ b/rclpy/test/test_executor.py @@ -37,6 +37,7 @@ def setUp(self): def tearDown(self): self.node.destroy_node() rclpy.shutdown(context=self.context) + self.context.destroy() def func_execution(self, executor): got_callback = False @@ -520,6 +521,62 @@ def test_context_manager(self): # Make sure it does not raise (smoke test) executor.shutdown() + def test_single_threaded_spin_once_until_future(self): + self.assertIsNotNone(self.node.handle) + executor = SingleThreadedExecutor(context=self.context) + + future = Future(executor=executor) + + # Setup a thread to spin_once_until_future_complete, which will spin + # for a maximum of 10 seconds. + start = time.time() + thread = threading.Thread(target=executor.spin_once_until_future_complete, + args=(future, 10)) + thread.start() + + # Mark the future as complete immediately + future.set_result(True) + + thread.join() + end = time.time() + + time_spent = end - start + + # Since we marked the future as complete immediately, the amount of + # time we spent should be *substantially* less than the 10 second + # timeout we set on the spin. + assert time_spent < 10 + + executor.shutdown() + + def test_multi_threaded_spin_once_until_future(self): + self.assertIsNotNone(self.node.handle) + executor = MultiThreadedExecutor(context=self.context) + + future = Future(executor=executor) + + # Setup a thread to spin_once_until_future_complete, which will spin + # for a maximum of 10 seconds. + start = time.time() + thread = threading.Thread(target=executor.spin_once_until_future_complete, + args=(future, 10)) + thread.start() + + # Mark the future as complete immediately + future.set_result(True) + + thread.join() + end = time.time() + + time_spent = end - start + + # Since we marked the future as complete immediately, the amount of + # time we spent should be *substantially* less than the 10 second + # timeout we set on the spin. + assert time_spent < 10 + + executor.shutdown() + if __name__ == '__main__': unittest.main()