diff --git a/Cargo.lock b/Cargo.lock index 7e7b4220f1..b976bdca1d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3058,6 +3058,7 @@ dependencies = [ "libdd-tinybytes", "libdd-trace-utils", "rmp-serde", + "tokio-util", "tracing", ] @@ -3256,6 +3257,7 @@ dependencies = [ "libdd-library-config-ffi", "libdd-log-ffi", "libdd-profiling", + "libdd-shared-runtime-ffi", "libdd-telemetry-ffi", "serde_json", "symbolizer-ffi", diff --git a/builder/Cargo.toml b/builder/Cargo.toml index e8efb90671..4c07580c7d 100644 --- a/builder/Cargo.toml +++ b/builder/Cargo.toml @@ -19,6 +19,7 @@ default = [ "log", "ddsketch", "ffe", + "shared-runtime", ] crashtracker = [] profiling = [] @@ -29,6 +30,7 @@ library-config = [] log = [] ddsketch = [] ffe = [] +shared-runtime = [] regex-lite = ["libdd-common/regex-lite"] [lib] diff --git a/builder/src/bin/release.rs b/builder/src/bin/release.rs index dc0d972cf6..ff0904ee7e 100644 --- a/builder/src/bin/release.rs +++ b/builder/src/bin/release.rs @@ -74,6 +74,8 @@ pub fn main() { f.push("ddsketch-ffi".to_string()); #[cfg(feature = "ffe")] f.push("datadog-ffe-ffi".to_string()); + #[cfg(feature = "shared-runtime")] + f.push("shared-runtime".to_string()); f }; diff --git a/builder/src/profiling.rs b/builder/src/profiling.rs index ca3f9f6ada..8c8f213aee 100644 --- a/builder/src/profiling.rs +++ b/builder/src/profiling.rs @@ -59,6 +59,8 @@ impl Profiling { headers.push("ddsketch.h"); #[cfg(feature = "ffe")] headers.push("ffe.h"); + #[cfg(feature = "shared-runtime")] + headers.push("shared-runtime.h"); let mut origin_path: PathBuf = [&self.source_include, "dummy.h"].iter().collect(); let mut target_path: PathBuf = [&self.target_include, "dummy.h"].iter().collect(); diff --git a/libdd-data-pipeline-ffi/Cargo.toml b/libdd-data-pipeline-ffi/Cargo.toml index 758788ae90..300786b331 100644 --- a/libdd-data-pipeline-ffi/Cargo.toml +++ b/libdd-data-pipeline-ffi/Cargo.toml @@ -36,4 +36,5 @@ libdd-shared-runtime = { version = "1.0.0", path = "../libdd-shared-runtime" } libdd-common-ffi = { path = "../libdd-common-ffi", default-features = false } libdd-tinybytes = { path = "../libdd-tinybytes" } libdd-trace-utils = { path = "../libdd-trace-utils" } +tokio-util = "0.7.11" tracing = { version = "0.1", default-features = false } diff --git a/libdd-data-pipeline-ffi/README.md b/libdd-data-pipeline-ffi/README.md index bd053ad832..377bcc5204 100644 --- a/libdd-data-pipeline-ffi/README.md +++ b/libdd-data-pipeline-ffi/README.md @@ -5,3 +5,7 @@ C FFI bindings for the libdd-data-pipeline library. ## Overview `libdd-data-pipeline-ffi` provides C-compatible FFI bindings for `libdd-data-pipeline`, enabling high-performance trace processing from C, C++, PHP, Ruby, Python, and other languages. + +## Dependencies + +This crate depends on `tokio-util` for its `CancellationToken` type. The cancellation token created by `ddog_trace_exporter_cancel_token_new` and passed to `ddog_trace_exporter_send_trace_chunks` is a `tokio_util::sync::CancellationToken`, which the data pipeline uses to cooperatively abort an in-flight send. The token is exposed opaquely to C, so callers never need to depend on `tokio-util` themselves. diff --git a/libdd-data-pipeline-ffi/cbindgen.toml b/libdd-data-pipeline-ffi/cbindgen.toml index a0fe97f467..d3e36b4945 100644 --- a/libdd-data-pipeline-ffi/cbindgen.toml +++ b/libdd-data-pipeline-ffi/cbindgen.toml @@ -13,12 +13,13 @@ after_includes = """ typedef struct ddog_TraceExporter ddog_TraceExporter; typedef struct ddog_TracerSpan ddog_TracerSpan; typedef struct ddog_TracerTraceChunks ddog_TracerTraceChunks; +typedef struct ddog_TraceExporterCancelToken ddog_TraceExporterCancelToken; """ [export] prefix = "ddog_" renaming_overrides_prefixing = true -exclude = ["TraceExporter", "TracerSpan", "TracerTraceChunks"] +exclude = ["TraceExporter", "TracerSpan", "TracerTraceChunks", "TokioCancellationToken"] [export.rename] "ByteSlice" = "ddog_ByteSlice" @@ -32,6 +33,7 @@ exclude = ["TraceExporter", "TracerSpan", "TracerTraceChunks"] "TracerSpan" = "ddog_TracerSpan" "TracerSpanFields" = "ddog_TracerSpanFields" "TracerTraceChunks" = "ddog_TracerTraceChunks" +"TokioCancellationToken" = "ddog_TraceExporterCancelToken" [export.mangle] rename_types = "PascalCase" diff --git a/libdd-data-pipeline-ffi/src/tracer.rs b/libdd-data-pipeline-ffi/src/tracer.rs index 97d05dbe47..0ca46cd001 100644 --- a/libdd-data-pipeline-ffi/src/tracer.rs +++ b/libdd-data-pipeline-ffi/src/tracer.rs @@ -19,6 +19,8 @@ use libdd_tinybytes::BytesString; use libdd_trace_utils::span::v04::SpanBytes; use std::ptr::NonNull; +type TokioCancellationToken = tokio_util::sync::CancellationToken; + // --------------------------------------------------------------------------- // Helper // --------------------------------------------------------------------------- @@ -295,6 +297,49 @@ pub unsafe extern "C" fn ddog_tracer_trace_chunks_push_span( ) } +// --------------------------------------------------------------------------- +// Cancellation token +// --------------------------------------------------------------------------- + +/// Create a new cancellation token. +/// +/// The returned token must be freed with +/// [`ddog_trace_exporter_cancel_token_drop`]. +#[no_mangle] +pub extern "C" fn ddog_trace_exporter_cancel_token_new() -> Box { + Box::new(TokioCancellationToken::new()) +} + +/// Cancel a cancellation token. +/// +/// All clones of the same token observe the cancellation. If a +/// [`ddog_trace_exporter_send_trace_chunks`] call is using this token at the +/// time of cancellation, that send stops waiting for the agent at its next +/// await point and returns an error; the trace chunks it was sending may be +/// lost. +/// +/// Cancellation only affects a send that is in progress. If no send is using +/// the token, cancelling it has no immediate effect: a send started afterwards +/// with an already-cancelled token returns an error without contacting the +/// agent, and a token cancelled after its send has already finished does +/// nothing. +#[no_mangle] +pub extern "C" fn ddog_trace_exporter_cancel_token_cancel(token: Option<&TokioCancellationToken>) { + if let Some(token) = token { + token.cancel(); + } +} + +/// Free a cancellation token. +/// +/// After this call the token is invalid and must not be reused. +#[no_mangle] +pub extern "C" fn ddog_trace_exporter_cancel_token_drop( + token: Option>, +) { + drop(token); +} + // --------------------------------------------------------------------------- // Send trace chunks // --------------------------------------------------------------------------- @@ -305,13 +350,22 @@ pub unsafe extern "C" fn ddog_tracer_trace_chunks_push_span( /// serializes in the configured output format, and sends to the agent /// with retry logic. /// +/// When `cancel` is non-null, cancelling that token (via +/// [`ddog_trace_exporter_cancel_token_cancel`]) while the send is in progress +/// aborts the in-flight request and returns an error with code +/// [`ExporterErrorCode::IoError`]. Cancellation is cooperative: it only takes +/// effect while a request is actually in flight. A token that is already +/// cancelled when the send starts makes this function return that error +/// immediately, and cancelling after the send has finished has no effect. +/// Cancelling an in-flight send may cause the trace chunks being sent to be +/// lost. +/// /// On success, if `response_out` is non-null, a heap-allocated /// [`ExporterResponse`] is written there. The caller owns it and must /// free it with `ddog_trace_exporter_response_free`. /// /// # Safety /// -/// * `exporter` must be a valid `TraceExporter` pointer. /// * `chunks` is consumed and must not be used after this call. /// * If `response_out` is non-null it must point to valid writable memory for a /// `Box`. @@ -320,6 +374,7 @@ pub unsafe extern "C" fn ddog_trace_exporter_send_trace_chunks( exporter: Option<&TraceExporter>, chunks: Option>, response_out: Option>>, + cancel: Option<&TokioCancellationToken>, ) -> Option> { let Some(exporter) = exporter else { return gen_error!(ErrorCode::InvalidArgument); @@ -329,7 +384,7 @@ pub unsafe extern "C" fn ddog_trace_exporter_send_trace_chunks( }; catch_panic!( - match exporter.send_trace_chunks(chunks.0) { + match exporter.send_trace_chunks(chunks.0, cancel) { Ok(resp) => { if let Some(out) = response_out { out.as_ptr().write(Box::new(ExporterResponse::from(resp))); @@ -651,7 +706,7 @@ mod tests { fn send_trace_chunks_null_exporter_returns_error() { unsafe { let chunks = make_chunks(0); - let err = ddog_trace_exporter_send_trace_chunks(None, Some(chunks), None); + let err = ddog_trace_exporter_send_trace_chunks(None, Some(chunks), None, None); assert!(err.is_some()); assert_eq!(err.as_ref().unwrap().code, ErrorCode::InvalidArgument); ddog_trace_exporter_error_free(err); @@ -687,4 +742,32 @@ mod tests { ddog_tracer_trace_chunks_free(chunks); } } + + // -- Cancellation token ------------------------------------------------- + + #[test] + fn cancel_token_new_and_drop() { + let token = ddog_trace_exporter_cancel_token_new(); + ddog_trace_exporter_cancel_token_drop(Some(token)); + } + + #[test] + fn cancel_token_cancel() { + let token = ddog_trace_exporter_cancel_token_new(); + ddog_trace_exporter_cancel_token_cancel(Some(&token)); + ddog_trace_exporter_cancel_token_drop(Some(token)); + } + + #[test] + fn send_trace_chunks_null_cancel_is_accepted() { + // Passing a null (None) cancel argument behaves like no cancellation. + unsafe { + let chunks = make_chunks(0); + let err = ddog_trace_exporter_send_trace_chunks(None, Some(chunks), None, None); + // exporter is None, so we get InvalidArgument, but no crash + // from the absent cancel argument. + assert!(err.is_some()); + ddog_trace_exporter_error_free(err); + } + } } diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 839c6b7247..3850d46215 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -52,6 +52,7 @@ use std::sync::{Arc, Once}; use std::time::Duration; use std::{borrow::Borrow, str::FromStr}; use tokio::task::JoinSet; +use tokio_util::sync::CancellationToken; use tracing::{debug, error, warn}; const INFO_ENDPOINT: &str = "/info"; @@ -477,14 +478,39 @@ impl Tra /// Send a list of trace chunks to the agent (or OTLP endpoint when configured). /// /// Sync facade over [`Self::send_trace_chunks_async`]; panics inside an existing - /// tokio context. Returns the agent response (or `Unchanged` for OTLP). + /// tokio context. + /// + /// # Arguments + /// * trace_chunks: A list of trace chunks. Each trace chunk is a list of spans. + /// * cancellation_token: When provided, cancelling the token aborts the send while it is in + /// progress. The send only observes a token that is cancelled while the request is in-flight; + /// a token cancelled before this call returns immediately, and a token cancelled after the + /// send has already finished has no effect. Cancelling an in-flight send may cause the trace + /// chunks being sent to be lost. + /// + /// # Returns + /// * Ok(AgentResponse): The response from the agent (or Unchanged for OTLP) + /// * Err(TraceExporterError): An error detailing what went wrong in the process #[cfg(not(target_arch = "wasm32"))] pub fn send_trace_chunks( &self, trace_chunks: Vec>>, + cancellation_token: Option<&CancellationToken>, ) -> Result { - self.shared_runtime - .block_on(self.send_trace_chunks_async(trace_chunks))? + self.shared_runtime.block_on(async { + match cancellation_token { + Some(token) => { + tokio::select! { + res = self.send_trace_chunks_async(trace_chunks) => res, + _ = token.cancelled() => Err(TraceExporterError::Io(std::io::Error::new( + std::io::ErrorKind::Interrupted, + "send cancelled via cancellation token", + ))), + } + } + None => self.send_trace_chunks_async(trace_chunks).await, + } + })? } /// Send a list of trace chunks to the agent, asynchronously (or OTLP when configured). diff --git a/libdd-profiling-ffi/Cargo.toml b/libdd-profiling-ffi/Cargo.toml index bc2dccf5f1..e1b40f7063 100644 --- a/libdd-profiling-ffi/Cargo.toml +++ b/libdd-profiling-ffi/Cargo.toml @@ -18,7 +18,7 @@ name = "datadog_profiling_ffi" [features] default = ["ddcommon-ffi"] -cbindgen = ["build_common/cbindgen", "libdd-common-ffi/cbindgen"] +cbindgen = ["build_common/cbindgen", "libdd-common-ffi/cbindgen", "libdd-shared-runtime-ffi?/cbindgen"] ddtelemetry-ffi = ["dep:libdd-telemetry-ffi"] datadog-log-ffi = ["dep:libdd-log-ffi"] symbolizer = ["symbolizer-ffi"] @@ -33,6 +33,7 @@ datadog-library-config-ffi = ["dep:libdd-library-config-ffi"] ddcommon-ffi = ["dep:libdd-common-ffi"] ddsketch-ffi = ["dep:libdd-ddsketch-ffi"] datadog-ffe-ffi = ["dep:datadog-ffe-ffi"] +shared-runtime = ["dep:libdd-shared-runtime-ffi", "libdd-shared-runtime-ffi/catch_panic"] regex-lite = ["libdd-common/regex-lite"] [build-dependencies] @@ -60,3 +61,4 @@ symbolizer-ffi = { path = "../symbolizer-ffi", optional = true, default-features thiserror = "2" tokio-util = "0.7.1" datadog-ffe-ffi = { path = "../datadog-ffe-ffi", default-features = false, optional = true } +libdd-shared-runtime-ffi = { path = "../libdd-shared-runtime-ffi", default-features = false, optional = true } diff --git a/libdd-profiling-ffi/src/lib.rs b/libdd-profiling-ffi/src/lib.rs index aac70dab2c..2edd58dec2 100644 --- a/libdd-profiling-ffi/src/lib.rs +++ b/libdd-profiling-ffi/src/lib.rs @@ -50,6 +50,11 @@ pub use libdd_log_ffi::*; #[cfg(feature = "datadog-ffe-ffi")] pub use datadog_ffe_ffi; +// re-export shared-runtime ffi (fork-lifecycle management) +#[cfg(feature = "shared-runtime")] +#[allow(unused_imports)] +pub use libdd_shared_runtime_ffi::*; + // re-export tracer metadata functions #[cfg(feature = "ddcommon-ffi")] pub use libdd_common_ffi::*;