Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 177 additions & 0 deletions libdd-data-pipeline/src/trace_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,19 @@ impl<T: Send + 'static> TraceBuffer<T> {
self.tx.trigger_flush()
}

/// Request a flush of the current batch and block until the worker has processed it.
///
/// Intended for shutdown paths: once this returns `Ok(())`, the pending batch has been
/// handed to the export operation, so side effects of exporting (such as spawning the
/// stats worker) have happened before the runtime is torn down.
///
/// If nothing is buffered, no flush is requested and the call returns `Ok(())` right away.
///
/// # Errors
///
/// Propagates any [`TraceBufferError`] produced while requesting the flush or while
/// waiting for it to complete within `timeout`.
pub fn flush_and_wait(&self, timeout: Duration) -> Result<(), TraceBufferError> {
    match self.tx.trigger_flush_and_capture_gen()? {
        // Empty batch: there is no generation to wait on.
        None => Ok(()),
        // Block until the worker has moved past the captured generation.
        Some(target_gen) => self.tx.wait_flush_done(target_gen, Some(timeout)),
    }
}

pub fn queue_metrics(&self) -> QueueMetricsFetcher<T> {
QueueMetricsFetcher {
waiter: self.tx.waiter.clone(),
Expand All @@ -290,6 +303,19 @@ impl<T> fmt::Debug for TraceBuffer<T> {
}
}

impl<T> Drop for TraceBuffer<T> {
    /// Ask the worker for one last flush as the producer handle goes away.
    ///
    /// This is best-effort only: it requests a flush and does not wait for it. The worker
    /// is owned by the [`SharedRuntime`] and keeps running on its own; its
    /// [`Worker::shutdown`] hook (invoked by `SharedRuntime`) drains whatever is still
    /// buffered after this notification.
    ///
    /// The result is deliberately discarded — when the runtime has already been shut
    /// down there is nothing meaningful left to do with a failure here.
    fn drop(&mut self) {
        let _flush_result = self.tx.trigger_flush();
    }
}

pub struct QueueMetricsFetcher<T> {
waiter: Arc<Waiter<T>>,
}
Expand Down Expand Up @@ -416,6 +442,20 @@ impl<T> Sender<T> {
Ok(())
}

/// Trigger a flush and capture the batch generation that the worker must overtake
/// before the flush can be considered done.
///
/// Returns `Ok(None)` when the current batch holds no spans: nothing is flagged, the
/// receiver is not notified, and the caller has nothing to wait for. Otherwise the
/// flush flag is set, the receiver is notified, and the generation of the batch that
/// was current at that moment is returned as `Ok(Some(_))`.
///
/// # Errors
///
/// Propagates the error from `get_running_state` when the running state cannot be
/// obtained.
fn trigger_flush_and_capture_gen(&self) -> Result<Option<BatchGeneration>, TraceBufferError> {
    let mut state = self.get_running_state()?;
    if state.batch.span_count == 0 {
        return Ok(None);
    }
    state.flush_needed = true;
    // Read the generation before the state is handed to `notify_receiver`.
    // The local is deliberately not called `gen`: that identifier is a reserved keyword
    // starting with the Rust 2024 edition.
    let pending_gen = state.batch.batch_gen;
    self.waiter.notify_receiver(state);
    Ok(Some(pending_gen))
}

fn wait_shutdown_done(&self, timeout: Duration) -> Result<(), TraceBufferError> {
if timeout.is_zero() {
return Err(TraceBufferError::TimedOut(Duration::ZERO));
Expand Down Expand Up @@ -506,6 +546,16 @@ impl<T> Receiver<T> {
self.waiter.notify_sender(state);
Ok(())
}

/// Take everything currently buffered, without waiting for a flush trigger.
///
/// The shutdown path uses this to recover chunks the sender accumulated but never got
/// to flush (for instance when the worker loop was cancelled before the next timeout
/// tick). Any pending flush request is cleared, since the batch it referred to is
/// being exported here.
///
/// # Errors
///
/// Returns [`MutexPoisonedError`] when the shared state lock cannot be acquired.
fn drain(&self) -> Result<Vec<TraceChunk<T>>, MutexPoisonedError> {
    let mut guard = self.lock_state()?;
    let drained = guard.batch.export();
    guard.flush_needed = false;
    Ok(drained)
}
}

#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Default)]
Expand Down Expand Up @@ -693,6 +743,19 @@ impl<T: Send + Debug + 'static> Worker for TraceExporterWorker<T> {
}

async fn shutdown(&mut self) {
    // Recover whatever the sender buffered but never flushed. Skipping this step would
    // silently lose the final partial batch on shutdown — which is the common case when
    // a tokio app calls `tracer.shutdown()` right after producing spans.
    match self.rx.drain() {
        Err(MutexPoisonedError) => {
            tracing::error!("TraceExporterWorker mailbox poisoned during shutdown drain");
        }
        Ok(pending) => {
            // Only export (and acknowledge) when there is something to send.
            if !pending.is_empty() {
                self.export_trace_chunks(pending).await;
                let _ = self.rx.ack_export();
            }
        }
    }
    // Always signal shutdown completion, whatever the drain outcome; the result is
    // intentionally ignored.
    let _ = self.rx.shutdown_done();
}

Expand Down Expand Up @@ -889,6 +952,69 @@ mod tests {
rt.shutdown(None).unwrap();
}

#[test]
#[cfg_attr(miri, ignore)]
fn test_shutdown_drains_pending_batch() {
    // Thresholds are high enough that `send_chunk` alone never triggers a flush, and the
    // flush interval is long enough that the timer cannot fire during the test. The
    // export callback can therefore only be reached through the shutdown drain path.
    let config = TraceBufferConfig::default()
        .max_buffered_spans(100)
        .span_flush_threshold(100)
        .max_flush_interval(Duration::from_secs(u64::from(u32::MAX)));

    let recorded = Arc::new(std::sync::Mutex::new(Vec::<usize>::new()));
    let sink = Arc::clone(&recorded);
    let (rt, _, sender) = make_buffer(
        Box::new(move |chunks| {
            // Record the chunk sizes in sorted order so the assertion below does not
            // depend on the order chunks are handed to the callback.
            let mut sizes: Vec<_> = chunks.into_iter().map(|chunk| chunk.len()).collect();
            sizes.sort_unstable();
            sink.lock().unwrap().extend(sizes);
        }),
        config,
    );

    sender.send_chunk(vec![()]).unwrap();
    sender.send_chunk(vec![(), ()]).unwrap();

    // No flush ever fired, so shutdown is what has to export the two buffered chunks.
    rt.shutdown(Some(Duration::from_secs(10))).unwrap();
    sender.wait_shutdown_done(Duration::from_secs(10)).unwrap();

    let snapshot = recorded.lock().unwrap().clone();
    assert_eq!(snapshot, vec![1, 2]);
}

#[test]
#[cfg_attr(miri, ignore)]
fn test_flush_and_wait() {
    // Neither the span threshold nor the timer can trigger a flush here, so any export
    // observed below was caused by `flush_and_wait` itself.
    let config = TraceBufferConfig::default()
        .max_buffered_spans(100)
        .span_flush_threshold(100)
        .max_flush_interval(Duration::from_secs(u64::from(u32::MAX)));
    let (rt, permits, buffer) = make_buffer(
        Box::new(|chunks| assert_eq!(chunks.len(), 2)),
        config,
    );

    buffer.send_chunk(vec![()]).unwrap();
    buffer.send_chunk(vec![(), ()]).unwrap();
    // Nothing has been exported yet.
    assert_eq!(permits.available_permits(), 0);

    buffer
        .flush_and_wait(Duration::from_secs(10))
        .expect("flush_and_wait failed");
    // By the time flush_and_wait returns, the export must already have happened.
    assert_eq!(permits.available_permits(), 1);

    // With an empty buffer the call is a no-op: it must neither block nor fail.
    buffer
        .flush_and_wait(Duration::from_secs(10))
        .expect("flush_and_wait on empty buffer should not error");

    rt.shutdown(None).unwrap();
    buffer.wait_shutdown_done(Duration::from_secs(10)).unwrap();
}

#[test]
#[cfg_attr(miri, ignore)]
fn test_force_flush() {
Expand Down Expand Up @@ -936,4 +1062,55 @@ mod tests {
assert_eq!(sender.queue_metrics().get_metrics().spans_queued, 2);
rt.shutdown(None).unwrap();
}

/// Regression test for the [`Drop`] impl on [`TraceBuffer`]: when the producer is dropped
/// with spans still buffered and no explicit `force_flush`, the drop itself must request
/// a final flush so the worker exports the pending chunks rather than losing them.
#[test]
#[cfg_attr(miri, ignore)]
fn test_drop_triggers_flush() {
    // Thresholds and interval are chosen so that no flush happens on its own.
    let config = TraceBufferConfig::default()
        .max_buffered_spans(100)
        .span_flush_threshold(100)
        .max_flush_interval(Duration::from_secs(u64::from(u32::MAX)));

    let recorded = Arc::new(std::sync::Mutex::new(Vec::<usize>::new()));
    let sink = Arc::clone(&recorded);
    let (rt, permits, buffer) = make_buffer(
        Box::new(move |chunks| {
            // Sort the sizes so the final assertion is independent of chunk order.
            let mut sizes: Vec<_> = chunks.into_iter().map(|chunk| chunk.len()).collect();
            sizes.sort_unstable();
            sink.lock().unwrap().extend(sizes);
        }),
        config,
    );

    buffer.send_chunk(vec![()]).unwrap();
    buffer.send_chunk(vec![(), ()]).unwrap();
    assert_eq!(permits.available_permits(), 0);

    // Dropping the buffer — with no `force_flush`/`flush_and_wait` beforehand — must be
    // enough for the background worker to receive the pending chunks. Wait for one
    // export to be signalled, then release the permit immediately.
    drop(buffer);
    drop(rt.block_on(permits.acquire_many(1)).unwrap().unwrap());

    let snapshot = recorded.lock().unwrap().clone();
    assert_eq!(snapshot, vec![1, 2]);
    rt.shutdown(None).unwrap();
}

/// Smoke test: a [`TraceBuffer`] can be dropped from within a tokio runtime without
/// panicking. The Drop impl only performs a non-blocking notify today; this test exists
/// to catch a future change that sneaks a blocking call (such as `block_on` or a
/// `Condvar::wait`) into it.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_drop_inside_tokio_runtime() {
    let (runtime, _permits, buffer) =
        make_buffer(Box::new(|_chunks| {}), TraceBufferConfig::default());

    // Buffer one chunk first so that the drop actually has something to flush and the
    // whole notify path runs under the tokio scheduler.
    buffer.send_chunk(vec![()]).unwrap();
    drop(buffer);

    runtime.shutdown(None).unwrap();
}
}
9 changes: 8 additions & 1 deletion libdd-data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,14 @@ impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> Tra
pub fn send_trace_chunks<T: TraceData>(
&self,
trace_chunks: Vec<Vec<Span<T>>>,
) -> Result<AgentResponse, TraceExporterError> {
) -> Result<AgentResponse, TraceExporterError>
where
// The runtime-aware `SharedRuntime::block_on` requires a `Send` future. All concrete
// `TraceData` implementations have `Send` associated types, so this bound is satisfied
// by every in-tree caller.
T::Text: Send,
T::Bytes: Send,
{
self.check_agent_info();
self.shared_runtime
.block_on(async { self.send_trace_chunks_inner(trace_chunks).await })?
Expand Down
Loading
Loading