Skip to content

Commit

Permalink
Fix spin_once_until_future_complete to quit when the future finishes. (
Browse files Browse the repository at this point in the history
…#1143)

* Fix spin_once_until_future_complete to quit when the future finishes.

This makes it match the function name much more closely.
While we are in here, also fix a bug where the multi-threaded
version would not quit immediately after the future completes.
And also add tests for both of these situations.

Signed-off-by: Chris Lalancette <[email protected]>
  • Loading branch information
clalancette authored Aug 1, 2023
1 parent 913afa0 commit 5367703
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 4 deletions.
22 changes: 18 additions & 4 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,24 +745,37 @@ class SingleThreadedExecutor(Executor):
def __init__(self, *, context: Optional[Context] = None) -> None:
super().__init__(context=context)

def spin_once(self, timeout_sec: Optional[float] = None) -> None:
def _spin_once_impl(
self,
timeout_sec: Optional[float] = None,
wait_condition: Callable[[], bool] = lambda: False
) -> None:
try:
handler, entity, node = self.wait_for_ready_callbacks(timeout_sec=timeout_sec)
handler, entity, node = self.wait_for_ready_callbacks(
timeout_sec, None, wait_condition)
except ShutdownException:
pass
except TimeoutException:
pass
except ConditionReachedException:
pass
else:
handler()
if handler.exception() is not None:
raise handler.exception()

handler.result() # raise any exceptions

def spin_once(self, timeout_sec: Optional[float] = None) -> None:
self._spin_once_impl(timeout_sec)

def spin_once_until_future_complete(
self,
future: Future,
timeout_sec: Optional[float] = None
) -> None:
self.spin_once(timeout_sec)
future.add_done_callback(lambda x: self.wake())
self._spin_once_impl(timeout_sec, future.done)


class MultiThreadedExecutor(Executor):
Expand Down Expand Up @@ -823,7 +836,7 @@ def _spin_once_impl(
for future in self._futures[:]:
if future.done():
self._futures.remove(future)
future.result() # re-raise any exceptions
future.result() # raise any exceptions

def spin_once(self, timeout_sec: Optional[float] = None) -> None:
self._spin_once_impl(timeout_sec)
Expand All @@ -833,4 +846,5 @@ def spin_once_until_future_complete(
future: Future,
timeout_sec: Optional[float] = None
) -> None:
future.add_done_callback(lambda x: self.wake())
self._spin_once_impl(timeout_sec, future.done)
57 changes: 57 additions & 0 deletions rclpy/test/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def setUp(self):
def tearDown(self):
self.node.destroy_node()
rclpy.shutdown(context=self.context)
self.context.destroy()

def func_execution(self, executor):
got_callback = False
Expand Down Expand Up @@ -520,6 +521,62 @@ def test_context_manager(self):
# Make sure it does not raise (smoke test)
executor.shutdown()

def test_single_threaded_spin_once_until_future(self):
self.assertIsNotNone(self.node.handle)
executor = SingleThreadedExecutor(context=self.context)

future = Future(executor=executor)

# Setup a thread to spin_once_until_future_complete, which will spin
# for a maximum of 10 seconds.
start = time.time()
thread = threading.Thread(target=executor.spin_once_until_future_complete,
args=(future, 10))
thread.start()

# Mark the future as complete immediately
future.set_result(True)

thread.join()
end = time.time()

time_spent = end - start

# Since we marked the future as complete immediately, the amount of
# time we spent should be *substantially* less than the 10 second
# timeout we set on the spin.
assert time_spent < 10

executor.shutdown()

def test_multi_threaded_spin_once_until_future(self):
self.assertIsNotNone(self.node.handle)
executor = MultiThreadedExecutor(context=self.context)

future = Future(executor=executor)

# Setup a thread to spin_once_until_future_complete, which will spin
# for a maximum of 10 seconds.
start = time.time()
thread = threading.Thread(target=executor.spin_once_until_future_complete,
args=(future, 10))
thread.start()

# Mark the future as complete immediately
future.set_result(True)

thread.join()
end = time.time()

time_spent = end - start

# Since we marked the future as complete immediately, the amount of
# time we spent should be *substantially* less than the 10 second
# timeout we set on the spin.
assert time_spent < 10

executor.shutdown()


if __name__ == '__main__':
unittest.main()

0 comments on commit 5367703

Please sign in to comment.