From 516254efb354fc7672f1d799ae0de9ba2249f196 Mon Sep 17 00:00:00 2001 From: Nadav Elkabets Date: Tue, 7 Apr 2026 15:04:35 +0000 Subject: [PATCH 1/4] Schedule the original task when task awaits a future Signed-off-by: Nadav Elkabets --- rclpy/rclpy/executors.py | 4 ++++ rclpy/rclpy/task.py | 27 +++++++++++++++++++++------ rclpy/test/test_executor.py | 29 +++++++++++++++++++++++++++++ 3 files changed, 54 insertions(+), 6 deletions(-) diff --git a/rclpy/rclpy/executors.py b/rclpy/rclpy/executors.py index f35b3ba3e..2e87302fa 100644 --- a/rclpy/rclpy/executors.py +++ b/rclpy/rclpy/executors.py @@ -743,6 +743,10 @@ def _wait_for_ready_callbacks( ready_tasks_count = len(self._ready_tasks) for _ in range(ready_tasks_count): task = self._ready_tasks.popleft() + # Skip tasks that were cancelled or set done while awaiting a + # future and got rescheduled when the future completed + if task.cancelled() or task.done(): + continue task_data = self._pending_tasks[task] node = task_data.source_node if node is None or node in nodes_to_use: diff --git a/rclpy/rclpy/task.py b/rclpy/rclpy/task.py index 144c865ba..221f11452 100644 --- a/rclpy/rclpy/task.py +++ b/rclpy/rclpy/task.py @@ -50,8 +50,8 @@ def __init__(self, *, executor: Optional['Executor'] = None) -> None: # An exception raised by the handler when called self._exception: Optional[Exception] = None self._exception_fetched = False - # callbacks to be scheduled after this task completes - self._callbacks: List[Callable[['Future[T]'], None]] = [] + # callbacks or tasks to be scheduled after this task completes + self._callbacks: List[Union[Callable[['Future[T]'], None], 'Task[Any]']] = [] # Lock for threadsafety self._lock = threading.Lock() # An executor to use when scheduling done callbacks @@ -164,10 +164,15 @@ def _schedule_or_invoke_done_callbacks(self) -> None: if executor is not None: # Have the executor take care of the callbacks for callback in callbacks: - executor.create_task(callback, self) + if isinstance(callback, Task): + executor._call_task_in_next_spin(callback) + else: + executor.create_task(callback, self) else: # No executor, call right away for callback in callbacks: + if isinstance(callback, Task): + continue try: callback(self) except Exception as e: @@ -209,6 +214,18 @@ def add_done_callback(self, callback: Callable[['Future[T]'], None]) -> None: if invoke: callback(self) + def _add_waiting_task(self, task: 'Task[Any]') -> None: + """Schedule a task to resume when this future completes.""" + with self._lock: + if not self._pending(): + assert self._executor is not None + executor = self._executor() + if executor is not None: + executor._call_task_in_next_spin(task) + + else: + self._callbacks.append(task) + def remove_done_callback(self, callback: Callable[['Future[T]'], None]) -> bool: """ Remove a previously-added done callback. @@ -353,9 +370,7 @@ def _add_resume_callback(self, future: Future[T], executor: 'Executor') -> None: elif future_executor is not executor: raise RuntimeError('A task can only await futures associated with the same executor') - # The future is associated with the same executor, so we can resume the task directly - # in the done callback - future.add_done_callback(lambda _: self.__call__()) + future._add_waiting_task(self) def _complete_task(self) -> None: """Cleanup after task finished.""" diff --git a/rclpy/test/test_executor.py b/rclpy/test/test_executor.py index a0bbaf9f6..435d7de6f 100644 --- a/rclpy/test/test_executor.py +++ b/rclpy/test/test_executor.py @@ -485,6 +485,35 @@ async def coro2() -> str: self.assertTrue(future1.done()) self.assertEqual('Sentinel Result 1', future1.result()) + def test_coroutine_exception_after_await(self) -> None: + """Exception in a coroutine after awaiting a future must propagate.""" + self.assertIsNotNone(self.node.handle) + # EventsExecutor excluded - segfaults on exception propagation (#1641) + for cls in [SingleThreadedExecutor]: + with self.subTest(cls=cls): + executor = cls(context=self.context) + executor.add_node(self.node) + + first_fut = executor.create_future() + second_fut = executor.create_future() + + async def coro_that_raises() -> None: + first_fut.set_result(None) + await second_fut + raise RuntimeError('Expected error after await') + + task = executor.create_task(coro_that_raises) + + executor.spin_until_future_complete(first_fut, timeout_sec=5) + self.assertFalse(task.done()) + # Resolve the inner future — triggers resume + second_fut.set_result(None) + + # The resumed coroutine should raise, and spin must propagate it + with self.assertRaises(RuntimeError) as cm: + executor.spin_until_future_complete(task, timeout_sec=5) + self.assertIn('Expected error after await', str(cm.exception)) + def test_create_task_during_spin(self) -> None: self.assertIsNotNone(self.node.handle) for cls in [SingleThreadedExecutor, EventsExecutor]: From 62af0c8fc49511b3563de74b37b6d7f26075c52b Mon Sep 17 00:00:00 2001 From: Nadav Elkabets Date: Tue, 7 Apr 2026 15:34:15 +0000 Subject: [PATCH 2/4] Add MultiThreadedExecutor to test Signed-off-by: Nadav Elkabets --- rclpy/test/test_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclpy/test/test_executor.py b/rclpy/test/test_executor.py index 435d7de6f..e0ce16efb 100644 --- a/rclpy/test/test_executor.py +++ b/rclpy/test/test_executor.py @@ -489,7 +489,7 @@ def test_coroutine_exception_after_await(self) -> None: """Exception in a coroutine after awaiting a future must propagate.""" self.assertIsNotNone(self.node.handle) # EventsExecutor excluded - segfaults on exception propagation (#1641) - for cls in [SingleThreadedExecutor]: + for cls in [SingleThreadedExecutor, MultiThreadedExecutor]: with self.subTest(cls=cls): executor = cls(context=self.context) executor.add_node(self.node) From d9087265ba26a10a78b0f72055e8e5a4012df7ac Mon Sep 17 00:00:00 2001 From: Nadav Elkabets Date: Tue, 7 Apr 2026 16:49:45 +0000 Subject: [PATCH 3/4] Add tests for awaiting a done future and task cancellation during await Signed-off-by: Nadav Elkabets --- rclpy/rclpy/task.py | 9 +++++- rclpy/test/test_executor.py | 55 ++++++++++++++++++++++++++++++++++++- 2 files changed, 62 insertions(+), 2 deletions(-) diff --git a/rclpy/rclpy/task.py b/rclpy/rclpy/task.py index 221f11452..ff49d2afa 100644 --- a/rclpy/rclpy/task.py +++ b/rclpy/rclpy/task.py @@ -172,6 +172,9 @@ def _schedule_or_invoke_done_callbacks(self) -> None: # No executor, call right away for callback in callbacks: if isinstance(callback, Task): + warnings.warn( + 'Dropping task awaiting future: ' + 'executor reference could not be resolved') continue try: callback(self) @@ -222,7 +225,10 @@ def _add_waiting_task(self, task: 'Task[Any]') -> None: executor = self._executor() if executor is not None: executor._call_task_in_next_spin(task) - + else: + warnings.warn( + 'Dropping task awaiting future: ' + 'executor reference could not be resolved') else: self._callbacks.append(task) @@ -370,6 +376,7 @@ def _add_resume_callback(self, future: Future[T], executor: 'Executor') -> None: elif future_executor is not executor: raise RuntimeError('A task can only await futures associated with the same executor') + # Register the task to resume when the future is done or cancelled future._add_waiting_task(self) def _complete_task(self) -> None: diff --git a/rclpy/test/test_executor.py b/rclpy/test/test_executor.py index e0ce16efb..1cc09d2db 100644 --- a/rclpy/test/test_executor.py +++ b/rclpy/test/test_executor.py @@ -509,11 +509,64 @@ async def coro_that_raises() -> None: # Resolve the inner future — triggers resume second_fut.set_result(None) - # The resumed coroutine should raise, and spin must propagate it with self.assertRaises(RuntimeError) as cm: executor.spin_until_future_complete(task, timeout_sec=5) self.assertIn('Expected error after await', str(cm.exception)) + def test_cancel_task_while_awaiting_future(self) -> None: + """Cancelling a task parked on a future must not crash the dispatch loop.""" + self.assertIsNotNone(self.node.handle) + # EventsExecutor excluded - see #1641 + for cls in [SingleThreadedExecutor, MultiThreadedExecutor]: + with self.subTest(cls=cls): + executor = cls(context=self.context) + executor.add_node(self.node) + + first_fut = executor.create_future() + second_fut = executor.create_future() + third_fut = executor.create_future() + resumed = False + + async def coro() -> None: + nonlocal resumed + first_fut.set_result(None) + await second_fut + third_fut.set_result(None) + + task = executor.create_task(coro) + + executor.spin_until_future_complete(first_fut, timeout_sec=5) + self.assertFalse(task.done()) + + task.cancel() + self.assertTrue(task.cancelled()) + + second_fut.set_result(None) + + executor.spin_until_future_complete(first_fut, timeout_sec=5) + self.assertFalse(third_fut.done()) + + def test_await_already_completed_future(self) -> None: + """Awaiting an already-completed future must resume and return its result.""" + self.assertIsNotNone(self.node.handle) + # EventsExecutor excluded - see #1641 + for cls in [SingleThreadedExecutor, MultiThreadedExecutor]: + with self.subTest(cls=cls): + executor = cls(context=self.context) + executor.add_node(self.node) + + fut: Future[str] = executor.create_future() + fut.set_result('done') # complete before the task runs + + async def coro() -> str: + return await fut # type: ignore[return-value] + + task = executor.create_task(coro) + + executor.spin_until_future_complete(task, timeout_sec=5) + self.assertTrue(task.done()) + self.assertEqual('done', task.result()) + def test_create_task_during_spin(self) -> None: self.assertIsNotNone(self.node.handle) for cls in [SingleThreadedExecutor, EventsExecutor]: From 0b18e3873d1acc5e6141120cf023b945c868b8d4 Mon Sep 17 00:00:00 2001 From: Nadav Elkabets Date: Tue, 7 Apr 2026 20:48:28 +0000 Subject: [PATCH 4/4] Removed unused variable Signed-off-by: Nadav Elkabets --- rclpy/test/test_executor.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/rclpy/test/test_executor.py b/rclpy/test/test_executor.py index 1cc09d2db..f9af330aa 100644 --- a/rclpy/test/test_executor.py +++ b/rclpy/test/test_executor.py @@ -525,10 +525,8 @@ def test_cancel_task_while_awaiting_future(self) -> None: first_fut = executor.create_future() second_fut = executor.create_future() third_fut = executor.create_future() - resumed = False async def coro() -> None: - nonlocal resumed first_fut.set_result(None) await second_fut third_fut.set_result(None)