From 1e0908e64b5a22843e97875815b4a244f7bf9821 Mon Sep 17 00:00:00 2001 From: iunanua Date: Thu, 14 May 2026 14:12:24 +0200 Subject: [PATCH 1/3] fix(shared-runtime): fork recovery, shutdown, runtime_handle Three issues: - after_fork_parent / after_fork_child used `?` inside the restart loop, so a single InvalidState worker silenced every other component in the fork child. Now log-and-continue, matching before_fork. - spawn_worker after shutdown silently registered a dead worker: there was no shutdown state distinct from "no runtime yet". Add `shutdown: AtomicBool`, set it in shutdown / shutdown_async, and check it under the workers lock in spawn_worker so the during-shutdown race is also covered. New SharedRuntimeError::AlreadyShutdown + FFI mapping. - runtime_handle() handed callers an owned tokio Handle that bypassed every fork-safety guarantee the crate exists to provide. Replace with with_runtime_context(closure), which scopes the entered runtime to a synchronous closure; migrate the lone production caller in the trace exporter builder. Regression tests for the first two are un-ignored and assert the correct behavior. The spawn_worker/before_fork stress test for the previously-fixed TOCTOU stays #[ignore]'d for regression hunting. Co-Authored-By: Claude Opus 4.7 --- .../src/trace_exporter/builder.rs | 15 +- .../src/shared_runtime.rs | 3 + .../src/shared_runtime/mod.rs | 154 ++++++++++++++++-- 3 files changed, 152 insertions(+), 20 deletions(-) diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index bd157abe8d..9b0e2d0c39 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -320,17 +320,14 @@ impl TraceExporterBuilder { let libdatadog_version = tag!("libdatadog_version", env!("CARGO_PKG_VERSION")); // On native, `C::new_client()` may capture `tokio::runtime::Handle::current()` - // internally (e.g. `NativeCapabilities`). Enter the SharedRuntime's tokio context - // so that handle is available. On wasm this is a no-op — the JS event loop is - // always the implicit executor. - #[cfg(not(target_arch = "wasm32"))] - let _guard = shared_runtime - .runtime_handle() + // internally (e.g. `NativeCapabilities`). Run it inside the SharedRuntime's + // tokio context so that handle is available. On wasm this is a no-op — the + // JS event loop is always the implicit executor. + let capabilities = shared_runtime + .with_runtime_context(C::new_client) .map_err(|e| { TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(e.to_string())) - })? - .enter(); - let capabilities = C::new_client(); + })?; // --- Platform-specific worker setup --- // The blocks below spawn background workers via `SharedRuntime`. On diff --git a/libdd-shared-runtime-ffi/src/shared_runtime.rs b/libdd-shared-runtime-ffi/src/shared_runtime.rs index 7a9a0ac084..73bf27968b 100644 --- a/libdd-shared-runtime-ffi/src/shared_runtime.rs +++ b/libdd-shared-runtime-ffi/src/shared_runtime.rs @@ -15,6 +15,8 @@ pub enum SharedRuntimeErrorCode { InvalidArgument, /// The runtime is not available or in an invalid state. RuntimeUnavailable, + /// Operation rejected because the runtime has already been shut down. + AlreadyShutdown, /// Failed to acquire a lock on internal state. LockFailed, /// A worker operation failed. @@ -50,6 +52,7 @@ impl From for SharedRuntimeFFIError { fn from(err: SharedRuntimeError) -> Self { let code = match &err { SharedRuntimeError::RuntimeUnavailable => SharedRuntimeErrorCode::RuntimeUnavailable, + SharedRuntimeError::AlreadyShutdown => SharedRuntimeErrorCode::AlreadyShutdown, SharedRuntimeError::LockFailed(_) => SharedRuntimeErrorCode::LockFailed, SharedRuntimeError::WorkerError(_) => SharedRuntimeErrorCode::WorkerError, SharedRuntimeError::RuntimeCreation(_) => SharedRuntimeErrorCode::RuntimeCreation, diff --git a/libdd-shared-runtime/src/shared_runtime/mod.rs b/libdd-shared-runtime/src/shared_runtime/mod.rs index 058bf730a5..998d5062e1 100644 --- a/libdd-shared-runtime/src/shared_runtime/mod.rs +++ b/libdd-shared-runtime/src/shared_runtime/mod.rs @@ -14,7 +14,7 @@ use crate::worker::Worker; use futures::stream::{FuturesUnordered, StreamExt}; use libdd_common::MutexExt; use pausable_worker::{PausableWorker, PausableWorkerError}; -use std::sync::atomic::AtomicU64; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::{fmt, io}; use tracing::{debug, error}; @@ -26,7 +26,6 @@ use tracing::{debug, error}; mod native { use super::*; use pausable_worker::tokio_spawn_fn; - use std::sync::atomic::Ordering; use tokio::runtime::{Builder, Runtime}; fn build_runtime() -> Result { @@ -42,21 +41,39 @@ mod native { runtime: Arc::new(Mutex::new(Some(Arc::new(build_runtime()?)))), workers: Arc::new(Mutex::new(Vec::new())), next_worker_id: AtomicU64::new(1), + shutdown: AtomicBool::new(false), }) } - /// Returns a clone of the tokio runtime handle managed by this SharedRuntime. + /// Run `f` with the shared tokio runtime entered as the current context. + /// + /// Useful for synchronous initialization that calls into + /// [`tokio::runtime::Handle::current()`] (e.g., constructing an HTTP + /// client that captures the current handle internally). + /// + /// # Fork safety + /// Tasks spawned via `tokio::spawn` / `Handle::current().spawn(...)` + /// inside `f` are NOT tracked by `SharedRuntime`: they will not be + /// paused before fork, restarted after fork, or shut down by + /// [`Self::shutdown`]. For background work, register a + /// [`crate::Worker`] via [`Self::spawn_worker`] instead. /// /// # Errors - /// Returns [`SharedRuntimeError::RuntimeUnavailable`] if the runtime has been shut down. - pub fn runtime_handle(&self) -> Result { - Ok(self + /// Returns [`SharedRuntimeError::RuntimeUnavailable`] if the runtime + /// has been shut down or is in a fork window. + pub fn with_runtime_context(&self, f: F) -> Result + where + F: FnOnce() -> T, + { + let handle = self .runtime .lock_or_panic() .as_ref() .ok_or(SharedRuntimeError::RuntimeUnavailable)? .handle() - .clone()) + .clone(); + let _guard = handle.enter(); + Ok(f()) } /// Spawn a PausableWorker on this runtime. @@ -89,6 +106,14 @@ mod native { let runtime_guard = self.runtime.lock_or_panic(); let mut workers_guard = self.workers.lock_or_panic(); + // Reject post-shutdown spawns under the workers lock — this is the + // same lock `shutdown_async` acquires before draining, so once + // shutdown wins the workers lock, every subsequent spawn observes + // the flag and bails instead of silently registering a dead worker. + if self.shutdown.load(Ordering::Acquire) { + return Err(SharedRuntimeError::AlreadyShutdown); + } + if let Some(rt) = runtime_guard.as_ref() { if let Err(e) = pausable_worker.start(tokio_spawn_fn(rt.handle())) { return Err(e.into()); @@ -165,8 +190,17 @@ mod native { let mut workers_lock = self.workers.lock_or_panic(); + // Log-and-continue: a single worker in `InvalidState` (e.g. its + // previous task was aborted) must not abort the whole restart + // loop and leave every other component dead. This matches the + // failure-tolerance pattern already used by `before_fork`. for worker_entry in workers_lock.iter_mut() { - worker_entry.worker.start(tokio_spawn_fn(&handle))?; + if let Err(e) = worker_entry.worker.start(tokio_spawn_fn(&handle)) { + error!( + worker_id = worker_entry.id, + "Worker failed to restart after fork in parent: {:?}", e + ); + } } Ok(()) @@ -196,9 +230,17 @@ mod native { workers_lock.retain(|entry| entry.restart_on_fork); + // Log-and-continue: see `after_fork_parent`. In the child this + // matters even more — a single InvalidState worker must not + // silence every other component for the lifetime of the process. for worker_entry in workers_lock.iter_mut() { worker_entry.worker.reset(); - worker_entry.worker.start(tokio_spawn_fn(&handle))?; + if let Err(e) = worker_entry.worker.start(tokio_spawn_fn(&handle)) { + error!( + worker_id = worker_entry.id, + "Worker failed to restart after fork in child: {:?}", e + ); + } } Ok(()) @@ -234,6 +276,7 @@ mod native { timeout: Option, ) -> Result<(), SharedRuntimeError> { debug!(?timeout, "Shutting down SharedRuntime"); + self.shutdown.store(true, Ordering::Release); match self.runtime.lock_or_panic().take() { Some(runtime) => { if let Some(timeout) = timeout { @@ -337,6 +380,8 @@ impl WorkerHandle { pub enum SharedRuntimeError { /// The runtime is not available or in an invalid state. RuntimeUnavailable, + /// Operation rejected because the runtime has already been shut down. + AlreadyShutdown, /// Failed to acquire a lock on internal state. LockFailed(String), /// A worker operation failed. @@ -353,6 +398,7 @@ impl fmt::Display for SharedRuntimeError { Self::RuntimeUnavailable => { write!(f, "Runtime is not available or in an invalid state") } + Self::AlreadyShutdown => write!(f, "Runtime has already been shut down"), Self::LockFailed(msg) => write!(f, "Failed to acquire lock: {}", msg), Self::WorkerError(err) => write!(f, "Worker error: {}", err), Self::RuntimeCreation(err) => { @@ -397,6 +443,10 @@ pub struct SharedRuntime { runtime: Arc>>>, workers: Arc>>, next_worker_id: AtomicU64, + /// Set once `shutdown` / `shutdown_async` is called. After this point + /// `spawn_worker` rejects with `AlreadyShutdown` instead of silently + /// registering a worker that will never run. + shutdown: AtomicBool, } impl SharedRuntime { @@ -419,6 +469,7 @@ impl SharedRuntime { Ok(Self { workers: Arc::new(Mutex::new(Vec::new())), next_worker_id: AtomicU64::new(1), + shutdown: AtomicBool::new(false), }) } } @@ -430,14 +481,16 @@ impl SharedRuntime { worker: T, restart_on_fork: bool, ) -> Result { - use std::sync::atomic::Ordering; - let boxed_worker: BoxedWorker = Box::new(worker); debug!(?boxed_worker, "Spawning worker on SharedRuntime"); let mut pausable_worker = PausableWorker::new(boxed_worker); let mut workers_guard = self.workers.lock_or_panic(); + if self.shutdown.load(Ordering::Acquire) { + return Err(SharedRuntimeError::AlreadyShutdown); + } + if let Err(e) = pausable_worker.start(|future| { use futures_util::FutureExt; let (remote, handle) = future.remote_handle(); @@ -461,6 +514,17 @@ impl SharedRuntime { }) } + /// On wasm32, [`Self::with_runtime_context`] is a no-op — the JS event + /// loop is the implicit executor, so there is no tokio context to enter. + /// The closure is invoked unchanged so callers can be platform-agnostic. + #[cfg(target_arch = "wasm32")] + pub fn with_runtime_context(&self, f: F) -> Result + where + F: FnOnce() -> T, + { + Ok(f()) + } + /// Shutdown all workers asynchronously. /// /// This should be called during application shutdown to cleanly stop all @@ -469,6 +533,7 @@ impl SharedRuntime { /// Worker errors are logged but do not cause the function to fail. pub async fn shutdown_async(&self) { debug!("Shutting down all workers asynchronously"); + self.shutdown.store(true, Ordering::Release); let workers = { let mut workers_lock = self.workers.lock_or_panic(); std::mem::take(&mut *workers_lock) @@ -696,4 +761,71 @@ mod tests { "worker should not run or shut down after fork in child when restart_on_fork is false" ); } + + /// A single `PausableWorker` in `InvalidState` must + /// not abort the whole restart loop in `after_fork_parent` / + /// `after_fork_child`. The bad entry is logged and skipped; every + /// other worker still resumes after fork. + #[test] + fn after_fork_parent_skips_invalid_state_workers() { + let shared_runtime = SharedRuntime::new().unwrap(); + + let (good, good_rx) = make_test_worker(); + let _ = shared_runtime.spawn_worker(good, true).unwrap(); + + // Second worker — we'll corrupt its entry into InvalidState below, + // simulating a previously-aborted task. + let (bad, _bad_rx) = make_test_worker(); + let _ = shared_runtime.spawn_worker(bad, true).unwrap(); + + good_rx + .recv_timeout(Duration::from_secs(1)) + .expect("good worker did not run before fork"); + + { + let mut workers = shared_runtime.workers.lock_or_panic(); + workers[1].worker = PausableWorker::InvalidState; + } + + shared_runtime.before_fork(); + while good_rx.try_recv().is_ok() {} + + let result = shared_runtime.after_fork_parent(); + + assert!( + result.is_ok(), + "after_fork_parent should not bail on a single InvalidState worker" + ); + assert!( + good_rx.recv_timeout(Duration::from_secs(1)).is_ok(), + "good worker should resume after fork even if a peer is InvalidState" + ); + } + + /// `spawn_worker` after `shutdown` must reject with + /// `AlreadyShutdown` rather than silently registering a worker that + /// will never run. The shutdown state is observed under the workers + /// lock so the same guarantee holds against the during-shutdown race. + #[test] + fn spawn_worker_after_shutdown_should_be_rejected() { + let shared_runtime = SharedRuntime::new().unwrap(); + shared_runtime.shutdown(None).unwrap(); + + let (worker, rx) = make_test_worker(); + let result = shared_runtime.spawn_worker(worker, true); + + assert!( + matches!(result, Err(SharedRuntimeError::AlreadyShutdown)), + "spawn_worker after shutdown should return AlreadyShutdown, got {result:?}" + ); + assert_eq!( + shared_runtime.workers.lock_or_panic().len(), + 0, + "no dead worker should be registered" + ); + assert!( + rx.recv_timeout(Duration::from_millis(200)).is_err(), + "no worker should be running after shutdown" + ); + } } From 6f5534690321c8fad81157f097100f8791e7ff22 Mon Sep 17 00:00:00 2001 From: iunanua Date: Thu, 14 May 2026 16:15:31 +0200 Subject: [PATCH 2/3] fix(shared-runtime-ffi): pin error code discriminants for ABI stability The previous commit inserted AlreadyShutdown between RuntimeUnavailable and LockFailed, which silently renumbered every subsequent variant of the #[repr(C)] enum. Callers compiled against the older generated header would then misclassify error codes after upgrading the shared library (e.g. interpret AlreadyShutdown as LockFailed). Pin every discriminant explicitly and append AlreadyShutdown with value 7, restoring the original numeric values for all pre-existing variants. Add a doc note so future variants are appended with a fresh value rather than inserted. Caught by Codex on PR #1986. Co-Authored-By: Claude Opus 4.7 --- .../src/shared_runtime.rs | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/libdd-shared-runtime-ffi/src/shared_runtime.rs b/libdd-shared-runtime-ffi/src/shared_runtime.rs index 73bf27968b..0eb74ad089 100644 --- a/libdd-shared-runtime-ffi/src/shared_runtime.rs +++ b/libdd-shared-runtime-ffi/src/shared_runtime.rs @@ -8,26 +8,33 @@ use std::ptr::NonNull; use std::sync::Arc; /// Error codes for SharedRuntime FFI operations. +/// +/// # ABI stability +/// Discriminants are pinned explicitly. The numeric values are part of the +/// C ABI: existing variants must never be renumbered or reused, and new +/// variants must be appended with a fresh value. Inserting a variant in +/// the middle of the list silently misclassifies errors on any caller +/// compiled against an older header. #[repr(C)] #[derive(Copy, Clone, Debug, PartialEq)] pub enum SharedRuntimeErrorCode { /// Invalid argument provided (e.g. null handle). - InvalidArgument, + InvalidArgument = 0, /// The runtime is not available or in an invalid state. - RuntimeUnavailable, - /// Operation rejected because the runtime has already been shut down. - AlreadyShutdown, + RuntimeUnavailable = 1, /// Failed to acquire a lock on internal state. - LockFailed, + LockFailed = 2, /// A worker operation failed. - WorkerError, + WorkerError = 3, /// Failed to create the tokio runtime. - RuntimeCreation, + RuntimeCreation = 4, /// Shutdown timed out. - ShutdownTimedOut, + ShutdownTimedOut = 5, /// An unexpected panic occurred inside the FFI call. #[cfg(feature = "catch_panic")] - Panic, + Panic = 6, + /// Operation rejected because the runtime has already been shut down. + AlreadyShutdown = 7, } /// Error returned by SharedRuntime FFI functions. From 6e95bcaf92b84e3f3d8eda6390a0d36d10a0cce0 Mon Sep 17 00:00:00 2001 From: iunanua Date: Fri, 15 May 2026 10:13:18 +0200 Subject: [PATCH 3/3] Cleanup --- .../src/shared_runtime.rs | 23 +++++++------------ .../src/shared_runtime/mod.rs | 15 +----------- 2 files changed, 9 insertions(+), 29 deletions(-) diff --git a/libdd-shared-runtime-ffi/src/shared_runtime.rs b/libdd-shared-runtime-ffi/src/shared_runtime.rs index 0eb74ad089..e7d37ad6a8 100644 --- a/libdd-shared-runtime-ffi/src/shared_runtime.rs +++ b/libdd-shared-runtime-ffi/src/shared_runtime.rs @@ -8,33 +8,26 @@ use std::ptr::NonNull; use std::sync::Arc; /// Error codes for SharedRuntime FFI operations. -/// -/// # ABI stability -/// Discriminants are pinned explicitly. The numeric values are part of the -/// C ABI: existing variants must never be renumbered or reused, and new -/// variants must be appended with a fresh value. Inserting a variant in -/// the middle of the list silently misclassifies errors on any caller -/// compiled against an older header. #[repr(C)] #[derive(Copy, Clone, Debug, PartialEq)] pub enum SharedRuntimeErrorCode { /// Invalid argument provided (e.g. null handle). - InvalidArgument = 0, + InvalidArgument, /// The runtime is not available or in an invalid state. - RuntimeUnavailable = 1, + RuntimeUnavailable, /// Failed to acquire a lock on internal state. - LockFailed = 2, + LockFailed, /// A worker operation failed. - WorkerError = 3, + WorkerError, /// Failed to create the tokio runtime. - RuntimeCreation = 4, + RuntimeCreation, /// Shutdown timed out. - ShutdownTimedOut = 5, + ShutdownTimedOut, /// An unexpected panic occurred inside the FFI call. #[cfg(feature = "catch_panic")] - Panic = 6, + Panic, /// Operation rejected because the runtime has already been shut down. - AlreadyShutdown = 7, + AlreadyShutdown, } /// Error returned by SharedRuntime FFI functions. diff --git a/libdd-shared-runtime/src/shared_runtime/mod.rs b/libdd-shared-runtime/src/shared_runtime/mod.rs index 998d5062e1..48743f988e 100644 --- a/libdd-shared-runtime/src/shared_runtime/mod.rs +++ b/libdd-shared-runtime/src/shared_runtime/mod.rs @@ -106,10 +106,6 @@ mod native { let runtime_guard = self.runtime.lock_or_panic(); let mut workers_guard = self.workers.lock_or_panic(); - // Reject post-shutdown spawns under the workers lock — this is the - // same lock `shutdown_async` acquires before draining, so once - // shutdown wins the workers lock, every subsequent spawn observes - // the flag and bails instead of silently registering a dead worker. if self.shutdown.load(Ordering::Acquire) { return Err(SharedRuntimeError::AlreadyShutdown); } @@ -190,10 +186,6 @@ mod native { let mut workers_lock = self.workers.lock_or_panic(); - // Log-and-continue: a single worker in `InvalidState` (e.g. its - // previous task was aborted) must not abort the whole restart - // loop and leave every other component dead. This matches the - // failure-tolerance pattern already used by `before_fork`. for worker_entry in workers_lock.iter_mut() { if let Err(e) = worker_entry.worker.start(tokio_spawn_fn(&handle)) { error!( @@ -230,9 +222,6 @@ mod native { workers_lock.retain(|entry| entry.restart_on_fork); - // Log-and-continue: see `after_fork_parent`. In the child this - // matters even more — a single InvalidState worker must not - // silence every other component for the lifetime of the process. for worker_entry in workers_lock.iter_mut() { worker_entry.worker.reset(); if let Err(e) = worker_entry.worker.start(tokio_spawn_fn(&handle)) { @@ -443,9 +432,7 @@ pub struct SharedRuntime { runtime: Arc>>>, workers: Arc>>, next_worker_id: AtomicU64, - /// Set once `shutdown` / `shutdown_async` is called. After this point - /// `spawn_worker` rejects with `AlreadyShutdown` instead of silently - /// registering a worker that will never run. + /// Set once `shutdown` / `shutdown_async` is called shutdown: AtomicBool, }