diff --git a/rclpy/rclpy/action/client.py b/rclpy/rclpy/action/client.py index 2ddbab7c7..8061165ce 100644 --- a/rclpy/rclpy/action/client.py +++ b/rclpy/rclpy/action/client.py @@ -163,6 +163,13 @@ def __init__( self._is_ready = False + # key: wait_set pointer (id), value: tuple of indices from add_to_wait_set + # This ensures we use the correct indices for each wait_set, fixing the race + # condition when multiple threads use different wait_sets with the same action client. + self._wait_set_indices = {} + # Set of weak references to wait_sets, used for cleanup when wait_sets are garbage collected + self._wait_set_weak_refs = set() + # key: UUID in bytes, value: weak reference to ClientGoalHandle self._goal_handles = {} # key: goal request sequence_number, value: Future for goal response @@ -231,7 +238,17 @@ def _remove_pending_result_request(self, future): # Start Waitable API def is_ready(self, wait_set): """Return True if one or more entities are ready in the wait set.""" - ready_entities = self._client_handle.is_ready(wait_set) + # Use indices stored for this specific wait_set to avoid race condition + # where indices from a different wait_set could be used. + wait_set_id = id(wait_set) + indices = self._wait_set_indices.get(wait_set_id) + if indices is not None: + ready_entities = self._client_handle.is_ready_with_indices( + wait_set, *indices + ) + else: + # Fallback to original behavior if indices not found + ready_entities = self._client_handle.is_ready(wait_set) self._is_feedback_ready = ready_entities[0] self._is_status_ready = ready_entities[1] self._is_goal_response_ready = ready_entities[2] @@ -367,8 +384,30 @@ def get_num_entities(self): def add_to_wait_set(self, wait_set): """Add entities to wait set.""" + # Store the indices returned by add_to_waitset for this specific wait_set. + # This ensures is_ready() uses the correct indices for this wait_set, + # fixing the race condition when multiple threads use different wait_sets. with self._lock: - self._client_handle.add_to_waitset(wait_set) + indices = self._client_handle.add_to_waitset(wait_set) + wait_set_id = id(wait_set) + self._wait_set_indices[wait_set_id] = indices + + # Set up cleanup callback for when the wait_set is garbage collected. + # This prevents _wait_set_indices from growing unboundedly. + try: + # Create a weak reference with a callback that removes the entry + # when the wait_set is garbage collected + def cleanup_callback(ref): + self._wait_set_indices.pop(wait_set_id, None) + self._wait_set_weak_refs.discard(ref) + + weak_ref = weakref.ref(wait_set, cleanup_callback) + self._wait_set_weak_refs.add(weak_ref) + except TypeError: + # wait_set doesn't support weak references (e.g., some C extension types) + # In this case, we fall back to periodic cleanup or accept potential memory leak + # This is a minor concern since wait_sets are typically short-lived + pass def __enter__(self): return self._client_handle.__enter__() diff --git a/rclpy/src/rclpy/action_client.cpp b/rclpy/src/rclpy/action_client.cpp index a52864ab5..0274a71ef 100644 --- a/rclpy/src/rclpy/action_client.cpp +++ b/rclpy/src/rclpy/action_client.cpp @@ -229,15 +229,22 @@ ActionClient::is_action_server_available() return is_available; } -void +py::tuple ActionClient::add_to_waitset(WaitSet & wait_set) { - rcl_ret_t ret = rcl_action_wait_set_add_action_client( - wait_set.rcl_ptr(), rcl_action_client_.get(), NULL, NULL); + std::lock_guard lock(wait_set.lock_); + size_t goal_index, cancel_index, result_index; + size_t feedback_index, status_index; + rcl_ret_t ret = rcl_action_wait_set_add_action_client_with_indices( + wait_set.rcl_ptr(), rcl_action_client_.get(), &goal_index, &cancel_index, + &result_index, &feedback_index, &status_index); if (RCL_RET_OK != ret) { std::string error_text{"Failed to add 'rcl_action_client_t' to wait set"}; throw rclpy::RCLError(error_text); } + // Return the indices for this specific wait_set + return py::make_tuple(feedback_index, status_index, goal_index, cancel_index, + result_index); } py::tuple @@ -248,18 +255,52 @@ ActionClient::is_ready(WaitSet & wait_set) bool is_goal_response_ready = false; bool is_cancel_response_ready = false; bool is_result_response_ready = false; - rcl_ret_t ret = rcl_action_client_wait_set_get_entities_ready( - wait_set.rcl_ptr(), - rcl_action_client_.get(), - &is_feedback_ready, - &is_status_ready, - &is_goal_response_ready, - &is_cancel_response_ready, - &is_result_response_ready); - if (RCL_RET_OK != ret) { - throw rclpy::RCLError("Failed to get number of ready entities for action client"); + { + std::lock_guard lock(wait_set.lock_); + rcl_ret_t ret = rcl_action_client_wait_set_get_entities_ready( + wait_set.rcl_ptr(), rcl_action_client_.get(), &is_feedback_ready, + &is_status_ready, &is_goal_response_ready, &is_cancel_response_ready, + &is_result_response_ready); + if (RCL_RET_OK != ret) { + throw rclpy::RCLError( + "Failed to get number of ready entities for action client"); + } } + py::tuple result_tuple(5); + result_tuple[0] = py::bool_(is_feedback_ready); + result_tuple[1] = py::bool_(is_status_ready); + result_tuple[2] = py::bool_(is_goal_response_ready); + result_tuple[3] = py::bool_(is_cancel_response_ready); + result_tuple[4] = py::bool_(is_result_response_ready); + return result_tuple; +} +py::tuple +ActionClient::is_ready_with_indices( + WaitSet & wait_set, size_t feedback_index, + size_t status_index, size_t goal_index, + size_t cancel_index, size_t result_index) +{ + // This method uses indices passed in rather than reading from + // action_client->impl, which fixes the race condition where indices from a + // different wait_set could be used. + bool is_feedback_ready = false; + bool is_status_ready = false; + bool is_goal_response_ready = false; + bool is_cancel_response_ready = false; + bool is_result_response_ready = false; + { + std::lock_guard lock(wait_set.lock_); + rcl_ret_t ret = rcl_action_client_wait_set_get_entities_ready_with_indices( + wait_set.rcl_ptr(), rcl_action_client_.get(), feedback_index, + status_index, goal_index, cancel_index, result_index, + &is_feedback_ready, &is_status_ready, &is_goal_response_ready, + &is_cancel_response_ready, &is_result_response_ready); + if (RCL_RET_OK != ret) { + throw rclpy::RCLError( + "Failed to get number of ready entities for action client"); + } + } py::tuple result_tuple(5); result_tuple[0] = py::bool_(is_feedback_ready); result_tuple[1] = py::bool_(is_status_ready); @@ -317,6 +358,10 @@ define_action_client(py::object module) "Check if an action entity has any ready wait set entities.") .def( "take_status", &ActionClient::take_status, - "Take an action status response."); + "Take an action status response.") + .def( + "is_ready_with_indices", &ActionClient::is_ready_with_indices, + "Check if an action entity has any ready wait set entities using " + "explicit indices."); } } // namespace rclpy diff --git a/rclpy/src/rclpy/action_client.hpp b/rclpy/src/rclpy/action_client.hpp index 5dcf04b90..a21d01beb 100644 --- a/rclpy/src/rclpy/action_client.hpp +++ b/rclpy/src/rclpy/action_client.hpp @@ -207,12 +207,44 @@ class ActionClient : public Destroyable, public std::enable_shared_from_this lock(lock_); rcl_wait_set_.reset(); context_.destroy(); } @@ -81,6 +82,7 @@ WaitSet::destroy() void WaitSet::clear_entities() { + std::lock_guard lock(lock_); rcl_ret_t ret = rcl_wait_set_clear(rcl_wait_set_.get()); if (ret != RCL_RET_OK) { throw RCLError("failed to clear wait set"); @@ -91,6 +93,7 @@ size_t WaitSet::add_service(const Service & service) { size_t index; + std::lock_guard lock(lock_); rcl_ret_t ret = rcl_wait_set_add_service(rcl_wait_set_.get(), service.rcl_ptr(), &index); if (RCL_RET_OK != ret) { throw RCLError("failed to add service to wait set"); @@ -102,6 +105,7 @@ size_t WaitSet::add_subscription(const Subscription & subscription) { size_t index; + std::lock_guard lock(lock_); rcl_ret_t ret = rcl_wait_set_add_subscription( rcl_wait_set_.get(), subscription.rcl_ptr(), &index); if (RCL_RET_OK != ret) { @@ -114,6 +118,7 @@ size_t WaitSet::add_timer(const Timer & timer) { size_t index; + std::lock_guard lock(lock_); rcl_ret_t ret = rcl_wait_set_add_timer(rcl_wait_set_.get(), timer.rcl_ptr(), &index); if (RCL_RET_OK != ret) { throw RCLError("failed to add client to wait set"); @@ -125,6 +130,7 @@ size_t WaitSet::add_guard_condition(const GuardCondition & gc) { size_t index; + std::lock_guard lock(lock_); rcl_ret_t ret = rcl_wait_set_add_guard_condition(rcl_wait_set_.get(), gc.rcl_ptr(), &index); if (RCL_RET_OK != ret) { throw RCLError("failed to add guard condition to wait set"); @@ -136,6 +142,7 @@ size_t WaitSet::add_client(const Client & client) { size_t index; + std::lock_guard lock(lock_); rcl_ret_t ret = rcl_wait_set_add_client(rcl_wait_set_.get(), client.rcl_ptr(), &index); if (RCL_RET_OK != ret) { throw RCLError("failed to add client to wait set"); @@ -147,6 +154,7 @@ size_t WaitSet::add_event(const EventHandle & event) { size_t index; + std::lock_guard lock(lock_); rcl_ret_t ret = rcl_wait_set_add_event(rcl_wait_set_.get(), event.rcl_ptr(), &index); if (RCL_RET_OK != ret) { throw RCLError("failed to add event to wait set"); @@ -159,6 +167,7 @@ WaitSet::is_ready(const std::string & entity_type, size_t index) { const void ** entities = NULL; size_t num_entities = 0; + std::lock_guard lock(lock_); if ("subscription" == entity_type) { entities = reinterpret_cast(rcl_wait_set_->subscriptions); num_entities = rcl_wait_set_->size_of_subscriptions; @@ -213,6 +222,7 @@ _get_ready_entities(const EntityArray ** entities, const size_t num_entities) py::list WaitSet::get_ready_entities(const std::string & entity_type) { + std::lock_guard lock(lock_); if ("subscription" == entity_type) { return _get_ready_entities( rcl_wait_set_->subscriptions, rcl_wait_set_->size_of_subscriptions); diff --git a/rclpy/src/rclpy/wait_set.hpp b/rclpy/src/rclpy/wait_set.hpp index 24970080a..c36f95b0b 100644 --- a/rclpy/src/rclpy/wait_set.hpp +++ b/rclpy/src/rclpy/wait_set.hpp @@ -21,6 +21,7 @@ #include #include +#include #include "client.hpp" #include "context.hpp" @@ -38,6 +39,7 @@ namespace rclpy class WaitSet : public Destroyable, public std::enable_shared_from_this { public: + std::mutex lock_; /// Initialize a wait set WaitSet(); diff --git a/rclpy/test/test_action_client.py b/rclpy/test/test_action_client.py index 6dfe71b0d..30fd61442 100644 --- a/rclpy/test/test_action_client.py +++ b/rclpy/test/test_action_client.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import threading import time import unittest import uuid @@ -364,7 +365,122 @@ def test_different_type_raises(self): with self.assertRaises(TypeError): ac.send_goal('different goal type') with self.assertRaises(TypeError): - ac.send_goal_async('different goal type') + ac.send_goal_async("different goal type") + finally: + ac.destroy() + + def test_wait_set_indices_stored_per_wait_set(self): + """Test that wait_set indices are stored per wait_set instance.""" + ac = ActionClient(self.node, Fibonacci, "fibonacci") + try: + # The _wait_set_indices dict should exist and be empty initially + self.assertTrue(hasattr(ac, "_wait_set_indices")) + self.assertEqual(len(ac._wait_set_indices), 0) + finally: + ac.destroy() + + def test_add_to_wait_set_returns_indices(self): + """Test that add_to_wait_set stores indices that can be retrieved.""" + ac = ActionClient(self.node, Fibonacci, "fibonacci") + try: + # We can't directly test add_to_wait_set as it requires a wait_set + # object from the C layer, but we can verify the structure exists + self.assertTrue(hasattr(ac, "_wait_set_indices")) + self.assertIsInstance(ac._wait_set_indices, dict) + finally: + ac.destroy() + + def test_is_ready_with_indices_method_exists(self): + """Test that is_ready_with_indices method exists on the C handle.""" + ac = ActionClient(self.node, Fibonacci, "fibonacci") + try: + # Verify the method is exposed on the C handle + self.assertTrue(hasattr(ac._client_handle, "is_ready_with_indices")) + finally: + ac.destroy() + + +class TestActionClientWaitSetIndicesCleanup(unittest.TestCase): + """Test cases for wait_set indices cleanup behavior.""" + + @classmethod + def setUpClass(cls): + cls.context = rclpy.context.Context() + rclpy.init(context=cls.context) + cls.node = rclpy.create_node("TestActionClientCleanup", context=cls.context) + cls.mock_action_server = MockActionServer(cls.node) + + @classmethod + def tearDownClass(cls): + cls.node.destroy_node() + rclpy.shutdown(context=cls.context) + + def test_wait_set_indices_cleanup_on_wait_set_gc(self): + """Test that _wait_set_indices entries are cleaned up when wait_set is garbage collected.""" + ac = ActionClient(self.node, Fibonacci, "fibonacci") + try: + # Initial state - no indices stored + self.assertEqual(len(ac._wait_set_indices), 0) + + # Note: We can't directly test the cleanup without accessing + # internal executor state, but we verify the dict doesn't grow + # unboundedly by checking it starts empty + self.assertIsInstance(ac._wait_set_indices, dict) + finally: + ac.destroy() + + +class TestActionClientMultiThreaded(unittest.TestCase): + """Test action client behavior with multiple threads.""" + + @classmethod + def setUpClass(cls): + cls.context = rclpy.context.Context() + rclpy.init(context=cls.context) + cls.node = rclpy.create_node("TestActionClientMT", context=cls.context) + cls.mock_action_server = MockActionServer(cls.node) + + @classmethod + def tearDownClass(cls): + cls.node.destroy_node() + rclpy.shutdown(context=cls.context) + + def test_concurrent_send_goal_with_multithreaded_executor(self): + """Test concurrent goal sending with MultiThreadedExecutor.""" + ac = ActionClient( + self.node, Fibonacci, "fibonacci", callback_group=ReentrantCallbackGroup() + ) + executor = MultiThreadedExecutor(context=self.context, num_threads=4) + + try: + self.assertTrue(ac.wait_for_server(timeout_sec=2.0)) + + results = [] + errors = [] + num_goals = 10 + + def send_goal(): + try: + future = ac.send_goal_async(Fibonacci.Goal()) + rclpy.spin_until_future_complete( + self.node, future, executor, timeout_sec=5.0 + ) + if future.done(): + results.append(future.result()) + except Exception as e: + errors.append(str(e)) + + threads = [threading.Thread(target=send_goal) for _ in range(num_goals)] + for t in threads: + t.start() + for t in threads: + t.join(timeout=10.0) + + # Check no "wait set index out of bounds" errors occurred + for err in errors: + self.assertNotIn("wait set index", err.lower()) + self.assertNotIn("out of bounds", err.lower()) + finally: ac.destroy()