diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index bd157abe8d..9b0e2d0c39 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -320,17 +320,14 @@ impl TraceExporterBuilder { let libdatadog_version = tag!("libdatadog_version", env!("CARGO_PKG_VERSION")); // On native, `C::new_client()` may capture `tokio::runtime::Handle::current()` - // internally (e.g. `NativeCapabilities`). Enter the SharedRuntime's tokio context - // so that handle is available. On wasm this is a no-op — the JS event loop is - // always the implicit executor. - #[cfg(not(target_arch = "wasm32"))] - let _guard = shared_runtime - .runtime_handle() + // internally (e.g. `NativeCapabilities`). Run it inside the SharedRuntime's + // tokio context so that handle is available. On wasm this is a no-op — the + // JS event loop is always the implicit executor. + let capabilities = shared_runtime + .with_runtime_context(C::new_client) .map_err(|e| { TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(e.to_string())) - })? - .enter(); - let capabilities = C::new_client(); + })?; // --- Platform-specific worker setup --- // The blocks below spawn background workers via `SharedRuntime`. On diff --git a/libdd-shared-runtime-ffi/src/shared_runtime.rs b/libdd-shared-runtime-ffi/src/shared_runtime.rs index 7a9a0ac084..e7d37ad6a8 100644 --- a/libdd-shared-runtime-ffi/src/shared_runtime.rs +++ b/libdd-shared-runtime-ffi/src/shared_runtime.rs @@ -26,6 +26,8 @@ pub enum SharedRuntimeErrorCode { /// An unexpected panic occurred inside the FFI call. #[cfg(feature = "catch_panic")] Panic, + /// Operation rejected because the runtime has already been shut down. + AlreadyShutdown, } /// Error returned by SharedRuntime FFI functions. @@ -50,6 +52,7 @@ impl From for SharedRuntimeFFIError { fn from(err: SharedRuntimeError) -> Self { let code = match &err { SharedRuntimeError::RuntimeUnavailable => SharedRuntimeErrorCode::RuntimeUnavailable, + SharedRuntimeError::AlreadyShutdown => SharedRuntimeErrorCode::AlreadyShutdown, SharedRuntimeError::LockFailed(_) => SharedRuntimeErrorCode::LockFailed, SharedRuntimeError::WorkerError(_) => SharedRuntimeErrorCode::WorkerError, SharedRuntimeError::RuntimeCreation(_) => SharedRuntimeErrorCode::RuntimeCreation, diff --git a/libdd-shared-runtime/src/shared_runtime/mod.rs b/libdd-shared-runtime/src/shared_runtime/mod.rs index 058bf730a5..48743f988e 100644 --- a/libdd-shared-runtime/src/shared_runtime/mod.rs +++ b/libdd-shared-runtime/src/shared_runtime/mod.rs @@ -14,7 +14,7 @@ use crate::worker::Worker; use futures::stream::{FuturesUnordered, StreamExt}; use libdd_common::MutexExt; use pausable_worker::{PausableWorker, PausableWorkerError}; -use std::sync::atomic::AtomicU64; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::{fmt, io}; use tracing::{debug, error}; @@ -26,7 +26,6 @@ use tracing::{debug, error}; mod native { use super::*; use pausable_worker::tokio_spawn_fn; - use std::sync::atomic::Ordering; use tokio::runtime::{Builder, Runtime}; fn build_runtime() -> Result { @@ -42,21 +41,39 @@ mod native { runtime: Arc::new(Mutex::new(Some(Arc::new(build_runtime()?)))), workers: Arc::new(Mutex::new(Vec::new())), next_worker_id: AtomicU64::new(1), + shutdown: AtomicBool::new(false), }) } - /// Returns a clone of the tokio runtime handle managed by this SharedRuntime. + /// Run `f` with the shared tokio runtime entered as the current context. + /// + /// Useful for synchronous initialization that calls into + /// [`tokio::runtime::Handle::current()`] (e.g., constructing an HTTP + /// client that captures the current handle internally). + /// + /// # Fork safety + /// Tasks spawned via `tokio::spawn` / `Handle::current().spawn(...)` + /// inside `f` are NOT tracked by `SharedRuntime`: they will not be + /// paused before fork, restarted after fork, or shut down by + /// [`Self::shutdown`]. For background work, register a + /// [`crate::Worker`] via [`Self::spawn_worker`] instead. /// /// # Errors - /// Returns [`SharedRuntimeError::RuntimeUnavailable`] if the runtime has been shut down. - pub fn runtime_handle(&self) -> Result { - Ok(self + /// Returns [`SharedRuntimeError::RuntimeUnavailable`] if the runtime + /// has been shut down or is in a fork window. + pub fn with_runtime_context(&self, f: F) -> Result + where + F: FnOnce() -> T, + { + let handle = self .runtime .lock_or_panic() .as_ref() .ok_or(SharedRuntimeError::RuntimeUnavailable)? .handle() - .clone()) + .clone(); + let _guard = handle.enter(); + Ok(f()) } /// Spawn a PausableWorker on this runtime. @@ -89,6 +106,10 @@ mod native { let runtime_guard = self.runtime.lock_or_panic(); let mut workers_guard = self.workers.lock_or_panic(); + if self.shutdown.load(Ordering::Acquire) { + return Err(SharedRuntimeError::AlreadyShutdown); + } + if let Some(rt) = runtime_guard.as_ref() { if let Err(e) = pausable_worker.start(tokio_spawn_fn(rt.handle())) { return Err(e.into()); @@ -166,7 +187,12 @@ mod native { let mut workers_lock = self.workers.lock_or_panic(); for worker_entry in workers_lock.iter_mut() { - worker_entry.worker.start(tokio_spawn_fn(&handle))?; + if let Err(e) = worker_entry.worker.start(tokio_spawn_fn(&handle)) { + error!( + worker_id = worker_entry.id, + "Worker failed to restart after fork in parent: {:?}", e + ); + } } Ok(()) @@ -198,7 +224,12 @@ mod native { for worker_entry in workers_lock.iter_mut() { worker_entry.worker.reset(); - worker_entry.worker.start(tokio_spawn_fn(&handle))?; + if let Err(e) = worker_entry.worker.start(tokio_spawn_fn(&handle)) { + error!( + worker_id = worker_entry.id, + "Worker failed to restart after fork in child: {:?}", e + ); + } } Ok(()) @@ -234,6 +265,7 @@ mod native { timeout: Option, ) -> Result<(), SharedRuntimeError> { debug!(?timeout, "Shutting down SharedRuntime"); + self.shutdown.store(true, Ordering::Release); match self.runtime.lock_or_panic().take() { Some(runtime) => { if let Some(timeout) = timeout { @@ -337,6 +369,8 @@ impl WorkerHandle { pub enum SharedRuntimeError { /// The runtime is not available or in an invalid state. RuntimeUnavailable, + /// Operation rejected because the runtime has already been shut down. + AlreadyShutdown, /// Failed to acquire a lock on internal state. LockFailed(String), /// A worker operation failed. @@ -353,6 +387,7 @@ impl fmt::Display for SharedRuntimeError { Self::RuntimeUnavailable => { write!(f, "Runtime is not available or in an invalid state") } + Self::AlreadyShutdown => write!(f, "Runtime has already been shut down"), Self::LockFailed(msg) => write!(f, "Failed to acquire lock: {}", msg), Self::WorkerError(err) => write!(f, "Worker error: {}", err), Self::RuntimeCreation(err) => { @@ -397,6 +432,8 @@ pub struct SharedRuntime { runtime: Arc>>>, workers: Arc>>, next_worker_id: AtomicU64, + /// Set once `shutdown` / `shutdown_async` is called + shutdown: AtomicBool, } impl SharedRuntime { @@ -419,6 +456,7 @@ impl SharedRuntime { Ok(Self { workers: Arc::new(Mutex::new(Vec::new())), next_worker_id: AtomicU64::new(1), + shutdown: AtomicBool::new(false), }) } } @@ -430,14 +468,16 @@ impl SharedRuntime { worker: T, restart_on_fork: bool, ) -> Result { - use std::sync::atomic::Ordering; - let boxed_worker: BoxedWorker = Box::new(worker); debug!(?boxed_worker, "Spawning worker on SharedRuntime"); let mut pausable_worker = PausableWorker::new(boxed_worker); let mut workers_guard = self.workers.lock_or_panic(); + if self.shutdown.load(Ordering::Acquire) { + return Err(SharedRuntimeError::AlreadyShutdown); + } + if let Err(e) = pausable_worker.start(|future| { use futures_util::FutureExt; let (remote, handle) = future.remote_handle(); @@ -461,6 +501,17 @@ impl SharedRuntime { }) } + /// On wasm32, [`Self::with_runtime_context`] is a no-op — the JS event + /// loop is the implicit executor, so there is no tokio context to enter. + /// The closure is invoked unchanged so callers can be platform-agnostic. + #[cfg(target_arch = "wasm32")] + pub fn with_runtime_context(&self, f: F) -> Result + where + F: FnOnce() -> T, + { + Ok(f()) + } + /// Shutdown all workers asynchronously. /// /// This should be called during application shutdown to cleanly stop all @@ -469,6 +520,7 @@ impl SharedRuntime { /// Worker errors are logged but do not cause the function to fail. pub async fn shutdown_async(&self) { debug!("Shutting down all workers asynchronously"); + self.shutdown.store(true, Ordering::Release); let workers = { let mut workers_lock = self.workers.lock_or_panic(); std::mem::take(&mut *workers_lock) @@ -696,4 +748,71 @@ mod tests { "worker should not run or shut down after fork in child when restart_on_fork is false" ); } + + /// A single `PausableWorker` in `InvalidState` must + /// not abort the whole restart loop in `after_fork_parent` / + /// `after_fork_child`. The bad entry is logged and skipped; every + /// other worker still resumes after fork. + #[test] + fn after_fork_parent_skips_invalid_state_workers() { + let shared_runtime = SharedRuntime::new().unwrap(); + + let (good, good_rx) = make_test_worker(); + let _ = shared_runtime.spawn_worker(good, true).unwrap(); + + // Second worker — we'll corrupt its entry into InvalidState below, + // simulating a previously-aborted task. + let (bad, _bad_rx) = make_test_worker(); + let _ = shared_runtime.spawn_worker(bad, true).unwrap(); + + good_rx + .recv_timeout(Duration::from_secs(1)) + .expect("good worker did not run before fork"); + + { + let mut workers = shared_runtime.workers.lock_or_panic(); + workers[1].worker = PausableWorker::InvalidState; + } + + shared_runtime.before_fork(); + while good_rx.try_recv().is_ok() {} + + let result = shared_runtime.after_fork_parent(); + + assert!( + result.is_ok(), + "after_fork_parent should not bail on a single InvalidState worker" + ); + assert!( + good_rx.recv_timeout(Duration::from_secs(1)).is_ok(), + "good worker should resume after fork even if a peer is InvalidState" + ); + } + + /// `spawn_worker` after `shutdown` must reject with + /// `AlreadyShutdown` rather than silently registering a worker that + /// will never run. The shutdown state is observed under the workers + /// lock so the same guarantee holds against the during-shutdown race. + #[test] + fn spawn_worker_after_shutdown_should_be_rejected() { + let shared_runtime = SharedRuntime::new().unwrap(); + shared_runtime.shutdown(None).unwrap(); + + let (worker, rx) = make_test_worker(); + let result = shared_runtime.spawn_worker(worker, true); + + assert!( + matches!(result, Err(SharedRuntimeError::AlreadyShutdown)), + "spawn_worker after shutdown should return AlreadyShutdown, got {result:?}" + ); + assert_eq!( + shared_runtime.workers.lock_or_panic().len(), + 0, + "no dead worker should be registered" + ); + assert!( + rx.recv_timeout(Duration::from_millis(200)).is_err(), + "no worker should be running after shutdown" + ); + } }