Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 41 additions & 2 deletions rclpy/rclpy/action/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ def __init__(

self._is_ready = False

# key: wait_set pointer (id), value: tuple of indices from add_to_wait_set
# This ensures we use the correct indices for each wait_set, fixing the race
# condition when multiple threads use different wait_sets with the same action client.
self._wait_set_indices = {}
# Set of weak references to wait_sets, used for cleanup when wait_sets are garbage collected
self._wait_set_weak_refs = set()

# key: UUID in bytes, value: weak reference to ClientGoalHandle
self._goal_handles = {}
# key: goal request sequence_number, value: Future for goal response
Expand Down Expand Up @@ -231,7 +238,17 @@ def _remove_pending_result_request(self, future):
# Start Waitable API
def is_ready(self, wait_set):
"""Return True if one or more entities are ready in the wait set."""
ready_entities = self._client_handle.is_ready(wait_set)
# Use indices stored for this specific wait_set to avoid race condition
# where indices from a different wait_set could be used.
wait_set_id = id(wait_set)
indices = self._wait_set_indices.get(wait_set_id)
if indices is not None:
ready_entities = self._client_handle.is_ready_with_indices(
wait_set, *indices
)
else:
# Fallback to original behavior if indices not found
ready_entities = self._client_handle.is_ready(wait_set)
self._is_feedback_ready = ready_entities[0]
self._is_status_ready = ready_entities[1]
self._is_goal_response_ready = ready_entities[2]
Expand Down Expand Up @@ -367,8 +384,30 @@ def get_num_entities(self):

def add_to_wait_set(self, wait_set):
"""Add entities to wait set."""
# Store the indices returned by add_to_waitset for this specific wait_set.
# This ensures is_ready() uses the correct indices for this wait_set,
# fixing the race condition when multiple threads use different wait_sets.
with self._lock:
self._client_handle.add_to_waitset(wait_set)
indices = self._client_handle.add_to_waitset(wait_set)
wait_set_id = id(wait_set)
self._wait_set_indices[wait_set_id] = indices

# Set up cleanup callback for when the wait_set is garbage collected.
# This prevents _wait_set_indices from growing unboundedly.
try:
# Create a weak reference with a callback that removes the entry
# when the wait_set is garbage collected
def cleanup_callback(ref):
self._wait_set_indices.pop(wait_set_id, None)
self._wait_set_weak_refs.discard(ref)

weak_ref = weakref.ref(wait_set, cleanup_callback)
self._wait_set_weak_refs.add(weak_ref)
except TypeError:
# wait_set doesn't support weak references (e.g., some C extension types)
# In this case, we fall back to periodic cleanup or accept potential memory leak
# This is a minor concern since wait_sets are typically short-lived
pass

def __enter__(self):
return self._client_handle.__enter__()
Expand Down
73 changes: 59 additions & 14 deletions rclpy/src/rclpy/action_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,15 +229,22 @@ ActionClient::is_action_server_available()
return is_available;
}

void
py::tuple
ActionClient::add_to_waitset(WaitSet & wait_set)
{
rcl_ret_t ret = rcl_action_wait_set_add_action_client(
wait_set.rcl_ptr(), rcl_action_client_.get(), NULL, NULL);
std::lock_guard<std::mutex> lock(wait_set.lock_);
size_t goal_index, cancel_index, result_index;
size_t feedback_index, status_index;
rcl_ret_t ret = rcl_action_wait_set_add_action_client_with_indices(
wait_set.rcl_ptr(), rcl_action_client_.get(), &goal_index, &cancel_index,
&result_index, &feedback_index, &status_index);
if (RCL_RET_OK != ret) {
std::string error_text{"Failed to add 'rcl_action_client_t' to wait set"};
throw rclpy::RCLError(error_text);
}
// Return the indices for this specific wait_set
return py::make_tuple(feedback_index, status_index, goal_index, cancel_index,
result_index);
}

py::tuple
Expand All @@ -248,18 +255,52 @@ ActionClient::is_ready(WaitSet & wait_set)
bool is_goal_response_ready = false;
bool is_cancel_response_ready = false;
bool is_result_response_ready = false;
rcl_ret_t ret = rcl_action_client_wait_set_get_entities_ready(
wait_set.rcl_ptr(),
rcl_action_client_.get(),
&is_feedback_ready,
&is_status_ready,
&is_goal_response_ready,
&is_cancel_response_ready,
&is_result_response_ready);
if (RCL_RET_OK != ret) {
throw rclpy::RCLError("Failed to get number of ready entities for action client");
{
std::lock_guard<std::mutex> lock(wait_set.lock_);
rcl_ret_t ret = rcl_action_client_wait_set_get_entities_ready(
wait_set.rcl_ptr(), rcl_action_client_.get(), &is_feedback_ready,
&is_status_ready, &is_goal_response_ready, &is_cancel_response_ready,
&is_result_response_ready);
if (RCL_RET_OK != ret) {
throw rclpy::RCLError(
"Failed to get number of ready entities for action client");
}
}
py::tuple result_tuple(5);
result_tuple[0] = py::bool_(is_feedback_ready);
result_tuple[1] = py::bool_(is_status_ready);
result_tuple[2] = py::bool_(is_goal_response_ready);
result_tuple[3] = py::bool_(is_cancel_response_ready);
result_tuple[4] = py::bool_(is_result_response_ready);
return result_tuple;
}

py::tuple
ActionClient::is_ready_with_indices(
WaitSet & wait_set, size_t feedback_index,
size_t status_index, size_t goal_index,
size_t cancel_index, size_t result_index)
{
// This method uses indices passed in rather than reading from
// action_client->impl, which fixes the race condition where indices from a
// different wait_set could be used.
bool is_feedback_ready = false;
bool is_status_ready = false;
bool is_goal_response_ready = false;
bool is_cancel_response_ready = false;
bool is_result_response_ready = false;
{
std::lock_guard<std::mutex> lock(wait_set.lock_);
rcl_ret_t ret = rcl_action_client_wait_set_get_entities_ready_with_indices(
wait_set.rcl_ptr(), rcl_action_client_.get(), feedback_index,
status_index, goal_index, cancel_index, result_index,
&is_feedback_ready, &is_status_ready, &is_goal_response_ready,
&is_cancel_response_ready, &is_result_response_ready);
if (RCL_RET_OK != ret) {
throw rclpy::RCLError(
"Failed to get number of ready entities for action client");
}
}
py::tuple result_tuple(5);
result_tuple[0] = py::bool_(is_feedback_ready);
result_tuple[1] = py::bool_(is_status_ready);
Expand Down Expand Up @@ -317,6 +358,10 @@ define_action_client(py::object module)
"Check if an action entity has any ready wait set entities.")
.def(
"take_status", &ActionClient::take_status,
"Take an action status response.");
"Take an action status response.")
.def(
"is_ready_with_indices", &ActionClient::is_ready_with_indices,
"Check if an action entity has any ready wait set entities using "
"explicit indices.");
}
} // namespace rclpy
34 changes: 33 additions & 1 deletion rclpy/src/rclpy/action_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,44 @@ class ActionClient : public Destroyable, public std::enable_shared_from_this<Act
py::tuple
is_ready(WaitSet & wait_set);

/// Check if an action entity has any ready wait set entities using explicit
/// indices.
/**
* This method accepts explicit wait set indices rather than reading them from
* the action client's internal state. This is thread-safe when multiple
* threads use different wait sets with the same action client.
*
* \param[in] wait_set Capsule pointing to the wait set structure.
* \param[in] feedback_index Index of feedback subscription in wait set.
* \param[in] status_index Index of status subscription in wait set.
* \param[in] goal_index Index of goal client in wait set.
* \param[in] cancel_index Index of cancel client in wait set.
* \param[in] result_index Index of result client in wait set.
* \return A tuple of booleans representing the sub-entities ready:
* (is_feedback_ready,
* is_status_ready,
* is_goal_response_ready,
* is_cancel_response_ready,
* is_result_response_ready)
*/
py::tuple
is_ready_with_indices(
WaitSet & wait_set, size_t feedback_index,
size_t status_index, size_t goal_index,
size_t cancel_index, size_t result_index);

/// Add an action entitiy to a wait set.
/**
* Raises RuntimeError on failure.
* \param[in] wait_set Capsule pointer to an rcl_wait_set_t.
* \return A tuple of indices:
* (feedback_subscription_index,
* status_subscription_index,
* goal_client_index,
* cancel_client_index,
* result_client_index)
*/
void
py::tuple
add_to_waitset(WaitSet & wait_set);

/// Get rcl_action_client_t pointer
Expand Down
10 changes: 10 additions & 0 deletions rclpy/src/rclpy/wait_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,15 @@ WaitSet::WaitSet(
void
WaitSet::destroy()
{
std::lock_guard<std::mutex> lock(lock_);
rcl_wait_set_.reset();
context_.destroy();
}

void
WaitSet::clear_entities()
{
std::lock_guard<std::mutex> lock(lock_);
rcl_ret_t ret = rcl_wait_set_clear(rcl_wait_set_.get());
if (ret != RCL_RET_OK) {
throw RCLError("failed to clear wait set");
Expand All @@ -91,6 +93,7 @@ size_t
WaitSet::add_service(const Service & service)
{
size_t index;
std::lock_guard<std::mutex> lock(lock_);
rcl_ret_t ret = rcl_wait_set_add_service(rcl_wait_set_.get(), service.rcl_ptr(), &index);
if (RCL_RET_OK != ret) {
throw RCLError("failed to add service to wait set");
Expand All @@ -102,6 +105,7 @@ size_t
WaitSet::add_subscription(const Subscription & subscription)
{
size_t index;
std::lock_guard<std::mutex> lock(lock_);
rcl_ret_t ret = rcl_wait_set_add_subscription(
rcl_wait_set_.get(), subscription.rcl_ptr(), &index);
if (RCL_RET_OK != ret) {
Expand All @@ -114,6 +118,7 @@ size_t
WaitSet::add_timer(const Timer & timer)
{
size_t index;
std::lock_guard<std::mutex> lock(lock_);
rcl_ret_t ret = rcl_wait_set_add_timer(rcl_wait_set_.get(), timer.rcl_ptr(), &index);
if (RCL_RET_OK != ret) {
throw RCLError("failed to add client to wait set");
Expand All @@ -125,6 +130,7 @@ size_t
WaitSet::add_guard_condition(const GuardCondition & gc)
{
size_t index;
std::lock_guard<std::mutex> lock(lock_);
rcl_ret_t ret = rcl_wait_set_add_guard_condition(rcl_wait_set_.get(), gc.rcl_ptr(), &index);
if (RCL_RET_OK != ret) {
throw RCLError("failed to add guard condition to wait set");
Expand All @@ -136,6 +142,7 @@ size_t
WaitSet::add_client(const Client & client)
{
size_t index;
std::lock_guard<std::mutex> lock(lock_);
rcl_ret_t ret = rcl_wait_set_add_client(rcl_wait_set_.get(), client.rcl_ptr(), &index);
if (RCL_RET_OK != ret) {
throw RCLError("failed to add client to wait set");
Expand All @@ -147,6 +154,7 @@ size_t
WaitSet::add_event(const EventHandle & event)
{
size_t index;
std::lock_guard<std::mutex> lock(lock_);
rcl_ret_t ret = rcl_wait_set_add_event(rcl_wait_set_.get(), event.rcl_ptr(), &index);
if (RCL_RET_OK != ret) {
throw RCLError("failed to add event to wait set");
Expand All @@ -159,6 +167,7 @@ WaitSet::is_ready(const std::string & entity_type, size_t index)
{
const void ** entities = NULL;
size_t num_entities = 0;
std::lock_guard<std::mutex> lock(lock_);
if ("subscription" == entity_type) {
entities = reinterpret_cast<const void **>(rcl_wait_set_->subscriptions);
num_entities = rcl_wait_set_->size_of_subscriptions;
Expand Down Expand Up @@ -213,6 +222,7 @@ _get_ready_entities(const EntityArray ** entities, const size_t num_entities)
py::list
WaitSet::get_ready_entities(const std::string & entity_type)
{
std::lock_guard<std::mutex> lock(lock_);
if ("subscription" == entity_type) {
return _get_ready_entities(
rcl_wait_set_->subscriptions, rcl_wait_set_->size_of_subscriptions);
Expand Down
2 changes: 2 additions & 0 deletions rclpy/src/rclpy/wait_set.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <memory>
#include <string>
#include <mutex>

#include "client.hpp"
#include "context.hpp"
Expand All @@ -38,6 +39,7 @@ namespace rclpy
class WaitSet : public Destroyable, public std::enable_shared_from_this<WaitSet>
{
public:
std::mutex lock_;
/// Initialize a wait set
WaitSet();

Expand Down
Loading