Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,24 @@ jobs:
- name: Code format check
run: cargo fmt --check -- --config "unstable_features=true,imports_granularity=Crate,group_imports=StdExternalCrate"
- name: Clippy without Cargo.lock
if: ${{ !startsWith(matrix.os, 'windows') }}
run: |
rm Cargo.lock
cargo clippy --workspace --all-targets --features stats,dynamic_plugin,dds_shm -- --deny warnings
git restore Cargo.lock

- name: Clippy without Cargo.lock (Windows, no dds_shm)
if: startsWith(matrix.os, 'windows')
run: |
rm Cargo.lock
cargo clippy --workspace --all-targets --features stats,dynamic_plugin -- --deny warnings
git restore Cargo.lock

- name: Build zenoh-plugin-ros2dds
run: cargo build -p zenoh-plugin-ros2dds --verbose --all-targets

- name: Build zenoh-plugin-ros2dds (with dds_shm)
if: ${{ !startsWith(matrix.os, 'windows') }}
run: cargo build -p zenoh-plugin-ros2dds --features dds_shm --verbose --all-targets

- name: Build zenoh-plugin-ros2dds (with prefix_symbols)
Expand All @@ -77,6 +86,7 @@ jobs:
run: cargo build -p zenoh-bridge-ros2dds --verbose --all-targets

- name: Build zenoh-bridge-ros2dds (with dds_shm)
if: ${{ !startsWith(matrix.os, 'windows') }}
run: cargo build -p zenoh-bridge-ros2dds --features dds_shm --verbose --all-targets

- name: Build zenoh-bridge-ros2dds (with prefix_symbols)
Expand All @@ -86,6 +96,7 @@ jobs:
run: cargo test --verbose

- name: Run tests (with dds_shm)
if: ${{ !startsWith(matrix.os, 'windows') }}
run: cargo test --features dds_shm --verbose

- name: Run tests (with prefix_symbols)
Expand Down
20 changes: 15 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async-trait = "0.1.66"
bincode = "1.3.3"
cdr = "0.2.4"
clap = "4.4.11"
cyclors = "=0.2.7"
cyclors = "=0.3.10"
derivative = "2.2.0"
flume = "0.11.0"
futures = "0.3.26"
Expand Down
125 changes: 7 additions & 118 deletions zenoh-plugin-ros2dds/src/dds_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,87 +42,25 @@ impl Drop for TypeInfo {
unsafe impl Send for TypeInfo {}
unsafe impl Sync for TypeInfo {}

#[cfg(feature = "dds_shm")]
#[derive(Clone, Copy)]
struct IoxChunk {
ptr: *mut std::ffi::c_void,
header: *mut iceoryx_header_t,
}

#[cfg(feature = "dds_shm")]
impl IoxChunk {
fn as_slice(&self) -> &[u8] {
unsafe { slice::from_raw_parts(self.ptr as *const u8, (*self.header).data_size as usize) }
}

fn len(&self) -> usize {
unsafe { (*self.header).data_size as usize }
}
}

pub struct DDSRawSample {
sdref: *mut ddsi_serdata,
data: ddsrt_iovec_t,
#[cfg(feature = "dds_shm")]
iox_chunk: Option<IoxChunk>,
}

impl DDSRawSample {
pub unsafe fn create(serdata: *const ddsi_serdata) -> DDSRawSample {
let mut sdref: *mut ddsi_serdata = std::ptr::null_mut();
let mut data = ddsrt_iovec_t {
iov_base: std::ptr::null_mut(),
iov_len: 0,
};

#[cfg(feature = "dds_shm")]
let iox_chunk: Option<IoxChunk> = match ((*serdata).iox_chunk).is_null() {
false => {
let iox_chunk_ptr = (*serdata).iox_chunk;
let header = iceoryx_header_from_chunk(iox_chunk_ptr);

// If the Iceoryx chunk contains raw sample data this needs to be serialized before forwading to Zenoh
if (*header).shm_data_state == iox_shm_data_state_t_IOX_CHUNK_CONTAINS_RAW_DATA {
let serialized_serdata = ddsi_serdata_from_sample(
(*serdata).type_,
(*serdata).kind,
(*serdata).iox_chunk,
);

let size = ddsi_serdata_size(serialized_serdata);
sdref =
ddsi_serdata_to_ser_ref(serialized_serdata, 0, size as usize, &mut data);
ddsi_serdata_unref(serialized_serdata);

// IoxChunk not needed where raw data has been serialized
None
} else {
Some(IoxChunk {
ptr: iox_chunk_ptr,
header,
})
}
}
true => None,
};
// With cyclors 0.3.x / CycloneDDS PSMX, shared memory transport is handled
// transparently by CycloneDDS. Data is always available in serialized form
// through ddsi_serdata_to_ser_ref, regardless of the underlying transport.
let size = ddsi_serdata_size(serdata);
let sdref = ddsi_serdata_to_ser_ref(serdata, 0, size as usize, &mut data);

// At this point sdref will be null if:
//
// * Iceoryx was not enabled/used - in this case data will contain the CDR header and payload
// * Iceoryx chunk contained serialized data - in this case data will contain the CDR header
if sdref.is_null() {
let size = ddsi_serdata_size(serdata);
sdref = ddsi_serdata_to_ser_ref(serdata, 0, size as usize, &mut data);
}

#[cfg(feature = "dds_shm")]
return DDSRawSample {
sdref,
data,
iox_chunk,
};
#[cfg(not(feature = "dds_shm"))]
return DDSRawSample { sdref, data };
DDSRawSample { sdref, data }
}

fn data_as_slice(&self) -> &[u8] {
Expand All @@ -136,12 +74,6 @@ impl DDSRawSample {

pub fn payload_as_slice(&self) -> &[u8] {
unsafe {
#[cfg(feature = "dds_shm")]
{
if let Some(iox_chunk) = self.iox_chunk.as_ref() {
return iox_chunk.as_slice();
}
}
&slice::from_raw_parts(
self.data.iov_base as *const u8,
ddsrt_iov_len_to_usize(self.data.iov_len).unwrap(),
Expand All @@ -150,29 +82,10 @@ impl DDSRawSample {
}

pub fn hex_encode(&self) -> String {
let mut encoded = String::new();
let data_encoded = hex::encode(self.data_as_slice());
encoded.push_str(data_encoded.as_str());

#[cfg(feature = "dds_shm")]
{
if let Some(iox_chunk) = self.iox_chunk.as_ref() {
let iox_encoded = hex::encode(iox_chunk.as_slice());
encoded.push_str(iox_encoded.as_str());
}
}

encoded
hex::encode(self.data_as_slice())
}

pub fn len(&self) -> usize {
#[cfg(feature = "dds_shm")]
{
TryInto::<usize>::try_into(self.data.iov_len).unwrap()
+ self.iox_chunk.as_ref().map(IoxChunk::len).unwrap_or(0)
}

#[cfg(not(feature = "dds_shm"))]
ddsrt_iov_len_to_usize(self.data.iov_len).unwrap()
}
}
Expand All @@ -187,36 +100,12 @@ impl Drop for DDSRawSample {

impl fmt::Debug for DDSRawSample {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
#[cfg(feature = "dds_shm")]
{
// Where data was received via Iceoryx write both the header (contained in buf.data) and
// payload (contained in buf.iox_chunk) to the formatter.
if let Some(iox_chunk) = self.iox_chunk {
return write!(
f,
"[{:02x?}, {:02x?}]",
self.data_as_slice(),
iox_chunk.as_slice()
);
}
}
write!(f, "{:02x?}", self.data_as_slice())
}
}

impl From<&DDSRawSample> for ZBytes {
fn from(buf: &DDSRawSample) -> Self {
#[cfg(feature = "dds_shm")]
{
// Where data was received via Iceoryx return both the header (contained in buf.data) and
// payload (contained in buf.iox_chunk) in a buffer.
if let Some(iox_chunk) = buf.iox_chunk {
let mut buf_and_iox_chunk = buf.data_as_slice().to_vec();
buf_and_iox_chunk.append(&mut iox_chunk.as_slice().to_vec());
let z_bytes = ZBytes::from(buf_and_iox_chunk);
return z_bytes;
}
}
buf.data_as_slice().into()
}
}
Loading