Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@
// action_clients: [],
// },

////
//// bare_dds_publishers: Allowlist of bare DDS message publishers that should be exposed to Zenoh
//// even if they do not appear in `ros_discovery_info` as ROS 2 publishers.
//// Matching uses ROS-style topic names such as "/low_state", not raw DDS topic names
//// such as "rt/low_state". Existing publisher allow/deny rules still apply.
// bare_dds_publishers: ["/low_state", ".*/telemetry"],

////
//// pub_max_frequencies: Specify a list of maximum frequency of publications routing over zenoh for a set of Publishers.
//// The strings must have the format "<regex>=<float>":
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ The `"ros2dds"` part of this same configuration file can also be used in the con

The command line arguments overwrite the equivalent keys configured in a configuration file.

For DDS applications that publish plain ROS-compatible DDS topics without contributing to the ROS graph, the bridge can optionally expose selected message topics to Zenoh via the `bare_dds_publishers` config. This setting is an allowlist of ROS-style topic names such as `/low_state`; it does not match raw DDS names like `rt/low_state`, and normal publisher allow/deny rules still apply.

## Connectivity configurations

### DDS communications
Expand Down
56 changes: 56 additions & 0 deletions zenoh-plugin-ros2dds/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ pub struct Config {
pub ros_static_peers: Option<Vec<String>>,
#[serde(default, flatten)]
pub allowance: Option<Allowance>,
#[serde(
default,
deserialize_with = "deserialize_vec_regex",
serialize_with = "serialize_vec_regex"
)]
pub bare_dds_publishers: Vec<Regex>,
#[serde(
default,
deserialize_with = "deserialize_vec_regex_f32",
Expand Down Expand Up @@ -83,6 +89,19 @@ pub struct Config {
}

impl Config {
pub fn is_publisher_allowed(&self, ros2_name: &str) -> bool {
self.allowance
.as_ref()
.map(|allowance| allowance.is_publisher_allowed(ros2_name))
.unwrap_or(true)
}

pub fn is_bare_dds_publisher_enabled(&self, ros2_name: &str) -> bool {
self.bare_dds_publishers
.iter()
.any(|regex| regex.is_match(ros2_name))
}

pub fn get_pub_max_frequencies(&self, ros2_name: &str) -> Option<f32> {
for (re, freq) in &self.pub_max_frequencies {
if re.is_match(ros2_name) {
Expand Down Expand Up @@ -661,6 +680,31 @@ where
}
}

fn deserialize_vec_regex<'de, D>(deserializer: D) -> Result<Vec<Regex>, D::Error>
where
D: Deserializer<'de>,
{
let strs: Vec<String> = Deserialize::deserialize(deserializer).unwrap();
let mut result: Vec<Regex> = Vec::with_capacity(strs.len());
for s in strs {
let regex = Regex::new(&s)
.map_err(|e| de::Error::custom(format!("Invalid regex '{s}': {e}")))?;
result.push(regex);
}
Ok(result)
}

fn serialize_vec_regex<S>(v: &Vec<Regex>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut seq = serializer.serialize_seq(Some(v.len()))?;
for r in v {
seq.serialize_element(r.as_str())?;
}
seq.end()
}

fn serialize_vec_regex_f32<S>(v: &Vec<(Regex, f32)>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
Expand Down Expand Up @@ -902,6 +946,18 @@ mod tests {
assert_eq!(__required__, None);
}

#[test]
fn test_bare_dds_publishers() {
let config = serde_json::from_str::<Config>(
r#"{"bare_dds_publishers": ["/low_state", ".*/telemetry"]}"#,
)
.unwrap();

assert!(config.is_bare_dds_publisher_enabled("/low_state"));
assert!(config.is_bare_dds_publisher_enabled("/robot/telemetry"));
assert!(!config.is_bare_dds_publisher_enabled("/image_left_raw"));
}

#[test]
fn test_required_field() {
// See: https://github.com/eclipse-zenoh/zenoh-plugin-webserver/issues/19
Expand Down
70 changes: 59 additions & 11 deletions zenoh-plugin-ros2dds/src/discovered_entities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//

use std::{
collections::HashMap,
collections::{HashMap, HashSet},
fmt::{self, Debug},
};

Expand All @@ -27,10 +27,12 @@ use zenoh::{
};

use crate::{
config::Config,
dds_discovery::{DdsEntity, DdsParticipant},
events::ROS2DiscoveryEvent,
events::{BareDdsMsgPub, ROS2DiscoveryEvent},
gid::Gid,
node_info::*,
ros2_utils::{dds_message_topic_to_ros2_name, dds_type_to_ros2_message_type},
ros_discovery::{NodeEntitiesInfo, ParticipantEntitiesInfo},
};

Expand All @@ -46,6 +48,7 @@ pub struct DiscoveredEntities {
participants: HashMap<Gid, DdsParticipant>,
writers: HashMap<Gid, DdsEntity>,
readers: HashMap<Gid, DdsEntity>,
bare_dds_writers: HashSet<Gid>,
ros_participant_info: HashMap<Gid, ParticipantEntitiesInfo>,
nodes_info: HashMap<Gid, HashMap<String, NodeInfo>>,
admin_space: HashMap<OwnedKeyExpr, EntityRef>,
Expand All @@ -68,6 +71,7 @@ impl Debug for DiscoveredEntities {
"readers: {:?}",
self.readers.keys().collect::<Vec<&Gid>>()
)?;
writeln!(f, "bare_dds_writers: {:?}", self.bare_dds_writers)?;
writeln!(f, "ros_participant_info: {:?}", self.ros_participant_info)?;
writeln!(f, "nodes_info: {:?}", self.nodes_info)?;
writeln!(
Expand Down Expand Up @@ -119,7 +123,7 @@ impl DiscoveredEntities {
}

#[inline]
pub fn add_writer(&mut self, writer: DdsEntity) -> Option<ROS2DiscoveryEvent> {
pub fn add_writer(&mut self, writer: DdsEntity, config: &Config) -> Vec<ROS2DiscoveryEvent> {
// insert in admin space
self.admin_space.insert(
keformat!(
Expand All @@ -133,7 +137,7 @@ impl DiscoveredEntities {
);

// Check if this Writer is present in some NodeInfo.undiscovered_writer list
let mut event: Option<ROS2DiscoveryEvent> = None;
let mut events = Vec::new();
for nodes_map in self.nodes_info.values_mut() {
for node in nodes_map.values_mut() {
if let Some(i) = node
Expand All @@ -143,18 +147,27 @@ impl DiscoveredEntities {
{
// update the NodeInfo with this Writer's info
node.undiscovered_writer.remove(i);
event = node.update_with_writer(&writer);
if let Some(event) = node.update_with_writer(&writer) {
events.push(event);
}
break;
}
}
if event.is_some() {
if !events.is_empty() {
break;
}
}

if events.is_empty() {
if let Some(bare_msg_pub) = self.make_bare_dds_msg_pub(&writer, config) {
self.bare_dds_writers.insert(writer.key);
events.push(ROS2DiscoveryEvent::DiscoveredBareDdsMsgPub(bare_msg_pub));
}
}

// insert in Writers list
self.writers.insert(writer.key, writer);
event
events
}

#[inline]
Expand All @@ -163,7 +176,8 @@ impl DiscoveredEntities {
}

#[inline]
pub fn remove_writer(&mut self, gid: &Gid) -> Option<ROS2DiscoveryEvent> {
pub fn remove_writer(&mut self, gid: &Gid) -> Vec<ROS2DiscoveryEvent> {
let mut events = Vec::new();
if let Some(writer) = self.writers.remove(gid) {
self.admin_space.remove(
&keformat!(
Expand All @@ -175,17 +189,36 @@ impl DiscoveredEntities {
.unwrap(),
);

if self.bare_dds_writers.remove(gid) {
events.push(ROS2DiscoveryEvent::UndiscoveredBareDdsMsgPub(
BareDdsMsgPub {
name: dds_message_topic_to_ros2_name(&writer.topic_name)
.unwrap_or_else(|| writer.topic_name[2..].to_string()),
typ: dds_type_to_ros2_message_type(&writer.type_name),
writer: writer.key,
keyless: writer.keyless,
qos: writer.qos.clone(),
},
));
}

// Remove the Writer from any NodeInfo that might use it, possibly leading to a UndiscoveredX event
for nodes_map in self.nodes_info.values_mut() {
for node in nodes_map.values_mut() {
if let Some(e) = node.remove_writer(gid) {
// A Reader can be used by only 1 Node, no need to go on with loops
return Some(e);
events.push(e);
break;
}
}
if events
.iter()
.any(|event| matches!(event, ROS2DiscoveryEvent::UndiscoveredMsgPub(_, _)))
{
break;
}
}
}
None
events
}

#[inline]
Expand Down Expand Up @@ -380,6 +413,21 @@ impl DiscoveredEntities {
events
}

fn make_bare_dds_msg_pub(&self, writer: &DdsEntity, config: &Config) -> Option<BareDdsMsgPub> {
let name = dds_message_topic_to_ros2_name(&writer.topic_name)?;
if !config.is_bare_dds_publisher_enabled(&name) || !config.is_publisher_allowed(&name) {
return None;
}

Some(BareDdsMsgPub {
name,
typ: dds_type_to_ros2_message_type(&writer.type_name),
writer: writer.key,
keyless: writer.keyless,
qos: writer.qos.clone(),
})
}

fn get_entity_json_value(
&self,
entity_ref: &EntityRef,
Expand Down
13 changes: 9 additions & 4 deletions zenoh-plugin-ros2dds/src/discovery_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,26 @@ use zenoh::{
};

use crate::{
config::Config,
dds_discovery::*, discovered_entities::DiscoveredEntities, events::ROS2DiscoveryEvent,
ros_discovery::*, ChannelEvent, ROS_DISCOVERY_INFO_POLL_INTERVAL_MS,
};

pub struct DiscoveryMgr {
pub config: Arc<Config>,
pub participant: dds_entity_t,
pub ros_discovery_mgr: Arc<RosDiscoveryInfoMgr>,
pub discovered_entities: Arc<RwLock<DiscoveredEntities>>,
}

impl DiscoveryMgr {
pub fn create(
config: Arc<Config>,
participant: dds_entity_t,
ros_discovery_mgr: Arc<RosDiscoveryInfoMgr>,
) -> DiscoveryMgr {
DiscoveryMgr {
config,
participant,
ros_discovery_mgr,
discovered_entities: Arc::new(RwLock::new(Default::default())),
Expand All @@ -59,6 +63,7 @@ impl DiscoveryMgr {

let ros_discovery_mgr = self.ros_discovery_mgr.clone();
let discovered_entities = self.discovered_entities.clone();
let config = self.config.clone();

task::spawn(async move {
// Timer for periodic read of "ros_discovery_info" topic
Expand Down Expand Up @@ -86,16 +91,16 @@ impl DiscoveryMgr {
}
},
DDSDiscoveryEvent::DiscoveredPublication{entity} => {
let e = zwrite!(discovered_entities).add_writer(entity);
if let Some(e) = e {
let events = zwrite!(discovered_entities).add_writer(entity, &config);
for e in events {
if let Err(err) = evt_sender.try_send(e) {
tracing::error!("Internal error: failed to send DDSDiscoveryEvent to main loop: {err}");
}
}
},
DDSDiscoveryEvent::UndiscoveredPublication{key} => {
let e = zwrite!(discovered_entities).remove_writer(&key);
if let Some(e) = e {
let events = zwrite!(discovered_entities).remove_writer(&key);
for e in events {
if let Err(err) = evt_sender.try_send(e) {
tracing::error!("Internal error: failed to send DDSDiscoveryEvent to main loop: {err}");
}
Expand Down
21 changes: 20 additions & 1 deletion zenoh-plugin-ros2dds/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,30 @@ use std::fmt::Display;
use cyclors::qos::Qos;
use zenoh::key_expr::OwnedKeyExpr;

use crate::node_info::*;
use crate::{gid::Gid, node_info::*};

#[derive(Clone, Debug)]
pub struct BareDdsMsgPub {
pub name: String,
pub typ: String,
pub writer: Gid,
pub keyless: bool,
pub qos: Qos,
}

impl Display for BareDdsMsgPub {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Bare DDS Publisher {}: {}", self.name, self.typ)
}
}

/// A (local) discovery event of a ROS2 interface
#[derive(Debug)]
pub enum ROS2DiscoveryEvent {
DiscoveredMsgPub(String, MsgPub),
UndiscoveredMsgPub(String, MsgPub),
DiscoveredBareDdsMsgPub(BareDdsMsgPub),
UndiscoveredBareDdsMsgPub(BareDdsMsgPub),
DiscoveredMsgSub(String, MsgSub),
UndiscoveredMsgSub(String, MsgSub),
DiscoveredServiceSrv(String, ServiceSrv),
Expand All @@ -41,12 +58,14 @@ impl std::fmt::Display for ROS2DiscoveryEvent {
use ROS2DiscoveryEvent::*;
match self {
DiscoveredMsgPub(node, iface) => write!(f, "Node {node} declares {iface}"),
DiscoveredBareDdsMsgPub(iface) => write!(f, "Discovered {iface}"),
DiscoveredMsgSub(node, iface) => write!(f, "Node {node} declares {iface}"),
DiscoveredServiceSrv(node, iface) => write!(f, "Node {node} declares {iface}"),
DiscoveredServiceCli(node, iface) => write!(f, "Node {node} declares {iface}"),
DiscoveredActionSrv(node, iface) => write!(f, "Node {node} declares {iface}"),
DiscoveredActionCli(node, iface) => write!(f, "Node {node} declares {iface}"),
UndiscoveredMsgPub(node, iface) => write!(f, "Node {node} undeclares {iface}"),
UndiscoveredBareDdsMsgPub(iface) => write!(f, "Undiscovered {iface}"),
UndiscoveredMsgSub(node, iface) => write!(f, "Node {node} undeclares {iface}"),
UndiscoveredServiceSrv(node, iface) => write!(f, "Node {node} undeclares {iface}"),
UndiscoveredServiceCli(node, iface) => write!(f, "Node {node} undeclares {iface}"),
Expand Down
Loading
Loading