From 6c4702b7e5860eddb521a5aa5a4b1649ccb77e83 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Wed, 6 May 2026 21:13:49 +0200 Subject: [PATCH] feat(trace_buffer): flush based on size of chunks in bytes # Motivation When we buffer data, we want to put a bound on the memory we are going to use. Spans can contain a varying amount of data depending on the workload instrumented. For this reason, it is better to trigger flush and limit the trace buffer based on the amount of data stored, not the number of spans. This of doesn't take into account that a lot of payloads contain duplicate string that will be interned if a user uses v0.5 or v1 encoding, but it would make this a lot harder to compute # Changes * Add a BufferSize trait, with a byte_size method returning the number of bytes each item in the queue allocates * Use this method to compute the total size of each trace-chunk * Use the total size in bytes in the queue to flush and limit the size of the buffer --- libdd-data-pipeline/benches/trace_buffer.rs | 90 +++++++--- libdd-data-pipeline/src/trace_buffer/mod.rs | 173 ++++++++++++++------ 2 files changed, 193 insertions(+), 70 deletions(-) diff --git a/libdd-data-pipeline/benches/trace_buffer.rs b/libdd-data-pipeline/benches/trace_buffer.rs index 5f180a28f1..bce4e26deb 100644 --- a/libdd-data-pipeline/benches/trace_buffer.rs +++ b/libdd-data-pipeline/benches/trace_buffer.rs @@ -1,30 +1,64 @@ // Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; -use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput}; use libdd_data_pipeline::trace_buffer::{Export, TraceBuffer, TraceBufferConfig, TraceChunk}; use libdd_data_pipeline::trace_exporter::{ agent_response::AgentResponse, error::TraceExporterError, }; use libdd_shared_runtime::SharedRuntime; - -type Span = [u8; 100]; +use libdd_tinybytes::BytesString; +use libdd_trace_utils::span::v04::SpanBytes; // Number of chunks each sender thread sends per benchmark iteration. const CHUNKS_PER_SENDER: usize = 900; +fn bs(s: &'static str) -> BytesString { + BytesString::from_static(s) +} + +fn make_span() -> SpanBytes { + SpanBytes { + service: bs("my-web-service"), + name: bs("http.request"), + resource: bs("GET /api/v1/users"), + r#type: bs("web"), + trace_id: 1_234_567_890_123_456_789_u128, + span_id: 987_654_321_u64, + parent_id: 0, + start: 1_700_000_000_000_000_000_i64, + duration: 5_000_000_i64, + error: 0, + meta: HashMap::from_iter([ + (bs("env"), bs("prod")), + (bs("version"), bs("1.0.0")), + (bs("http.method"), bs("GET")), + (bs("http.url"), bs("/api/v1/users")), + (bs("peer.service"), bs("users-service")), + ]), + metrics: HashMap::from_iter([ + (bs("_sampling_priority_v1"), 1.0_f64), + (bs("_dd.agent_psr"), 1.0_f64), + ]), + meta_struct: HashMap::new(), + span_links: vec![], + span_events: vec![], + } +} + // Simulates async IO by sleeping 2ms per export batch. #[derive(Debug)] struct SleepExport; -impl Export for SleepExport { +impl Export for SleepExport { fn export_trace_chunks( &mut self, - _trace_chunks: Vec>, + _trace_chunks: Vec>, ) -> Pin< Box< dyn std::future::Future> + Send + '_, @@ -37,11 +71,11 @@ impl Export for SleepExport { } } -fn setup_buffer() -> (Arc, Arc>) { +fn setup_buffer() -> (Arc, Arc>) { let rt = Arc::new(SharedRuntime::new().expect("SharedRuntime::new")); let cfg = TraceBufferConfig::new() - .max_buffered_spans(1_000) - .span_flush_threshold(500) + .max_buffered_bytes(1_000_000) + .flush_threshold_bytes(100_000) .max_flush_interval(Duration::from_secs(2)); let (buf, worker) = TraceBuffer::new(cfg, Box::new(|_| {}), Box::new(SleepExport)); rt.spawn_worker(worker, true).expect("spawn_worker"); @@ -69,22 +103,32 @@ fn bench_trace_buffer(c: &mut Criterion) { group.bench_function( BenchmarkId::new(format!("{}_senders", num_senders), delay_label), |b| { - b.iter(|| { - std::thread::scope(|s| { - for _ in 0..num_senders { - let sender = sender.clone(); - s.spawn(move || { - for _ in 0..CHUNKS_PER_SENDER { - // BatchFull errors are expected under high load. - let _ = sender.send_chunk(vec![[0u8; 100]]); - if let Some(d) = delay { - std::thread::sleep(d); + b.iter_batched( + || { + Vec::from_iter( + (0..num_senders) + .map(|_| (0..CHUNKS_PER_SENDER).map(|_| vec![make_span()])) + .map(Vec::from_iter), + ) + }, + |input| { + std::thread::scope(|s| { + for sender_spans in input { + let sender = sender.clone(); + s.spawn(move || { + for s in sender_spans { + // BatchFull errors are expected under high load. + let _ = sender.send_chunk(s); + if let Some(d) = delay { + std::thread::sleep(d); + } } - } - }); - } - }); - }); + }); + } + }); + }, + BatchSize::PerIteration, + ); }, ); diff --git a/libdd-data-pipeline/src/trace_buffer/mod.rs b/libdd-data-pipeline/src/trace_buffer/mod.rs index 95e7ee52e8..f7b18f473a 100644 --- a/libdd-data-pipeline/src/trace_buffer/mod.rs +++ b/libdd-data-pipeline/src/trace_buffer/mod.rs @@ -20,13 +20,80 @@ use crate::trace_exporter::{ agent_response::AgentResponse, error::TraceExporterError, TraceExporter, }; +/// Trait for types stored in a [`TraceBuffer`] that can report their approximate byte size. +pub trait BufferSize { + fn byte_size(&self) -> usize; +} + +impl BufferSize for libdd_trace_utils::span::v04::Span +where + T: libdd_trace_utils::span::TraceData, + T::Text: AsRef, + T::Bytes: AsRef<[u8]>, +{ + fn byte_size(&self) -> usize { + use libdd_trace_utils::span::v04::AttributeAnyValue; + + // trace_id(16) + span_id(8) + parent_id(8) + start(8) + duration(8) + error(4) + let mut size: usize = 52; + + size += self.service.as_ref().len(); + size += self.name.as_ref().len(); + size += self.resource.as_ref().len(); + size += self.r#type.as_ref().len(); + + for (k, v) in &self.meta { + size += k.as_ref().len() + v.as_ref().len(); + } + for k in self.metrics.keys() { + size += k.as_ref().len() + 8; + } + for (k, v) in &self.meta_struct { + size += k.as_ref().len() + v.as_ref().len(); + } + for link in &self.span_links { + // trace_id(8) + trace_id_high(8) + span_id(8) + flags(4) = 28 + size += 28 + link.tracestate.as_ref().len(); + for (k, v) in &link.attributes { + size += k.as_ref().len() + v.as_ref().len(); + } + } + for event in &self.span_events { + // time_unix_nano(8) + size += 8 + event.name.as_ref().len(); + for (k, v) in &event.attributes { + size += k.as_ref().len() + + match v { + AttributeAnyValue::SingleValue(av) => span_attr_size::(av), + AttributeAnyValue::Array(vec) => vec.iter().map(span_attr_size::).sum(), + }; + } + } + size + } +} + +fn span_attr_size(v: &libdd_trace_utils::span::v04::AttributeArrayValue) -> usize +where + T: libdd_trace_utils::span::TraceData, + T::Text: AsRef, +{ + use libdd_trace_utils::span::v04::AttributeArrayValue; + match v { + AttributeArrayValue::String(s) => s.as_ref().len(), + AttributeArrayValue::Boolean(_) => 1, + AttributeArrayValue::Integer(_) => 8, + AttributeArrayValue::Double(_) => 8, + } +} + #[derive(Clone, Copy, Debug)] pub struct TraceBufferConfig { synchronous_export: bool, synchronous_export_timeout: Option, max_flush_interval: Duration, - max_buffered_spans: usize, - span_flush_threshold: usize, + max_buffered_bytes: usize, + flush_threshold_bytes: usize, } impl TraceBufferConfig { @@ -62,18 +129,18 @@ impl TraceBufferConfig { } } - /// The maximum number of spans that will be buffered before we drop data - pub fn max_buffered_spans(self, max: usize) -> Self { + /// The maximum number of bytes that will be buffered before we drop data + pub fn max_buffered_bytes(self, max: usize) -> Self { Self { - max_buffered_spans: max, + max_buffered_bytes: max, ..self } } - /// The number of spans that will be buffered before we decide to flush - pub fn span_flush_threshold(self, threshold: usize) -> Self { + /// The number of bytes that will be buffered before we decide to flush + pub fn flush_threshold_bytes(self, threshold: usize) -> Self { Self { - span_flush_threshold: threshold, + flush_threshold_bytes: threshold, ..self } } @@ -85,8 +152,8 @@ impl Default for TraceBufferConfig { synchronous_export: false, synchronous_export_timeout: Some(Duration::from_secs(1)), max_flush_interval: Duration::from_secs(2), - max_buffered_spans: 10_000, - span_flush_threshold: 3_000, + max_buffered_bytes: 5_000_000, // 5MB + flush_threshold_bytes: 1_500_000, // 1.5MB } } } @@ -94,12 +161,12 @@ impl Default for TraceBufferConfig { pub type TraceChunk = Vec; /// Error that can occur when the batch has reached its maximum size -/// and we can't add more spans to it. +/// and we can't add more data to it. /// -/// The added spans will be dropped. +/// The added data will be dropped. #[derive(Debug, PartialEq, Eq)] pub struct BatchFullError { - spans_dropped: usize, + pub spans_dropped: usize, } /// Error that can occur when the mutex was poisoned. @@ -120,8 +187,8 @@ pub enum TraceBufferError { struct Batch { chunks: Vec>, last_flush: Instant, - span_count: usize, - max_buffered_spans: usize, + byte_count: usize, + max_buffered_bytes: usize, batch_gen: BatchGeneration, } @@ -130,15 +197,15 @@ struct Batch { const PRE_ALLOCATE_CHUNKS: usize = 400; impl Batch { - fn new(max_buffered_spans: usize) -> Self { + fn new(max_buffered_bytes: usize) -> Self { let mut batch_gen = BatchGeneration::default(); batch_gen.incr(); Self { chunks: Vec::with_capacity(PRE_ALLOCATE_CHUNKS), last_flush: Instant::now(), - span_count: 0, + byte_count: 0, batch_gen, - max_buffered_spans, + max_buffered_bytes, } } @@ -146,13 +213,13 @@ impl Batch { let Self { chunks, last_flush, - span_count, + byte_count, batch_gen, - max_buffered_spans: _max_buffered_spans, + max_buffered_bytes: _max_buffered_bytes, } = self; chunks.clear(); *last_flush = Instant::now(); - *span_count = 0; + *byte_count = 0; *batch_gen = { let mut batch_gen = BatchGeneration::default(); @@ -166,10 +233,13 @@ impl Batch { /// /// This method will not check that adding the chunk will not exceed the maximum size of the /// batch. So the batch can be over the maximum size after this call. - /// This is because we don't want to always drop traces that contain more spans than the maximum + /// This is because we don't want to always drop traces that contain more bytes than the maximum /// size. - fn add_trace_chunk(&mut self, chunk: Vec) -> Result<(), BatchFullError> { - if self.span_count > self.max_buffered_spans { + fn add_trace_chunk(&mut self, chunk: Vec) -> Result<(), BatchFullError> + where + T: BufferSize, + { + if self.byte_count > self.max_buffered_bytes { return Err(BatchFullError { spans_dropped: chunk.len(), }); @@ -178,7 +248,7 @@ impl Batch { return Ok(()); } - self.span_count += chunk.len(); + self.byte_count += chunk.iter().map(|s| s.byte_size()).sum::(); self.chunks.push(chunk); Ok(()) } @@ -186,7 +256,7 @@ impl Batch { /// Export the trace chunk and reset the batch fn export(&mut self) -> Vec> { let chunks = std::mem::replace(&mut self.chunks, Vec::with_capacity(PRE_ALLOCATE_CHUNKS)); - self.span_count = 0; + self.byte_count = 0; self.last_flush = Instant::now(); if !chunks.is_empty() { self.batch_gen.incr(); @@ -230,15 +300,15 @@ pub struct TraceBuffer { pub type ResponseHandler = Box) + Send + Sync>; -impl TraceBuffer { +impl TraceBuffer { pub fn new( config: TraceBufferConfig, response_handler: ResponseHandler, export_operation: Box + Send + Sync>, ) -> (Self, TraceExporterWorker) { let (tx, rx) = channel( - config.span_flush_threshold, - config.max_buffered_spans, + config.flush_threshold_bytes, + config.max_buffered_bytes, config.synchronous_export, ); let worker = TraceExporterWorker::new(rx, response_handler, export_operation, config); @@ -310,8 +380,8 @@ pub struct QueueMetrics { } fn channel( - flush_trigger_number_of_spans: usize, - max_number_of_spans: usize, + flush_trigger_bytes: usize, + max_buffered_bytes: usize, synchronous_write: bool, ) -> (Sender, Receiver) { let waiter = Arc::new(Waiter { @@ -319,7 +389,7 @@ fn channel( flush_needed: false, last_flush_generation: BatchGeneration::default(), has_shutdown: false, - batch: Batch::new(max_number_of_spans), + batch: Batch::new(max_buffered_bytes), metrics: QueueMetrics::default(), }), sender_notifier: Condvar::new(), @@ -328,7 +398,7 @@ fn channel( ( Sender { waiter: waiter.clone(), - flush_trigger_number_of_spans, + flush_trigger_bytes, synchronous_write, }, Receiver { waiter }, @@ -337,7 +407,7 @@ fn channel( struct Sender { waiter: Arc>, - flush_trigger_number_of_spans: usize, + flush_trigger_bytes: usize, synchronous_write: bool, } @@ -390,7 +460,10 @@ impl Sender { Ok(state) } - fn add_trace_chunk(&self, chunk: Vec) -> Result { + fn add_trace_chunk(&self, chunk: Vec) -> Result + where + T: BufferSize, + { let mut state = self.get_running_state()?; let chunk_len = chunk.len(); if let Err(e @ BatchFullError { spans_dropped }) = state.batch.add_trace_chunk(chunk) { @@ -400,8 +473,7 @@ impl Sender { state.metrics.spans_queued += chunk_len; let gen = state.batch.batch_gen; if !state.flush_needed - && (state.batch.span_count > self.flush_trigger_number_of_spans - || self.synchronous_write) + && (state.batch.byte_count > self.flush_trigger_bytes || self.synchronous_write) { state.flush_needed = true; self.waiter.notify_receiver(state); @@ -703,12 +775,19 @@ mod tests { use libdd_shared_runtime::SharedRuntime; - use crate::trace_buffer::{Export, TraceBuffer, TraceBufferConfig}; + use crate::trace_buffer::{BufferSize, Export, TraceBuffer, TraceBufferConfig}; use crate::trace_exporter::agent_response::AgentResponse; use crate::trace_exporter::error::TraceExporterError; use super::{BatchFullError, TraceBufferError}; + // Used for tests, 1 byte per item so size computations are easier + impl BufferSize for () { + fn byte_size(&self) -> usize { + 1 + } + } + struct AssertExporter( Box>) + Send + Sync>, Arc, @@ -769,8 +848,8 @@ mod tests { assert_eq!(lengths, &[1, 2]); }), TraceBufferConfig::default() - .max_buffered_spans(4) - .span_flush_threshold(2) + .max_buffered_bytes(4) + .flush_threshold_bytes(2) .max_flush_interval(Duration::from_secs(u32::MAX as u64)), ); @@ -798,8 +877,8 @@ mod tests { } }), TraceBufferConfig::default() - .max_buffered_spans(4) - .span_flush_threshold(3) + .max_buffered_bytes(4) + .flush_threshold_bytes(3) .max_flush_interval(Duration::from_secs(u32::MAX as u64)), ); @@ -837,8 +916,8 @@ mod tests { assert_eq!(chunks.len(), 1); }), TraceBufferConfig::default() - .max_buffered_spans(4) - .span_flush_threshold(2) + .max_buffered_bytes(4) + .flush_threshold_bytes(2) .max_flush_interval(Duration::from_millis(1)), ); sender.send_chunk(vec![()]).unwrap(); @@ -894,8 +973,8 @@ mod tests { assert_eq!(chunks.len(), 2); }), TraceBufferConfig::default() - .max_buffered_spans(100) - .span_flush_threshold(100) + .max_buffered_bytes(100) + .flush_threshold_bytes(100) .max_flush_interval(Duration::from_secs(u32::MAX as u64)), ); @@ -917,7 +996,7 @@ mod tests { fn test_worker_reset() { let (rt, sem, sender) = make_buffer( Box::new(|chunks| assert_eq!(chunks.len(), 1)), - TraceBufferConfig::default().span_flush_threshold(2), + TraceBufferConfig::default().flush_threshold_bytes(2), ); sender.send_chunk(vec![()]).unwrap(); assert_eq!(sem.available_permits(), 0);