Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions rclpy/rclpy/event_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,30 @@
QoSSubscriptionEventType: TypeAlias = _rclpy.rcl_subscription_event_type_t


def publisher_event_type_is_supported(
Comment thread
fujitatomoya marked this conversation as resolved.
event_type: QoSPublisherEventType,
) -> bool:
"""
Check if a publisher event type is supported by the active RMW implementation.

:param event_type: The publisher event type to check.
:return: True if the event type is supported, False otherwise.
"""
return _rclpy.publisher_event_type_is_supported(event_type)


def subscription_event_type_is_supported(
event_type: QoSSubscriptionEventType,
) -> bool:
"""
Check if a subscription event type is supported by the active RMW implementation.

:param event_type: The subscription event type to check.
:return: True if the event type is supported, False otherwise.
"""
return _rclpy.subscription_event_type_is_supported(event_type)


# Payload type for Subscription Deadline callback.
QoSRequestedDeadlineMissedInfo: TypeAlias = _rclpy.rmw_requested_deadline_missed_status_t

Expand Down
21 changes: 21 additions & 0 deletions rclpy/src/rclpy/event_handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,18 @@ EventHandle::take_event()
throw std::runtime_error("cannot take event that is neither a publisher or a subscription event");
}

bool
publisher_event_type_is_supported(rcl_publisher_event_type_t event_type)
{
return rcl_publisher_event_type_is_supported(event_type);
}

bool
subscription_event_type_is_supported(rcl_subscription_event_type_t event_type)
{
return rcl_subscription_event_type_is_supported(event_type);
}

void
define_event_handle(py::module module)
{
Expand Down Expand Up @@ -253,5 +265,14 @@ define_event_handle(py::module module)
py::class_<rmw_incompatible_type_status_t>(module, "rmw_incompatible_type_status_t")
.def(py::init<>())
.def_readonly("total_count_change", &rmw_incompatible_type_status_t::total_count_change);

module.def(
"publisher_event_type_is_supported",
&rclpy::publisher_event_type_is_supported,
"Check if a publisher event type is supported by the active RMW implementation.");
module.def(
"subscription_event_type_is_supported",
&rclpy::subscription_event_type_is_supported,
"Check if a subscription event type is supported by the active RMW implementation.");
}
} // namespace rclpy
27 changes: 27 additions & 0 deletions rclpy/test/test_qos_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,3 +494,30 @@ def test_call_subscription_rclpy_event_matched_unmatched(self) -> None:
self.assertEqual(matched_status.total_count_change, 0)
self.assertEqual(matched_status.current_count, 0)
self.assertEqual(matched_status.current_count_change, -1)

def test_publisher_event_type_is_supported(self) -> None:
"""Test publisher_event_type_is_supported returns bool for all event types."""
from rclpy.event_handler import publisher_event_type_is_supported

for event_type in QoSPublisherEventType:
result = publisher_event_type_is_supported(event_type)
self.assertIsInstance(result, bool)

def test_subscription_event_type_is_supported(self) -> None:
"""Test subscription_event_type_is_supported returns bool for all event types."""
from rclpy.event_handler import subscription_event_type_is_supported

for event_type in QoSSubscriptionEventType:
result = subscription_event_type_is_supported(event_type)
self.assertIsInstance(result, bool)

def test_event_type_is_supported_matched(self) -> None:
"""Test that MATCHED event types are reported as supported."""
from rclpy.event_handler import publisher_event_type_is_supported
from rclpy.event_handler import subscription_event_type_is_supported

# MATCHED events should be supported across all RMW implementations.
self.assertTrue(publisher_event_type_is_supported(
QoSPublisherEventType.RCL_PUBLISHER_MATCHED))
self.assertTrue(subscription_event_type_is_supported(
QoSSubscriptionEventType.RCL_SUBSCRIPTION_MATCHED))