diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 5498efbe..47a89f43 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -91,6 +91,13 @@ // action_clients: [], // }, + //// + //// bare_dds_publishers: Allowlist of bare DDS message publishers that should be exposed to Zenoh + //// even if they do not appear in `ros_discovery_info` as ROS 2 publishers. + //// Matching uses ROS-style topic names such as "/low_state", not raw DDS topic names + //// such as "rt/low_state". Existing publisher allow/deny rules still apply. + // bare_dds_publishers: ["/low_state", ".*/telemetry"], + //// //// pub_max_frequencies: Specify a list of maximum frequency of publications routing over zenoh for a set of Publishers. //// The strings must have the format "=": diff --git a/README.md b/README.md index 59428ffd..67bbd98c 100644 --- a/README.md +++ b/README.md @@ -210,6 +210,8 @@ The `"ros2dds"` part of this same configuration file can also be used in the con The command line arguments overwrite the equivalent keys configured in a configuration file. +For DDS applications that publish plain ROS-compatible DDS topics without contributing to the ROS graph, the bridge can optionally expose selected message topics to Zenoh via the `bare_dds_publishers` config. This setting is an allowlist of ROS-style topic names such as `/low_state`; it does not match raw DDS names like `rt/low_state`, and normal publisher allow/deny rules still apply. + ## Connectivity configurations ### DDS communications diff --git a/zenoh-plugin-ros2dds/src/config.rs b/zenoh-plugin-ros2dds/src/config.rs index 30bf5044..4ef150ed 100644 --- a/zenoh-plugin-ros2dds/src/config.rs +++ b/zenoh-plugin-ros2dds/src/config.rs @@ -52,6 +52,12 @@ pub struct Config { pub ros_static_peers: Option>, #[serde(default, flatten)] pub allowance: Option, + #[serde( + default, + deserialize_with = "deserialize_vec_regex", + serialize_with = "serialize_vec_regex" + )] + pub bare_dds_publishers: Vec, #[serde( default, deserialize_with = "deserialize_vec_regex_f32", @@ -83,6 +89,19 @@ pub struct Config { } impl Config { + pub fn is_publisher_allowed(&self, ros2_name: &str) -> bool { + self.allowance + .as_ref() + .map(|allowance| allowance.is_publisher_allowed(ros2_name)) + .unwrap_or(true) + } + + pub fn is_bare_dds_publisher_enabled(&self, ros2_name: &str) -> bool { + self.bare_dds_publishers + .iter() + .any(|regex| regex.is_match(ros2_name)) + } + pub fn get_pub_max_frequencies(&self, ros2_name: &str) -> Option { for (re, freq) in &self.pub_max_frequencies { if re.is_match(ros2_name) { @@ -661,6 +680,31 @@ where } } +fn deserialize_vec_regex<'de, D>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + let strs: Vec = Deserialize::deserialize(deserializer).unwrap(); + let mut result: Vec = Vec::with_capacity(strs.len()); + for s in strs { + let regex = Regex::new(&s) + .map_err(|e| de::Error::custom(format!("Invalid regex '{s}': {e}")))?; + result.push(regex); + } + Ok(result) +} + +fn serialize_vec_regex(v: &Vec, serializer: S) -> Result +where + S: Serializer, +{ + let mut seq = serializer.serialize_seq(Some(v.len()))?; + for r in v { + seq.serialize_element(r.as_str())?; + } + seq.end() +} + fn serialize_vec_regex_f32(v: &Vec<(Regex, f32)>, serializer: S) -> Result where S: Serializer, @@ -902,6 +946,18 @@ mod tests { assert_eq!(__required__, None); } + #[test] + fn test_bare_dds_publishers() { + let config = serde_json::from_str::( + r#"{"bare_dds_publishers": ["/low_state", ".*/telemetry"]}"#, + ) + .unwrap(); + + assert!(config.is_bare_dds_publisher_enabled("/low_state")); + assert!(config.is_bare_dds_publisher_enabled("/robot/telemetry")); + assert!(!config.is_bare_dds_publisher_enabled("/image_left_raw")); + } + #[test] fn test_required_field() { // See: https://github.com/eclipse-zenoh/zenoh-plugin-webserver/issues/19 diff --git a/zenoh-plugin-ros2dds/src/discovered_entities.rs b/zenoh-plugin-ros2dds/src/discovered_entities.rs index 2b713d5d..3654e95e 100644 --- a/zenoh-plugin-ros2dds/src/discovered_entities.rs +++ b/zenoh-plugin-ros2dds/src/discovered_entities.rs @@ -13,7 +13,7 @@ // use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, fmt::{self, Debug}, }; @@ -27,10 +27,12 @@ use zenoh::{ }; use crate::{ + config::Config, dds_discovery::{DdsEntity, DdsParticipant}, - events::ROS2DiscoveryEvent, + events::{BareDdsMsgPub, ROS2DiscoveryEvent}, gid::Gid, node_info::*, + ros2_utils::{dds_message_topic_to_ros2_name, dds_type_to_ros2_message_type}, ros_discovery::{NodeEntitiesInfo, ParticipantEntitiesInfo}, }; @@ -46,6 +48,7 @@ pub struct DiscoveredEntities { participants: HashMap, writers: HashMap, readers: HashMap, + bare_dds_writers: HashSet, ros_participant_info: HashMap, nodes_info: HashMap>, admin_space: HashMap, @@ -68,6 +71,7 @@ impl Debug for DiscoveredEntities { "readers: {:?}", self.readers.keys().collect::>() )?; + writeln!(f, "bare_dds_writers: {:?}", self.bare_dds_writers)?; writeln!(f, "ros_participant_info: {:?}", self.ros_participant_info)?; writeln!(f, "nodes_info: {:?}", self.nodes_info)?; writeln!( @@ -119,7 +123,7 @@ impl DiscoveredEntities { } #[inline] - pub fn add_writer(&mut self, writer: DdsEntity) -> Option { + pub fn add_writer(&mut self, writer: DdsEntity, config: &Config) -> Vec { // insert in admin space self.admin_space.insert( keformat!( @@ -133,7 +137,7 @@ impl DiscoveredEntities { ); // Check if this Writer is present in some NodeInfo.undiscovered_writer list - let mut event: Option = None; + let mut events = Vec::new(); for nodes_map in self.nodes_info.values_mut() { for node in nodes_map.values_mut() { if let Some(i) = node @@ -143,18 +147,27 @@ impl DiscoveredEntities { { // update the NodeInfo with this Writer's info node.undiscovered_writer.remove(i); - event = node.update_with_writer(&writer); + if let Some(event) = node.update_with_writer(&writer) { + events.push(event); + } break; } } - if event.is_some() { + if !events.is_empty() { break; } } + if events.is_empty() { + if let Some(bare_msg_pub) = self.make_bare_dds_msg_pub(&writer, config) { + self.bare_dds_writers.insert(writer.key); + events.push(ROS2DiscoveryEvent::DiscoveredBareDdsMsgPub(bare_msg_pub)); + } + } + // insert in Writers list self.writers.insert(writer.key, writer); - event + events } #[inline] @@ -163,7 +176,8 @@ impl DiscoveredEntities { } #[inline] - pub fn remove_writer(&mut self, gid: &Gid) -> Option { + pub fn remove_writer(&mut self, gid: &Gid) -> Vec { + let mut events = Vec::new(); if let Some(writer) = self.writers.remove(gid) { self.admin_space.remove( &keformat!( @@ -175,17 +189,36 @@ impl DiscoveredEntities { .unwrap(), ); + if self.bare_dds_writers.remove(gid) { + events.push(ROS2DiscoveryEvent::UndiscoveredBareDdsMsgPub( + BareDdsMsgPub { + name: dds_message_topic_to_ros2_name(&writer.topic_name) + .unwrap_or_else(|| writer.topic_name[2..].to_string()), + typ: dds_type_to_ros2_message_type(&writer.type_name), + writer: writer.key, + keyless: writer.keyless, + qos: writer.qos.clone(), + }, + )); + } + // Remove the Writer from any NodeInfo that might use it, possibly leading to a UndiscoveredX event for nodes_map in self.nodes_info.values_mut() { for node in nodes_map.values_mut() { if let Some(e) = node.remove_writer(gid) { - // A Reader can be used by only 1 Node, no need to go on with loops - return Some(e); + events.push(e); + break; } } + if events + .iter() + .any(|event| matches!(event, ROS2DiscoveryEvent::UndiscoveredMsgPub(_, _))) + { + break; + } } } - None + events } #[inline] @@ -380,6 +413,21 @@ impl DiscoveredEntities { events } + fn make_bare_dds_msg_pub(&self, writer: &DdsEntity, config: &Config) -> Option { + let name = dds_message_topic_to_ros2_name(&writer.topic_name)?; + if !config.is_bare_dds_publisher_enabled(&name) || !config.is_publisher_allowed(&name) { + return None; + } + + Some(BareDdsMsgPub { + name, + typ: dds_type_to_ros2_message_type(&writer.type_name), + writer: writer.key, + keyless: writer.keyless, + qos: writer.qos.clone(), + }) + } + fn get_entity_json_value( &self, entity_ref: &EntityRef, diff --git a/zenoh-plugin-ros2dds/src/discovery_mgr.rs b/zenoh-plugin-ros2dds/src/discovery_mgr.rs index 4f27ad72..cb11eecc 100644 --- a/zenoh-plugin-ros2dds/src/discovery_mgr.rs +++ b/zenoh-plugin-ros2dds/src/discovery_mgr.rs @@ -27,11 +27,13 @@ use zenoh::{ }; use crate::{ + config::Config, dds_discovery::*, discovered_entities::DiscoveredEntities, events::ROS2DiscoveryEvent, ros_discovery::*, ChannelEvent, ROS_DISCOVERY_INFO_POLL_INTERVAL_MS, }; pub struct DiscoveryMgr { + pub config: Arc, pub participant: dds_entity_t, pub ros_discovery_mgr: Arc, pub discovered_entities: Arc>, @@ -39,10 +41,12 @@ pub struct DiscoveryMgr { impl DiscoveryMgr { pub fn create( + config: Arc, participant: dds_entity_t, ros_discovery_mgr: Arc, ) -> DiscoveryMgr { DiscoveryMgr { + config, participant, ros_discovery_mgr, discovered_entities: Arc::new(RwLock::new(Default::default())), @@ -59,6 +63,7 @@ impl DiscoveryMgr { let ros_discovery_mgr = self.ros_discovery_mgr.clone(); let discovered_entities = self.discovered_entities.clone(); + let config = self.config.clone(); task::spawn(async move { // Timer for periodic read of "ros_discovery_info" topic @@ -86,16 +91,16 @@ impl DiscoveryMgr { } }, DDSDiscoveryEvent::DiscoveredPublication{entity} => { - let e = zwrite!(discovered_entities).add_writer(entity); - if let Some(e) = e { + let events = zwrite!(discovered_entities).add_writer(entity, &config); + for e in events { if let Err(err) = evt_sender.try_send(e) { tracing::error!("Internal error: failed to send DDSDiscoveryEvent to main loop: {err}"); } } }, DDSDiscoveryEvent::UndiscoveredPublication{key} => { - let e = zwrite!(discovered_entities).remove_writer(&key); - if let Some(e) = e { + let events = zwrite!(discovered_entities).remove_writer(&key); + for e in events { if let Err(err) = evt_sender.try_send(e) { tracing::error!("Internal error: failed to send DDSDiscoveryEvent to main loop: {err}"); } diff --git a/zenoh-plugin-ros2dds/src/events.rs b/zenoh-plugin-ros2dds/src/events.rs index a2210bce..ca314eb2 100644 --- a/zenoh-plugin-ros2dds/src/events.rs +++ b/zenoh-plugin-ros2dds/src/events.rs @@ -17,13 +17,30 @@ use std::fmt::Display; use cyclors::qos::Qos; use zenoh::key_expr::OwnedKeyExpr; -use crate::node_info::*; +use crate::{gid::Gid, node_info::*}; + +#[derive(Clone, Debug)] +pub struct BareDdsMsgPub { + pub name: String, + pub typ: String, + pub writer: Gid, + pub keyless: bool, + pub qos: Qos, +} + +impl Display for BareDdsMsgPub { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Bare DDS Publisher {}: {}", self.name, self.typ) + } +} /// A (local) discovery event of a ROS2 interface #[derive(Debug)] pub enum ROS2DiscoveryEvent { DiscoveredMsgPub(String, MsgPub), UndiscoveredMsgPub(String, MsgPub), + DiscoveredBareDdsMsgPub(BareDdsMsgPub), + UndiscoveredBareDdsMsgPub(BareDdsMsgPub), DiscoveredMsgSub(String, MsgSub), UndiscoveredMsgSub(String, MsgSub), DiscoveredServiceSrv(String, ServiceSrv), @@ -41,12 +58,14 @@ impl std::fmt::Display for ROS2DiscoveryEvent { use ROS2DiscoveryEvent::*; match self { DiscoveredMsgPub(node, iface) => write!(f, "Node {node} declares {iface}"), + DiscoveredBareDdsMsgPub(iface) => write!(f, "Discovered {iface}"), DiscoveredMsgSub(node, iface) => write!(f, "Node {node} declares {iface}"), DiscoveredServiceSrv(node, iface) => write!(f, "Node {node} declares {iface}"), DiscoveredServiceCli(node, iface) => write!(f, "Node {node} declares {iface}"), DiscoveredActionSrv(node, iface) => write!(f, "Node {node} declares {iface}"), DiscoveredActionCli(node, iface) => write!(f, "Node {node} declares {iface}"), UndiscoveredMsgPub(node, iface) => write!(f, "Node {node} undeclares {iface}"), + UndiscoveredBareDdsMsgPub(iface) => write!(f, "Undiscovered {iface}"), UndiscoveredMsgSub(node, iface) => write!(f, "Node {node} undeclares {iface}"), UndiscoveredServiceSrv(node, iface) => write!(f, "Node {node} undeclares {iface}"), UndiscoveredServiceCli(node, iface) => write!(f, "Node {node} undeclares {iface}"), diff --git a/zenoh-plugin-ros2dds/src/lib.rs b/zenoh-plugin-ros2dds/src/lib.rs index 8dd30b0e..8eb19717 100644 --- a/zenoh-plugin-ros2dds/src/lib.rs +++ b/zenoh-plugin-ros2dds/src/lib.rs @@ -468,7 +468,8 @@ impl ROS2PluginRuntime { // Create and start DiscoveryManager let (tx, discovery_rcv): (Sender, Receiver) = unbounded(); - let mut discovery_mgr = DiscoveryMgr::create(self.participant, ros_discovery_mgr.clone()); + let mut discovery_mgr = + DiscoveryMgr::create(self.config.clone(), self.participant, ros_discovery_mgr.clone()); discovery_mgr.run(tx).await; // Create RoutesManager @@ -666,6 +667,10 @@ impl ROS2PluginRuntime { if let Some(allowance) = &self.config.allowance { use ROS2DiscoveryEvent::*; match evt { + DiscoveredBareDdsMsgPub(iface) | UndiscoveredBareDdsMsgPub(iface) => { + self.config.is_publisher_allowed(&iface.name) + && self.config.is_bare_dds_publisher_enabled(&iface.name) + } DiscoveredMsgPub(_, iface) | UndiscoveredMsgPub(_, iface) => { allowance.is_publisher_allowed(&iface.name) } @@ -686,8 +691,13 @@ impl ROS2PluginRuntime { } } } else { - // no allow/deny configured => allow all - true + match evt { + ROS2DiscoveryEvent::DiscoveredBareDdsMsgPub(iface) + | ROS2DiscoveryEvent::UndiscoveredBareDdsMsgPub(iface) => self + .config + .is_bare_dds_publisher_enabled(&iface.name), + _ => true, + } } } diff --git a/zenoh-plugin-ros2dds/src/ros2_utils.rs b/zenoh-plugin-ros2dds/src/ros2_utils.rs index 87d4239a..8ecfcb31 100644 --- a/zenoh-plugin-ros2dds/src/ros2_utils.rs +++ b/zenoh-plugin-ros2dds/src/ros2_utils.rs @@ -126,6 +126,17 @@ pub fn key_expr_to_ros2_name(key_expr: &keyexpr, config: &Config) -> String { } } +/// Convert a DDS message topic name to a ROS2 message name. +/// Returns `None` for non-message topics and action helper topics. +pub fn dds_message_topic_to_ros2_name(dds_topic: &str) -> Option { + let ros2_name = dds_topic.strip_prefix("rt").map(str::to_string)?; + if is_message_for_action(&ros2_name) { + None + } else { + Some(ros2_name) + } +} + /// Convert DDS Topic type to ROS2 Message type pub fn dds_type_to_ros2_message_type(dds_topic: &str) -> String { let result = dds_topic.replace("::dds_::", "::").replace("::", "/"); diff --git a/zenoh-plugin-ros2dds/src/route_publisher.rs b/zenoh-plugin-ros2dds/src/route_publisher.rs index 90b7d473..93bf7378 100644 --- a/zenoh-plugin-ros2dds/src/route_publisher.rs +++ b/zenoh-plugin-ros2dds/src/route_publisher.rs @@ -40,6 +40,7 @@ use crate::{ create_dds_reader, delete_dds_entity, get_guid, serialize_atomic_entity_guid, AtomicDDSEntity, DDS_ENTITY_NULL, }, + gid::Gid, liveliness_mgt::new_ke_liveliness_pub, qos_helpers::*, ros2_utils::{is_message_for_action, ros2_message_type_to_dds_type}, @@ -105,6 +106,8 @@ pub struct RoutePublisher { remote_routes: HashSet, // the list of nodes served by this route local_nodes: HashSet, + // the list of local bare DDS Writers served by this route + bare_dds_publishers: HashSet, } impl Drop for RoutePublisher { @@ -277,6 +280,7 @@ impl RoutePublisher { liveliness_token: None, remote_routes: HashSet::new(), local_nodes: HashSet::new(), + bare_dds_publishers: HashSet::new(), }) } @@ -347,12 +351,17 @@ impl RoutePublisher { !self.remote_routes.is_empty() } + #[inline] + fn local_source_count(&self) -> usize { + self.local_nodes.len() + self.bare_dds_publishers.len() + } + #[inline] pub async fn add_local_node(&mut self, node: String, discovered_writer_qos: &Qos) { if self.local_nodes.insert(node) { tracing::debug!("{self} now serving local nodes {:?}", self.local_nodes); - // if 1st local node added, announce the route - if self.local_nodes.len() == 1 { + // if 1st local source added, announce the route + if self.local_source_count() == 1 { if let Err(e) = self.announce_route(discovered_writer_qos).await { tracing::error!("{self} announcement failed: {e}"); } @@ -364,8 +373,8 @@ impl RoutePublisher { pub fn remove_local_node(&mut self, node: &str) { if self.local_nodes.remove(node) { tracing::debug!("{self} now serving local nodes {:?}", self.local_nodes); - // if last local node removed, retire the route - if self.local_nodes.is_empty() { + // if last local source removed, retire the route + if self.local_source_count() == 0 { self.retire_route(); } } @@ -373,7 +382,39 @@ impl RoutePublisher { #[inline] pub fn is_serving_local_node(&self) -> bool { - !self.local_nodes.is_empty() + self.local_source_count() > 0 + } + + #[inline] + pub async fn add_local_bare_publisher( + &mut self, + writer: Gid, + discovered_writer_qos: &Qos, + ) { + if self.bare_dds_publishers.insert(writer) { + tracing::debug!( + "{self} now serving bare DDS publishers {:?}", + self.bare_dds_publishers + ); + if self.local_source_count() == 1 { + if let Err(e) = self.announce_route(discovered_writer_qos).await { + tracing::error!("{self} announcement failed: {e}"); + } + } + } + } + + #[inline] + pub fn remove_local_bare_publisher(&mut self, writer: &Gid) { + if self.bare_dds_publishers.remove(writer) { + tracing::debug!( + "{self} now serving bare DDS publishers {:?}", + self.bare_dds_publishers + ); + if self.local_source_count() == 0 { + self.retire_route(); + } + } } #[inline] diff --git a/zenoh-plugin-ros2dds/src/routes_mgr.rs b/zenoh-plugin-ros2dds/src/routes_mgr.rs index 4da1680c..8d0fdff0 100644 --- a/zenoh-plugin-ros2dds/src/routes_mgr.rs +++ b/zenoh-plugin-ros2dds/src/routes_mgr.rs @@ -167,6 +167,19 @@ impl RoutesMgr { } } + DiscoveredBareDdsMsgPub(iface) => { + let route = self + .get_or_create_route_publisher( + iface.name, + iface.typ, + iface.keyless, + adapt_writer_qos_for_reader(&iface.qos), + true, + ) + .await?; + route.add_local_bare_publisher(iface.writer, &iface.qos).await; + } + UndiscoveredMsgPub(node, iface) => { if let Entry::Occupied(mut entry) = self.routes_publishers.entry(iface.name.clone()) { @@ -181,6 +194,21 @@ impl RoutesMgr { } } + UndiscoveredBareDdsMsgPub(iface) => { + if let Entry::Occupied(mut entry) = self.routes_publishers.entry(iface.name.clone()) + { + let route = entry.get_mut(); + route.remove_local_bare_publisher(&iface.writer); + if route.is_unused() { + let iface_ke = unsafe { keyexpr::from_str_unchecked(&iface.name[1..]) }; + self.admin_space + .remove(&(*KE_PREFIX_ROUTE_PUBLISHER / iface_ke)); + let route = entry.remove(); + tracing::info!("{route} removed"); + } + } + } + DiscoveredMsgSub(node, iface) => { // Pick 1 discovered Reader amongst the possibly multiple ones listed in MsgSub let entity = {