diff --git a/libdd-data-pipeline/benches/trace_buffer.rs b/libdd-data-pipeline/benches/trace_buffer.rs index 464a2f4c12..e06f4b0a02 100644 --- a/libdd-data-pipeline/benches/trace_buffer.rs +++ b/libdd-data-pipeline/benches/trace_buffer.rs @@ -1,30 +1,64 @@ // Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; -use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput}; use libdd_data_pipeline::trace_buffer::{Export, TraceBuffer, TraceBufferConfig, TraceChunk}; use libdd_data_pipeline::trace_exporter::{ agent_response::AgentResponse, error::TraceExporterError, }; use libdd_shared_runtime::SharedRuntime; - -type Span = [u8; 100]; +use libdd_tinybytes::BytesString; +use libdd_trace_utils::span::v04::SpanBytes; // Number of chunks each sender thread sends per benchmark iteration. const CHUNKS_PER_SENDER: usize = 900; +fn bs(s: &'static str) -> BytesString { + BytesString::from_static(s) +} + +fn make_span() -> SpanBytes { + SpanBytes { + service: bs("my-web-service"), + name: bs("http.request"), + resource: bs("GET /api/v1/users"), + r#type: bs("web"), + trace_id: 1_234_567_890_123_456_789_u128, + span_id: 987_654_321_u64, + parent_id: 0, + start: 1_700_000_000_000_000_000_i64, + duration: 5_000_000_i64, + error: 0, + meta: HashMap::from_iter([ + (bs("env"), bs("prod")), + (bs("version"), bs("1.0.0")), + (bs("http.method"), bs("GET")), + (bs("http.url"), bs("/api/v1/users")), + (bs("peer.service"), bs("users-service")), + ]), + metrics: HashMap::from_iter([ + (bs("_sampling_priority_v1"), 1.0_f64), + (bs("_dd.agent_psr"), 1.0_f64), + ]), + meta_struct: HashMap::new(), + span_links: vec![], + span_events: vec![], + } +} + // Simulates async IO by sleeping 2ms per export batch. #[derive(Debug)] struct SleepExport; -impl Export for SleepExport { +impl Export for SleepExport { fn export_trace_chunks( &mut self, - _trace_chunks: Vec>, + _trace_chunks: Vec>, ) -> Pin< Box< dyn std::future::Future> + Send + '_, @@ -37,11 +71,11 @@ impl Export for SleepExport { } } -fn setup_buffer() -> (Arc, Arc>) { +fn setup_buffer() -> (Arc, Arc>) { let rt = Arc::new(SharedRuntime::new().expect("SharedRuntime::new")); let cfg = TraceBufferConfig::new() - .max_buffered_spans(1_000) - .span_flush_threshold(500) + .max_buffered_bytes(1_000_000) + .flush_threshold_bytes(100_000) .max_flush_interval(Duration::from_secs(2)); let (buf, worker) = TraceBuffer::new(cfg, Box::new(|_| {}), Box::new(SleepExport)); let _ = rt.spawn_worker(worker, true).expect("spawn_worker"); @@ -69,22 +103,32 @@ fn bench_trace_buffer(c: &mut Criterion) { group.bench_function( BenchmarkId::new(format!("{}_senders", num_senders), delay_label), |b| { - b.iter(|| { - std::thread::scope(|s| { - for _ in 0..num_senders { - let sender = sender.clone(); - s.spawn(move || { - for _ in 0..CHUNKS_PER_SENDER { - // BatchFull errors are expected under high load. - let _ = sender.send_chunk(vec![[0u8; 100]]); - if let Some(d) = delay { - std::thread::sleep(d); + b.iter_batched( + || { + Vec::from_iter( + (0..num_senders) + .map(|_| (0..CHUNKS_PER_SENDER).map(|_| vec![make_span()])) + .map(Vec::from_iter), + ) + }, + |input| { + std::thread::scope(|s| { + for sender_spans in input { + let sender = sender.clone(); + s.spawn(move || { + for s in sender_spans { + // BatchFull errors are expected under high load. + let _ = sender.send_chunk(s); + if let Some(d) = delay { + std::thread::sleep(d); + } } - } - }); - } - }); - }); + }); + } + }); + }, + BatchSize::PerIteration, + ); }, ); diff --git a/libdd-data-pipeline/src/trace_buffer/mod.rs b/libdd-data-pipeline/src/trace_buffer/mod.rs index 54c239a878..09698eef18 100644 --- a/libdd-data-pipeline/src/trace_buffer/mod.rs +++ b/libdd-data-pipeline/src/trace_buffer/mod.rs @@ -20,13 +20,80 @@ use crate::trace_exporter::{ agent_response::AgentResponse, error::TraceExporterError, TraceExporter, }; +/// Trait for types stored in a [`TraceBuffer`] that can report their approximate byte size. +pub trait BufferSize { + fn byte_size(&self) -> usize; +} + +impl BufferSize for libdd_trace_utils::span::v04::Span +where + T: libdd_trace_utils::span::TraceData, + T::Text: AsRef, + T::Bytes: AsRef<[u8]>, +{ + fn byte_size(&self) -> usize { + use libdd_trace_utils::span::v04::AttributeAnyValue; + + // trace_id(16) + span_id(8) + parent_id(8) + start(8) + duration(8) + error(4) + let mut size: usize = 52; + + size += self.service.as_ref().len(); + size += self.name.as_ref().len(); + size += self.resource.as_ref().len(); + size += self.r#type.as_ref().len(); + + for (k, v) in &self.meta { + size += k.as_ref().len() + v.as_ref().len(); + } + for k in self.metrics.keys() { + size += k.as_ref().len() + 8; + } + for (k, v) in &self.meta_struct { + size += k.as_ref().len() + v.as_ref().len(); + } + for link in &self.span_links { + // trace_id(8) + trace_id_high(8) + span_id(8) + flags(4) = 28 + size += 28 + link.tracestate.as_ref().len(); + for (k, v) in &link.attributes { + size += k.as_ref().len() + v.as_ref().len(); + } + } + for event in &self.span_events { + // time_unix_nano(8) + size += 8 + event.name.as_ref().len(); + for (k, v) in &event.attributes { + size += k.as_ref().len() + + match v { + AttributeAnyValue::SingleValue(av) => span_attr_size::(av), + AttributeAnyValue::Array(vec) => vec.iter().map(span_attr_size::).sum(), + }; + } + } + size + } +} + +fn span_attr_size(v: &libdd_trace_utils::span::v04::AttributeArrayValue) -> usize +where + T: libdd_trace_utils::span::TraceData, + T::Text: AsRef, +{ + use libdd_trace_utils::span::v04::AttributeArrayValue; + match v { + AttributeArrayValue::String(s) => s.as_ref().len(), + AttributeArrayValue::Boolean(_) => 1, + AttributeArrayValue::Integer(_) => 8, + AttributeArrayValue::Double(_) => 8, + } +} + #[derive(Clone, Copy, Debug)] pub struct TraceBufferConfig { synchronous_export: bool, synchronous_export_timeout: Option, max_flush_interval: Duration, - max_buffered_spans: usize, - span_flush_threshold: usize, + max_buffered_bytes: usize, + flush_threshold_bytes: usize, } impl TraceBufferConfig { @@ -62,18 +129,18 @@ impl TraceBufferConfig { } } - /// The maximum number of spans that will be buffered before we drop data - pub fn max_buffered_spans(self, max: usize) -> Self { + /// The maximum number of bytes that will be buffered before we drop data + pub fn max_buffered_bytes(self, max: usize) -> Self { Self { - max_buffered_spans: max, + max_buffered_bytes: max, ..self } } - /// The number of spans that will be buffered before we decide to flush - pub fn span_flush_threshold(self, threshold: usize) -> Self { + /// The number of bytes that will be buffered before we decide to flush + pub fn flush_threshold_bytes(self, threshold: usize) -> Self { Self { - span_flush_threshold: threshold, + flush_threshold_bytes: threshold, ..self } } @@ -85,8 +152,8 @@ impl Default for TraceBufferConfig { synchronous_export: false, synchronous_export_timeout: Some(Duration::from_secs(1)), max_flush_interval: Duration::from_secs(2), - max_buffered_spans: 10_000, - span_flush_threshold: 3_000, + max_buffered_bytes: 5_000_000, // 5MB + flush_threshold_bytes: 1_500_000, // 1.5MB } } } @@ -94,12 +161,12 @@ impl Default for TraceBufferConfig { pub type TraceChunk = Vec; /// Error that can occur when the batch has reached its maximum size -/// and we can't add more spans to it. +/// and we can't add more data to it. /// -/// The added spans will be dropped. +/// The added data will be dropped. #[derive(Debug, PartialEq, Eq)] pub struct BatchFullError { - spans_dropped: usize, + pub spans_dropped: usize, } /// Error that can occur when the mutex was poisoned. @@ -120,8 +187,8 @@ pub enum TraceBufferError { struct Batch { chunks: Vec>, last_flush: Instant, - span_count: usize, - max_buffered_spans: usize, + byte_count: usize, + max_buffered_bytes: usize, batch_gen: BatchGeneration, } @@ -130,15 +197,15 @@ struct Batch { const PRE_ALLOCATE_CHUNKS: usize = 400; impl Batch { - fn new(max_buffered_spans: usize) -> Self { + fn new(max_buffered_bytes: usize) -> Self { let mut batch_gen = BatchGeneration::default(); batch_gen.incr(); Self { chunks: Vec::with_capacity(PRE_ALLOCATE_CHUNKS), last_flush: Instant::now(), - span_count: 0, + byte_count: 0, batch_gen, - max_buffered_spans, + max_buffered_bytes, } } @@ -146,13 +213,13 @@ impl Batch { let Self { chunks, last_flush, - span_count, + byte_count, batch_gen, - max_buffered_spans: _max_buffered_spans, + max_buffered_bytes: _max_buffered_bytes, } = self; chunks.clear(); *last_flush = Instant::now(); - *span_count = 0; + *byte_count = 0; *batch_gen = { let mut batch_gen = BatchGeneration::default(); @@ -166,10 +233,13 @@ impl Batch { /// /// This method will not check that adding the chunk will not exceed the maximum size of the /// batch. So the batch can be over the maximum size after this call. - /// This is because we don't want to always drop traces that contain more spans than the maximum + /// This is because we don't want to always drop traces that contain more bytes than the maximum /// size. - fn add_trace_chunk(&mut self, chunk: Vec) -> Result<(), BatchFullError> { - if self.span_count > self.max_buffered_spans { + fn add_trace_chunk(&mut self, chunk: Vec) -> Result<(), BatchFullError> + where + T: BufferSize, + { + if self.byte_count > self.max_buffered_bytes { return Err(BatchFullError { spans_dropped: chunk.len(), }); @@ -178,7 +248,7 @@ impl Batch { return Ok(()); } - self.span_count += chunk.len(); + self.byte_count += chunk.iter().map(|s| s.byte_size()).sum::(); self.chunks.push(chunk); Ok(()) } @@ -186,7 +256,7 @@ impl Batch { /// Export the trace chunk and reset the batch fn export(&mut self) -> Vec> { let chunks = std::mem::replace(&mut self.chunks, Vec::with_capacity(PRE_ALLOCATE_CHUNKS)); - self.span_count = 0; + self.byte_count = 0; self.last_flush = Instant::now(); if !chunks.is_empty() { self.batch_gen.incr(); @@ -230,15 +300,15 @@ pub struct TraceBuffer { pub type ResponseHandler = Box) + Send + Sync>; -impl TraceBuffer { +impl TraceBuffer { pub fn new( config: TraceBufferConfig, response_handler: ResponseHandler, export_operation: Box + Send + Sync>, ) -> (Self, TraceExporterWorker) { let (tx, rx) = channel( - config.span_flush_threshold, - config.max_buffered_spans, + config.flush_threshold_bytes, + config.max_buffered_bytes, config.synchronous_export, ); let worker = TraceExporterWorker::new(rx, response_handler, export_operation, config); @@ -310,8 +380,8 @@ pub struct QueueMetrics { } fn channel( - flush_trigger_number_of_spans: usize, - max_number_of_spans: usize, + flush_trigger_bytes: usize, + max_buffered_bytes: usize, synchronous_write: bool, ) -> (Sender, Receiver) { let waiter = Arc::new(Waiter { @@ -319,7 +389,7 @@ fn channel( flush_needed: false, last_flush_generation: BatchGeneration::default(), has_shutdown: false, - batch: Batch::new(max_number_of_spans), + batch: Batch::new(max_buffered_bytes), metrics: QueueMetrics::default(), }), sender_notifier: Condvar::new(), @@ -328,7 +398,7 @@ fn channel( ( Sender { waiter: waiter.clone(), - flush_trigger_number_of_spans, + flush_trigger_bytes, synchronous_write, }, Receiver { waiter }, @@ -337,7 +407,7 @@ fn channel( struct Sender { waiter: Arc>, - flush_trigger_number_of_spans: usize, + flush_trigger_bytes: usize, synchronous_write: bool, } @@ -390,7 +460,10 @@ impl Sender { Ok(state) } - fn add_trace_chunk(&self, chunk: Vec) -> Result { + fn add_trace_chunk(&self, chunk: Vec) -> Result + where + T: BufferSize, + { let mut state = self.get_running_state()?; let chunk_len = chunk.len(); if let Err(e @ BatchFullError { spans_dropped }) = state.batch.add_trace_chunk(chunk) { @@ -400,8 +473,7 @@ impl Sender { state.metrics.spans_queued += chunk_len; let gen = state.batch.batch_gen; if !state.flush_needed - && (state.batch.span_count > self.flush_trigger_number_of_spans - || self.synchronous_write) + && (state.batch.byte_count > self.flush_trigger_bytes || self.synchronous_write) { state.flush_needed = true; self.waiter.notify_receiver(state); @@ -708,12 +780,19 @@ mod tests { use libdd_shared_runtime::SharedRuntime; - use crate::trace_buffer::{Export, TraceBuffer, TraceBufferConfig}; + use crate::trace_buffer::{BufferSize, Export, TraceBuffer, TraceBufferConfig}; use crate::trace_exporter::agent_response::AgentResponse; use crate::trace_exporter::error::TraceExporterError; use super::{BatchFullError, TraceBufferError}; + // Used for tests, 1 byte per item so size computations are easier + impl BufferSize for () { + fn byte_size(&self) -> usize { + 1 + } + } + struct AssertExporter( Box>) + Send + Sync>, Arc, @@ -774,8 +853,8 @@ mod tests { assert_eq!(lengths, &[1, 2]); }), TraceBufferConfig::default() - .max_buffered_spans(4) - .span_flush_threshold(2) + .max_buffered_bytes(4) + .flush_threshold_bytes(2) .max_flush_interval(Duration::from_secs(u32::MAX as u64)), ); @@ -803,8 +882,8 @@ mod tests { } }), TraceBufferConfig::default() - .max_buffered_spans(4) - .span_flush_threshold(3) + .max_buffered_bytes(4) + .flush_threshold_bytes(3) .max_flush_interval(Duration::from_secs(u32::MAX as u64)), ); @@ -842,8 +921,8 @@ mod tests { assert_eq!(chunks.len(), 1); }), TraceBufferConfig::default() - .max_buffered_spans(4) - .span_flush_threshold(2) + .max_buffered_bytes(4) + .flush_threshold_bytes(2) .max_flush_interval(Duration::from_millis(1)), ); sender.send_chunk(vec![()]).unwrap(); @@ -899,8 +978,8 @@ mod tests { assert_eq!(chunks.len(), 2); }), TraceBufferConfig::default() - .max_buffered_spans(100) - .span_flush_threshold(100) + .max_buffered_bytes(100) + .flush_threshold_bytes(100) .max_flush_interval(Duration::from_secs(u32::MAX as u64)), ); @@ -922,7 +1001,7 @@ mod tests { fn test_worker_reset() { let (rt, sem, sender) = make_buffer( Box::new(|chunks| assert_eq!(chunks.len(), 1)), - TraceBufferConfig::default().span_flush_threshold(2), + TraceBufferConfig::default().flush_threshold_bytes(2), ); sender.send_chunk(vec![()]).unwrap(); assert_eq!(sem.available_permits(), 0);