diff --git a/ext/libdatadog_api/init.c b/ext/libdatadog_api/init.c index 086009e8b8..f85e509c73 100644 --- a/ext/libdatadog_api/init.c +++ b/ext/libdatadog_api/init.c @@ -6,6 +6,7 @@ #include "feature_flags.h" #include "library_config.h" #include "process_discovery.h" +#include "trace_exporter.h" void ddsketch_init(VALUE core_module); void di_init(VALUE datadog_module); @@ -24,4 +25,7 @@ void DDTRACE_EXPORT Init_libdatadog_api(void) { ddsketch_init(core_module); feature_flags_init(core_module); di_init(datadog_module); + + VALUE tracing_module = rb_define_module_under(datadog_module, "Tracing"); + trace_exporter_init(tracing_module); } diff --git a/ext/libdatadog_api/trace_exporter.c b/ext/libdatadog_api/trace_exporter.c new file mode 100644 index 0000000000..16dcb3d1f1 --- /dev/null +++ b/ext/libdatadog_api/trace_exporter.c @@ -0,0 +1,774 @@ +#include +#include +#include +#include + +#include "datadog_ruby_common.h" +#include "helpers.h" +#include "trace_exporter.h" + +/* ======================================================================== + * Forward declarations + * ======================================================================== */ + +/* Internal: convert a Ruby Span to a Rust ddog_TracerSpan* (caller owns) */ +static ddog_TracerSpan *convert_ruby_span_to_rust(VALUE span); + +/* TracerSpan methods */ +static VALUE _native_from_span(VALUE klass, VALUE span); + +/* TraceExporter methods */ +static VALUE _native_exporter_new(int argc, VALUE *argv, VALUE klass); +static VALUE _native_send_traces(VALUE self, VALUE traces); + +/* Response helpers */ +static VALUE create_ok_response(long trace_count, VALUE payload); +static VALUE create_error_response(ddog_TraceExporterErrorCode code, + long trace_count); + +/* GC / TypedData */ +static void tracer_span_dfree(void *ptr); +static void trace_exporter_dfree(void *ptr); + +/* ======================================================================== + * Cached Ruby intern IDs + * ======================================================================== */ + +/* Instance variable IDs on Datadog::Tracing::Span */ +static ID at_name_id; +static ID at_service_id; +static ID at_resource_id; +static ID at_type_id; +static ID at_id_id; +static ID at_parent_id_id; +static ID at_trace_id_id; +static ID at_start_time_id; +static ID at_duration_id; +static ID at_status_id; +static ID at_meta_id; +static ID at_metrics_id; + +/* Method IDs for time / integer operations */ +static ID id_duration_method; + +/* Response class (loaded from Ruby) */ +static VALUE response_class = Qnil; +static ID id_new; + +/* ======================================================================== + * Ruby class references (marked as GC roots) + * ======================================================================== */ + +static VALUE tracer_span_class = Qnil; +static VALUE trace_exporter_class = Qnil; + +/* ======================================================================== + * TypedData definitions + * ======================================================================== */ + +static const rb_data_type_t tracer_span_typed_data = { + .wrap_struct_name = "Datadog::Tracing::Transport::Native::TracerSpan", + .function = { + .dmark = NULL, + .dfree = tracer_span_dfree, + .dsize = NULL, + }, + .flags = RUBY_TYPED_FREE_IMMEDIATELY +}; + +static void tracer_span_dfree(void *ptr) { + if (ptr != NULL) { + ddog_tracer_span_free((ddog_TracerSpan *)ptr); + } +} + +static const rb_data_type_t trace_exporter_typed_data = { + .wrap_struct_name = "Datadog::Tracing::Transport::Native::TraceExporter", + .function = { + .dmark = NULL, + .dfree = trace_exporter_dfree, + .dsize = NULL, + }, + .flags = RUBY_TYPED_FREE_IMMEDIATELY +}; + +static void trace_exporter_dfree(void *ptr) { + if (ptr != NULL) { + ddog_trace_exporter_free((ddog_TraceExporter *)ptr); + } +} + +/* ======================================================================== + * Error handling + * ======================================================================== */ + +/* + * If +err+ is non-NULL, copies the message, frees the error struct, and + * raises a Ruby RuntimeError. Does not return on error. + */ +static inline void check_exporter_error(const char *context, + ddog_TraceExporterError *err) { + if (err == NULL) return; + + char buf[MAX_RAISE_MESSAGE_SIZE]; + if (err->msg != NULL) { + snprintf(buf, sizeof(buf), "%s: %s", context, err->msg); + } else { + snprintf(buf, sizeof(buf), "%s: (unknown error)", context); + } + ddog_trace_exporter_error_free(err); + raise_error(rb_eRuntimeError, "%s", buf); +} + +/* ======================================================================== + * Config helpers + * ======================================================================== */ + +typedef ddog_TraceExporterError *(*config_setter_fn)( + ddog_TraceExporterConfig *, ddog_CharSlice); + +static inline void set_config_field( + ddog_TraceExporterConfig *config, + config_setter_fn setter, + VALUE rb_val, + const char *label) { + if (rb_val == Qnil) return; + + ddog_TraceExporterError *err = + setter(config, char_slice_from_ruby_string(rb_val)); + if (err) { + ddog_trace_exporter_config_free(config); + check_exporter_error(label, err); + } +} + +/* ======================================================================== + * Conversion helpers (Ruby -> C, require the GVL) + * ======================================================================== */ + +/* Nullable Ruby String -> ddog_CharSlice (nil -> empty slice) */ +static inline ddog_CharSlice nullable_char_slice(VALUE str) { + if (str == Qnil) { + return (ddog_CharSlice){.ptr = "", .len = 0}; + } + return char_slice_from_ruby_string(str); +} + +/* Ruby Time -> int64_t nanoseconds since Unix epoch */ +static inline int64_t time_to_nanos(VALUE time) { + struct timespec ts = rb_time_timespec(time); + return (int64_t)ts.tv_sec * 1000000000LL + (int64_t)ts.tv_nsec; +} + +/* 128-bit trace ID split into two 64-bit halves */ +typedef struct { + uint64_t low; + uint64_t high; +} trace_id_t; + +/* Ruby 128-bit Integer -> trace_id_t */ +static inline trace_id_t split_trace_id(VALUE trace_id) { + /* Fast path: Fixnum fits in 63 bits, no high half */ + if (FIXNUM_P(trace_id)) { + return (trace_id_t){ + .low = (uint64_t)FIX2LONG(trace_id), + .high = 0, + }; + } + + /* Bignum path: extract raw bytes into two 64-bit words */ + unsigned long words[2] = {0, 0}; + rb_big_pack(trace_id, words, 2); + return (trace_id_t){ + .low = (uint64_t)words[0], + .high = (uint64_t)words[1], + }; +} + +/* ======================================================================== + * Hash iteration callbacks for meta / metrics + * + * We cannot raise Ruby exceptions from inside rb_hash_foreach callbacks + * (longjmp would corrupt the hash iteration state). Instead, the first + * error is stashed in a context struct and iteration is stopped with + * ST_STOP. The caller checks for the error after rb_hash_foreach + * returns. + * ======================================================================== */ + +typedef struct { + ddog_TracerSpan *span; + ddog_TraceExporterError *error; /* first error, if any */ + long skipped; /* entries skipped due to wrong type */ +} hash_iter_ctx; + +static int meta_iter_cb(VALUE key, VALUE value, VALUE arg) { + hash_iter_ctx *ctx = (hash_iter_ctx *)arg; + + /* + * We intentionally use direct struct initialization instead of + * char_slice_from_ruby_string() here: that helper contains + * ENFORCE_TYPE which can raise, and raising inside an + * rb_hash_foreach callback would longjmp out of the hash + * iteration and corrupt internal VM state. + */ + if (!RB_TYPE_P(key, T_STRING) || !RB_TYPE_P(value, T_STRING)) { + ctx->skipped++; + return ST_CONTINUE; + } + + ddog_CharSlice ks = {.ptr = RSTRING_PTR(key), .len = RSTRING_LEN(key)}; + ddog_CharSlice vs = {.ptr = RSTRING_PTR(value), .len = RSTRING_LEN(value)}; + + ddog_TraceExporterError *err = ddog_tracer_span_set_meta(ctx->span, ks, vs); + if (err != NULL) { + ctx->error = err; + return ST_STOP; + } + + return ST_CONTINUE; +} + +static int metrics_iter_cb(VALUE key, VALUE value, VALUE arg) { + hash_iter_ctx *ctx = (hash_iter_ctx *)arg; + + if (!RB_TYPE_P(key, T_STRING) || + (!RB_TYPE_P(value, T_FLOAT) && !RB_TYPE_P(value, T_FIXNUM) && + !RB_TYPE_P(value, T_BIGNUM))) { + ctx->skipped++; + return ST_CONTINUE; + } + + /* See meta_iter_cb for why we avoid char_slice_from_ruby_string() here. */ + ddog_CharSlice ks = {.ptr = RSTRING_PTR(key), .len = RSTRING_LEN(key)}; + + ddog_TraceExporterError *err = + ddog_tracer_span_set_metric(ctx->span, ks, NUM2DBL(value)); + if (err != NULL) { + ctx->error = err; + return ST_STOP; + } + + return ST_CONTINUE; +} + +/* ======================================================================== + * Internal: convert a Ruby Span -> ddog_TracerSpan* + * + * The returned pointer is Rust-heap-allocated. Ownership is transferred + * to the caller (either wrap it in TypedData or push it into trace chunks). + * ======================================================================== */ + +static ddog_TracerSpan *convert_ruby_span_to_rust(VALUE span) { + /* 1. Read Ruby ivars */ + VALUE rb_name = rb_ivar_get(span, at_name_id); + VALUE rb_service = rb_ivar_get(span, at_service_id); + VALUE rb_resource = rb_ivar_get(span, at_resource_id); + VALUE rb_type = rb_ivar_get(span, at_type_id); + VALUE rb_span_id = rb_ivar_get(span, at_id_id); + VALUE rb_parent_id = rb_ivar_get(span, at_parent_id_id); + VALUE rb_trace_id = rb_ivar_get(span, at_trace_id_id); + VALUE rb_status = rb_ivar_get(span, at_status_id); + + /* 2. Convert scalars */ + ddog_CharSlice name_s = char_slice_from_ruby_string(rb_name); + ddog_CharSlice service_s = nullable_char_slice(rb_service); + ddog_CharSlice resource_s = char_slice_from_ruby_string(rb_resource); + ddog_CharSlice type_s = nullable_char_slice(rb_type); + + uint64_t span_id = NUM2ULL(rb_span_id); + uint64_t parent_id = NUM2ULL(rb_parent_id); + int32_t error_val = NUM2INT(rb_status); + + trace_id_t trace_id = split_trace_id(rb_trace_id); + + /* start (ns) */ + int64_t start_ns = 0; + VALUE rb_start_time = rb_ivar_get(span, at_start_time_id); + if (rb_start_time != Qnil) { + start_ns = time_to_nanos(rb_start_time); + } + + /* duration (ns) */ + int64_t duration_ns = 0; + VALUE rb_duration_ivar = rb_ivar_get(span, at_duration_id); + if (rb_duration_ivar != Qnil) { + duration_ns = (int64_t)(NUM2DBL(rb_duration_ivar) * 1e9); + } else { + VALUE dur = rb_funcall(span, id_duration_method, 0); + if (dur != Qnil) duration_ns = (int64_t)(NUM2DBL(dur) * 1e9); + } + + /* 3. Create Rust span */ + ddog_TracerSpanFields fields = { + .service = service_s, + .name = name_s, + .resource = resource_s, + .span_type = type_s, + .trace_id_low = trace_id.low, + .trace_id_high = trace_id.high, + .span_id = span_id, + .parent_id = parent_id, + .start = start_ns, + .duration = duration_ns, + .error = error_val, + }; + + ddog_TracerSpan *rust_span = NULL; + ddog_TraceExporterError *err = ddog_tracer_span_new(&rust_span, &fields); + check_exporter_error("Failed to create TracerSpan", err); + + /* 4. Populate meta and metrics */ + hash_iter_ctx ctx = {.span = rust_span, .error = NULL, .skipped = 0}; + + VALUE rb_meta = rb_ivar_get(span, at_meta_id); + if (RB_TYPE_P(rb_meta, T_HASH) && RHASH_SIZE(rb_meta) > 0) { + rb_hash_foreach(rb_meta, meta_iter_cb, (VALUE)&ctx); + if (ctx.error != NULL) { + ddog_tracer_span_free(rust_span); + check_exporter_error("Failed to set span meta", ctx.error); + } + if (ctx.skipped > 0) { + log_warning(rb_sprintf( + "Native trace exporter: skipped %ld non-string meta entries", + ctx.skipped)); + ctx.skipped = 0; + } + } + + VALUE rb_metrics = rb_ivar_get(span, at_metrics_id); + if (RB_TYPE_P(rb_metrics, T_HASH) && RHASH_SIZE(rb_metrics) > 0) { + rb_hash_foreach(rb_metrics, metrics_iter_cb, (VALUE)&ctx); + if (ctx.error != NULL) { + ddog_tracer_span_free(rust_span); + check_exporter_error("Failed to set span metric", ctx.error); + } + if (ctx.skipped > 0) { + log_warning(rb_sprintf( + "Native trace exporter: skipped %ld non-numeric metrics entries", + ctx.skipped)); + } + } + + return rust_span; +} + +/* ======================================================================== + * TracerSpan._native_from_span + * ======================================================================== */ + +static VALUE _native_from_span(DDTRACE_UNUSED VALUE klass, VALUE span) { + ddog_TracerSpan *rust_span = convert_ruby_span_to_rust(span); + return TypedData_Wrap_Struct(tracer_span_class, &tracer_span_typed_data, + rust_span); +} + +/* ======================================================================== + * Response class helpers + * ======================================================================== */ + +/* + * Build an error response, classifying the error code into the + * Transport::Response categories: + * + * HTTP_CLIENT -> client_error? (4xx family) + * HTTP_SERVER -> server_error? (5xx family) + * everything else -> internal_error? + */ +static VALUE create_error_response(ddog_TraceExporterErrorCode code, + long trace_count) { + VALUE kwargs = rb_hash_new(); + rb_hash_aset(kwargs, ID2SYM(rb_intern("ok")), Qfalse); + rb_hash_aset(kwargs, ID2SYM(rb_intern("internal_error")), (code != DDOG_TRACE_EXPORTER_ERROR_CODE_HTTP_CLIENT && + code != DDOG_TRACE_EXPORTER_ERROR_CODE_HTTP_SERVER) ? Qtrue : Qfalse); + rb_hash_aset(kwargs, ID2SYM(rb_intern("server_error")), code == DDOG_TRACE_EXPORTER_ERROR_CODE_HTTP_SERVER ? Qtrue : Qfalse); + rb_hash_aset(kwargs, ID2SYM(rb_intern("client_error")), code == DDOG_TRACE_EXPORTER_ERROR_CODE_HTTP_CLIENT ? Qtrue : Qfalse); + rb_hash_aset(kwargs, ID2SYM(rb_intern("trace_count")), LONG2NUM(trace_count)); + return rb_funcallv_kw(response_class, id_new, 1, &kwargs, RB_PASS_KEYWORDS); +} + +/* + * Build a success response, optionally carrying the agent's response body + * as +payload+. + * + * +payload+ is the raw HTTP response body returned by the Datadog Agent + * (typically JSON containing +rate_by_service+). It is surfaced here so + * that callers matching the +Datadog::Core::Transport::Response+ interface + * can parse service sampling rates, just as the Net::HTTP transport does. + */ +static VALUE create_ok_response(long trace_count, VALUE payload) { + VALUE kwargs = rb_hash_new(); + rb_hash_aset(kwargs, ID2SYM(rb_intern("ok")), Qtrue); + rb_hash_aset(kwargs, ID2SYM(rb_intern("trace_count")), LONG2NUM(trace_count)); + rb_hash_aset(kwargs, ID2SYM(rb_intern("payload")), payload); + return rb_funcallv_kw(response_class, id_new, 1, &kwargs, RB_PASS_KEYWORDS); +} + +/* ======================================================================== + * TraceExporter._native_new + * + * Creates a Rust TraceExporter with the given configuration. + * + * Ruby signature: + * TraceExporter._native_new( + * url:, tracer_version: nil, language: nil, language_version: nil, + * language_interpreter: nil, hostname: nil, env: nil, + * service: nil, version: nil) -> TraceExporter + * + * +url+ is required (String). All other arguments may be nil. + * ======================================================================== */ + +static VALUE _native_exporter_new( + int argc, VALUE *argv, DDTRACE_UNUSED VALUE klass +) { + VALUE options; + rb_scan_args(argc, argv, "0:", &options); + if (options == Qnil) options = rb_hash_new(); + + VALUE rb_url = rb_hash_fetch(options, ID2SYM(rb_intern("url"))); + VALUE rb_tracer_version = rb_hash_fetch(options, ID2SYM(rb_intern("tracer_version"))); + VALUE rb_language = rb_hash_fetch(options, ID2SYM(rb_intern("language"))); + VALUE rb_language_version = rb_hash_fetch(options, ID2SYM(rb_intern("language_version"))); + VALUE rb_language_interpreter = rb_hash_fetch(options, ID2SYM(rb_intern("language_interpreter"))); + VALUE rb_hostname = rb_hash_fetch(options, ID2SYM(rb_intern("hostname"))); + VALUE rb_env = rb_hash_fetch(options, ID2SYM(rb_intern("env"))); + VALUE rb_service = rb_hash_fetch(options, ID2SYM(rb_intern("service"))); + VALUE rb_version = rb_hash_fetch(options, ID2SYM(rb_intern("version"))); + + /* Phase 1: validate types (may raise, no Rust resources yet) */ + ENFORCE_TYPE(rb_url, T_STRING); + if (rb_tracer_version != Qnil) ENFORCE_TYPE(rb_tracer_version, T_STRING); + if (rb_language != Qnil) ENFORCE_TYPE(rb_language, T_STRING); + if (rb_language_version != Qnil) ENFORCE_TYPE(rb_language_version, T_STRING); + if (rb_language_interpreter != Qnil) ENFORCE_TYPE(rb_language_interpreter, T_STRING); + if (rb_hostname != Qnil) ENFORCE_TYPE(rb_hostname, T_STRING); + if (rb_env != Qnil) ENFORCE_TYPE(rb_env, T_STRING); + if (rb_service != Qnil) ENFORCE_TYPE(rb_service, T_STRING); + if (rb_version != Qnil) ENFORCE_TYPE(rb_version, T_STRING); + + /* Phase 2: create config (cleanup on error) */ + ddog_TraceExporterConfig *config = NULL; + ddog_trace_exporter_config_new(&config); + + set_config_field(config, ddog_trace_exporter_config_set_url, rb_url, "url"); + set_config_field(config, ddog_trace_exporter_config_set_tracer_version, rb_tracer_version, "tracer_version"); + set_config_field(config, ddog_trace_exporter_config_set_language, rb_language, "language"); + set_config_field(config, ddog_trace_exporter_config_set_lang_version, rb_language_version, "language_version"); + set_config_field(config, ddog_trace_exporter_config_set_lang_interpreter, rb_language_interpreter, "language_interpreter"); + set_config_field(config, ddog_trace_exporter_config_set_hostname, rb_hostname, "hostname"); + set_config_field(config, ddog_trace_exporter_config_set_env, rb_env, "env"); + set_config_field(config, ddog_trace_exporter_config_set_service, rb_service, "service"); + set_config_field(config, ddog_trace_exporter_config_set_version, rb_version, "version"); + + /* Phase 3: build the exporter from the config */ + ddog_TraceExporter *exporter = NULL; + ddog_TraceExporterError *err = ddog_trace_exporter_new(&exporter, config); + ddog_trace_exporter_config_free(config); + config = NULL; + + if (err) { + check_exporter_error("Failed to create TraceExporter", err); + } + + return TypedData_Wrap_Struct(trace_exporter_class, &trace_exporter_typed_data, + exporter); +} + +/* ======================================================================== + * GVL-release helper for ddog_trace_exporter_send_trace_chunks + * + * The send call performs blocking network I/O. Releasing the GVL lets + * other Ruby threads (application code, test mock servers, etc.) run + * while we wait for the agent's response. + * ======================================================================== */ + +typedef struct { + const ddog_TraceExporter *exporter; + ddog_TracerTraceChunks *chunks; + ddog_TraceExporterResponse *response; + ddog_TraceExporterErrorCode error_code; + bool failed; + bool send_ran; +} send_chunks_args_t; + +static void *send_chunks_without_gvl(void *data) { + send_chunks_args_t *args = (send_chunks_args_t *)data; + ddog_TraceExporterError *err = ddog_trace_exporter_send_trace_chunks( + args->exporter, args->chunks, &args->response); + if (err != NULL) { + args->error_code = err->code; + args->failed = true; + ddog_trace_exporter_error_free(err); + } + args->send_ran = true; + return NULL; +} + +/* + * Check for a pending Ruby exception without raising it. + * Mirrors the profiling extension's check_if_pending_exception(). + */ +static VALUE process_pending_interruptions(DDTRACE_UNUSED VALUE _) { + rb_thread_check_ints(); + return Qnil; +} + +__attribute__((warn_unused_result)) +static int check_if_pending_exception(void) { + int pending_exception; + rb_protect(process_pending_interruptions, Qnil, &pending_exception); + return pending_exception; +} + +/* ======================================================================== + * TraceExporter#_native_send_traces + * + * Ruby signature: + * exporter._native_send_traces(traces) -> Array[Response] + * + * +traces+ is an Array of Arrays of Spans: + * [[span, span, ...], [span, ...], ...] + * + * Each inner array maps to one trace chunk (Vec in Rust). + * + * On success returns [Response(ok: true, trace_count: N)]. + * On error returns [Response(ok: false, ...)]. + * + * The chunk-building loop calls into Ruby (ENFORCE_TYPE, + * convert_ruby_span_to_rust) which may raise. We use rb_ensure so + * that the Rust-allocated chunks are freed if an exception fires + * before the send consumes them. + * ======================================================================== */ + +/* Context shared between the body and ensure callbacks. */ +typedef struct { + const ddog_TraceExporter *exporter; + VALUE traces; + long trace_count; + ddog_TracerTraceChunks *chunks; /* NULL after send consumes it */ +} send_traces_ctx; + +/* + * Body: build trace chunks from Ruby spans, then send them. + * Passed to rb_ensure as the "try" block. + */ +static VALUE build_and_send_traces(VALUE arg) { + send_traces_ctx *ctx = (send_traces_ctx *)arg; + + for (long i = 0; i < ctx->trace_count; i++) { + VALUE chunk_spans = rb_ary_entry(ctx->traces, i); + ENFORCE_TYPE(chunk_spans, T_ARRAY); + + long span_count = RARRAY_LEN(chunk_spans); + ddog_TraceExporterError *begin_err = + ddog_tracer_trace_chunks_begin_chunk(ctx->chunks, (size_t)span_count); + if (begin_err != NULL) { + ddog_trace_exporter_error_free(begin_err); + } + for (long j = 0; j < span_count; j++) { + VALUE rb_span = rb_ary_entry(chunk_spans, j); + + /* convert_ruby_span_to_rust may raise (type errors, etc.). + * rb_ensure guarantees chunks is freed in that case. */ + ddog_TracerSpan *rust_span = convert_ruby_span_to_rust(rb_span); + + /* push_span consumes rust_span (ownership transferred to chunks). + * The error path is unreachable in practice: push only fails if no + * chunk was started (we always call begin_chunk above) or if the + * handle is NULL. Free defensively just in case. */ + ddog_TraceExporterError *push_err = + ddog_tracer_trace_chunks_push_span(ctx->chunks, rust_span); + if (push_err != NULL) { + ddog_trace_exporter_error_free(push_err); + } + } + } + + /* + * Send -- consumes chunks regardless of outcome. + * + * Release the GVL so other Ruby threads can run during network I/O. + * + * We use rb_thread_call_without_gvl2 (not the plain variant) because + * the "2" variant does NOT automatically raise pending interrupts + * after the call returns. This matters because chunks has already + * been consumed by the Rust side, and we must inspect send_err / + * response before any Ruby exception propagates -- otherwise we + * would leak those Rust-allocated objects. + * + * An interrupt (e.g. Thread#kill) may cause gvl2 to return before + * our function runs, so we loop until it does. + */ + send_chunks_args_t args = { + .exporter = ctx->exporter, + .chunks = ctx->chunks, + .response = NULL, + .failed = false, + .send_ran = false, + }; + + int pending_exception = 0; + while (!args.send_ran && !pending_exception) { + rb_thread_call_without_gvl2( + send_chunks_without_gvl, &args, + RUBY_UBF_IO, NULL); + + if (!args.send_ran) { + pending_exception = check_if_pending_exception(); + } + } + /* Only null chunks when the send actually ran and consumed them. + * If an interrupt fired before the send executed, chunks are still + * live and the ensure handler must free them. */ + if (args.send_ran) { + ctx->chunks = NULL; + } + + /* Extract the response body as a Ruby string before freeing. */ + VALUE payload = Qnil; + if (args.response != NULL) { + uintptr_t body_len = 0; + const uint8_t *body_ptr = + ddog_trace_exporter_response_get_body(args.response, &body_len); + if (body_ptr != NULL && body_len > 0) { + payload = rb_str_new((const char *)body_ptr, (long)body_len); + } + ddog_trace_exporter_response_free(args.response); + args.response = NULL; + } + + if (pending_exception) { + rb_jump_tag(pending_exception); + } + + if (args.failed) { + VALUE err_resp = create_error_response(args.error_code, ctx->trace_count); + return rb_ary_new_from_args(1, err_resp); + } + + VALUE ok_resp = create_ok_response(ctx->trace_count, payload); + return rb_ary_new_from_args(1, ok_resp); +} + +/* + * Ensure: free chunks if they haven't been consumed by the send yet. + * This runs whether build_and_send_traces returned normally or raised. + */ +static VALUE free_chunks_if_needed(VALUE arg) { + send_traces_ctx *ctx = (send_traces_ctx *)arg; + if (ctx->chunks != NULL) { + ddog_tracer_trace_chunks_free(ctx->chunks); + ctx->chunks = NULL; + } + return Qnil; +} + +static VALUE _native_send_traces(VALUE self, VALUE traces) { + ENFORCE_TYPE(traces, T_ARRAY); + + ddog_TraceExporter *exporter; + TypedData_Get_Struct(self, ddog_TraceExporter, &trace_exporter_typed_data, + exporter); + if (exporter == NULL) { + raise_error(rb_eRuntimeError, + "TraceExporter has not been initialized or was already freed"); + } + + long trace_count = RARRAY_LEN(traces); + + /* Empty batch -> empty response (matches existing transport behaviour) */ + if (trace_count == 0) { + return rb_ary_new(); + } + + /* Allocate trace chunks */ + ddog_TracerTraceChunks *chunks = NULL; + ddog_TraceExporterError *chunks_err = + ddog_tracer_trace_chunks_new((size_t)trace_count, &chunks); + if (chunks_err != NULL) { + ddog_trace_exporter_error_free(chunks_err); + raise_error(rb_eRuntimeError, "Failed to allocate trace chunks"); + } + + send_traces_ctx ctx = { + .exporter = exporter, + .traces = traces, + .trace_count = trace_count, + .chunks = chunks, + }; + + return rb_ensure( + build_and_send_traces, (VALUE)&ctx, + free_chunks_if_needed, (VALUE)&ctx); +} + +/* ======================================================================== + * Initialization + * ======================================================================== */ + +void trace_exporter_init(VALUE tracing_module) { + /* -- Module hierarchy -- */ + VALUE transport_module = rb_define_module_under(tracing_module, "Transport"); + VALUE native_module = + rb_define_module_under(transport_module, "Native"); + + /* ---------------------------------------------------------------- + * TracerSpan class + * ---------------------------------------------------------------- */ + tracer_span_class = + rb_define_class_under(native_module, "TracerSpan", rb_cObject); + rb_undef_alloc_func(tracer_span_class); + + /* Factory */ + rb_define_singleton_method(tracer_span_class, "_native_from_span", + _native_from_span, 1); + + /* ---------------------------------------------------------------- + * TraceExporter class + * ---------------------------------------------------------------- */ + trace_exporter_class = + rb_define_class_under(native_module, "TraceExporter", rb_cObject); + rb_undef_alloc_func(trace_exporter_class); + + /* Factory: _native_new(url:, tracer_version:, ...) */ + rb_define_singleton_method(trace_exporter_class, "_native_new", + _native_exporter_new, -1); + + /* Instance: _native_send_traces(traces) -> Array[Response] */ + rb_define_method(trace_exporter_class, "_native_send_traces", + _native_send_traces, 1); + + /* ---------------------------------------------------------------- + * Response class (defined in Ruby, loaded lazily) + * + * We resolve it here so create_ok_response / create_error_response + * can call Response.new without repeated const lookups. + * ---------------------------------------------------------------- */ + rb_require("datadog/tracing/transport/native/response"); + response_class = + rb_const_get(native_module, rb_intern("Response")); + rb_global_variable(&response_class); + + /* ---------------------------------------------------------------- + * Cache Ruby intern IDs + * ---------------------------------------------------------------- */ + + /* Span ivars */ + at_name_id = rb_intern("@name"); + at_service_id = rb_intern("@service"); + at_resource_id = rb_intern("@resource"); + at_type_id = rb_intern("@type"); + at_id_id = rb_intern("@id"); + at_parent_id_id = rb_intern("@parent_id"); + at_trace_id_id = rb_intern("@trace_id"); + at_start_time_id = rb_intern("@start_time"); + at_duration_id = rb_intern("@duration"); + at_status_id = rb_intern("@status"); + at_meta_id = rb_intern("@meta"); + at_metrics_id = rb_intern("@metrics"); + + /* Methods */ + id_duration_method = rb_intern("duration"); + + /* Response.new */ + id_new = rb_intern("new"); +} diff --git a/ext/libdatadog_api/trace_exporter.h b/ext/libdatadog_api/trace_exporter.h new file mode 100644 index 0000000000..04f2efbd93 --- /dev/null +++ b/ext/libdatadog_api/trace_exporter.h @@ -0,0 +1,5 @@ +#pragma once + +#include "datadog_ruby_common.h" + +void trace_exporter_init(VALUE tracing_module); diff --git a/lib/datadog/tracing/transport/native/response.rb b/lib/datadog/tracing/transport/native/response.rb new file mode 100644 index 0000000000..569daf9b07 --- /dev/null +++ b/lib/datadog/tracing/transport/native/response.rb @@ -0,0 +1,54 @@ +# frozen_string_literal: true + +module Datadog + module Tracing + module Transport + module Native + # Response from the native trace exporter. + # + # Constructed by the C extension after a send completes (or fails). + # Implements the same predicate interface as the HTTP transport's + # response so callers can treat both uniformly. + class Response + attr_reader :trace_count, :payload + + def initialize(ok:, internal_error: false, server_error: false, client_error: false, + not_found: false, unsupported: false, trace_count: 0, payload: nil) + @ok = ok + @internal_error = internal_error + @server_error = server_error + @client_error = client_error + @not_found = not_found + @unsupported = unsupported + @trace_count = trace_count + @payload = payload + end + + def ok? + @ok + end + + def internal_error? + @internal_error + end + + def server_error? + @server_error + end + + def client_error? + @client_error + end + + def not_found? + @not_found + end + + def unsupported? + @unsupported + end + end + end + end + end +end diff --git a/spec/datadog/tracing/transport/native/response_spec.rb b/spec/datadog/tracing/transport/native/response_spec.rb new file mode 100644 index 0000000000..5372e24acf --- /dev/null +++ b/spec/datadog/tracing/transport/native/response_spec.rb @@ -0,0 +1,79 @@ +# frozen_string_literal: true + +require 'datadog/core' + +RSpec.describe 'Datadog::Tracing::Transport::Native::Response' do + before do + skip_if_libdatadog_not_supported + end + + let(:native_module) { Datadog::Tracing::Transport::Native } + let(:response_class) { native_module::Response } + + # Response objects are created by the C extension (create_ok_response / + # create_error_response) and not directly by Ruby code. To test the + # reader methods we allocate an instance and set ivars manually, which + # mirrors what the C helpers do. + + def make_response(ok:, internal_error: false, server_error: false, + client_error: false, not_found: false, + unsupported: false, trace_count: 0, payload: nil) + resp = response_class.allocate + resp.instance_variable_set(:@ok, ok) + resp.instance_variable_set(:@internal_error, internal_error) + resp.instance_variable_set(:@server_error, server_error) + resp.instance_variable_set(:@client_error, client_error) + resp.instance_variable_set(:@not_found, not_found) + resp.instance_variable_set(:@unsupported, unsupported) + resp.instance_variable_set(:@trace_count, trace_count) + resp.instance_variable_set(:@payload, payload) + resp + end + + describe 'ok response' do + subject(:response) { make_response(ok: true, trace_count: 5, payload: '{"rate_by_service":{}}') } + + it { expect(response.ok?).to be true } + it { expect(response.internal_error?).to be false } + it { expect(response.server_error?).to be false } + it { expect(response.client_error?).to be false } + it { expect(response.not_found?).to be false } + it { expect(response.unsupported?).to be false } + it { expect(response.trace_count).to eq(5) } + it { expect(response.payload).to eq('{"rate_by_service":{}}') } + end + + describe 'internal error response' do + subject(:response) { make_response(ok: false, internal_error: true) } + + it { expect(response.ok?).to be false } + it { expect(response.internal_error?).to be true } + it { expect(response.server_error?).to be false } + it { expect(response.client_error?).to be false } + it { expect(response.payload).to be_nil } + end + + describe 'server error response' do + subject(:response) { make_response(ok: false, server_error: true) } + + it { expect(response.ok?).to be false } + it { expect(response.internal_error?).to be false } + it { expect(response.server_error?).to be true } + it { expect(response.client_error?).to be false } + end + + describe 'client error response' do + subject(:response) { make_response(ok: false, client_error: true) } + + it { expect(response.ok?).to be false } + it { expect(response.internal_error?).to be false } + it { expect(response.server_error?).to be false } + it { expect(response.client_error?).to be true } + end + + describe 'nil payload' do + subject(:response) { make_response(ok: true) } + + it { expect(response.payload).to be_nil } + end +end diff --git a/spec/datadog/tracing/transport/native/send_traces_spec.rb b/spec/datadog/tracing/transport/native/send_traces_spec.rb new file mode 100644 index 0000000000..7280f84a0d --- /dev/null +++ b/spec/datadog/tracing/transport/native/send_traces_spec.rb @@ -0,0 +1,200 @@ +# frozen_string_literal: true + +require 'datadog/core' +require 'datadog/tracing/span' +require 'socket' +require 'json' + +RSpec.describe 'Datadog::Tracing::Transport::Native::TraceExporter#_native_send_traces' do + before do + skip_if_libdatadog_not_supported + end + + let(:native_module) { Datadog::Tracing::Transport::Native } + let(:trace_exporter_class) { native_module::TraceExporter } + let(:response_class) { native_module::Response } + + # --------------------------------------------------------------------------- + # Minimal mock HTTP agent (TCP server) + # + # Accepts connections, reads the HTTP request, and replies with a + # configurable status and body. Handles both /info (used by the + # background agent-info worker) and trace endpoints. + # --------------------------------------------------------------------------- + + class MockAgent + attr_reader :port, :requests + + def initialize(status: 200, body: '{"rate_by_service":{"service:,env:":1.0}}') + @status = status + @body = body + @requests = [] + @server = TCPServer.new('127.0.0.1', 0) + @port = @server.addr[1] + @thread = Thread.new { run } + end + + def stop + @running = false + @server.close rescue nil + @thread.join(2) + end + + private + + def run + @running = true + while @running + client = @server.accept rescue break + handle(client) + end + end + + def handle(client) + request_line = client.gets + return client.close if request_line.nil? + + headers = {} + while (line = client.gets) && line != "\r\n" + key, value = line.split(': ', 2) + headers[key.downcase] = value&.strip + end + + body_len = (headers['content-length'] || 0).to_i + body = body_len > 0 ? client.read(body_len) : '' + + @requests << { request_line: request_line.strip, headers: headers, body: body } + + response_body = @body + client.print "HTTP/1.1 #{@status} OK\r\n" + client.print "Content-Length: #{response_body.bytesize}\r\n" + client.print "Content-Type: application/json\r\n" + client.print "\r\n" + client.print response_body + client.close + rescue => e + $stderr.puts "MockAgent error: #{e}" if ENV['DEBUG'] + client&.close + end + end + + # --------------------------------------------------------------------------- + # Helpers + # --------------------------------------------------------------------------- + + let(:mock_agent) { MockAgent.new } + after { mock_agent.stop } + + let(:exporter) do + trace_exporter_class._native_new( + url: "http://127.0.0.1:#{mock_agent.port}", + tracer_version: '1.0.0-test', + language: 'ruby', + language_version: RUBY_VERSION, + language_interpreter: RUBY_ENGINE, + hostname: 'test-host', + env: 'test-env', + service: 'test-service', + version: '0.0.1', + ) + end + + def make_span(name = 'test.op', **overrides) + defaults = { + service: 'test-svc', + resource: 'GET /test', + type: 'web', + id: rand(1 << 62), + parent_id: 0, + trace_id: rand(1 << 62), + } + Datadog::Tracing::Span.new(name, **defaults.merge(overrides)) + end + + # --------------------------------------------------------------------------- + # Tests + # --------------------------------------------------------------------------- + + describe 'with an empty array' do + it 'returns an empty array without contacting the server' do + responses = exporter._native_send_traces([]) + expect(responses).to eq([]) + end + end + + describe 'with a single trace containing one span' do + it 'returns a success response' do + spans = [make_span] + responses = exporter._native_send_traces([spans]) + + expect(responses).to be_an(Array) + expect(responses.length).to eq(1) + + resp = responses.first + expect(resp).to be_a(response_class) + expect(resp.ok?).to be true + expect(resp.internal_error?).to be false + expect(resp.server_error?).to be false + expect(resp.trace_count).to eq(1) + end + + it 'returns a payload with the agent response body' do + spans = [make_span] + responses = exporter._native_send_traces([spans]) + resp = responses.first + + # The payload should contain the mock agent's JSON body + expect(resp.payload).to be_a(String) + parsed = JSON.parse(resp.payload) + expect(parsed).to have_key('rate_by_service') + end + end + + describe 'with multiple trace chunks' do + it 'sends all chunks and returns a success response' do + chunk1 = [make_span('op1'), make_span('op2')] + chunk2 = [make_span('op3')] + responses = exporter._native_send_traces([chunk1, chunk2]) + + expect(responses.length).to eq(1) + expect(responses.first.ok?).to be true + expect(responses.first.trace_count).to eq(2) + end + end + + describe 'with spans containing meta and metrics' do + it 'does not raise' do + span = make_span + span.set_tag('http.method', 'GET') + span.set_tag('http.url', '/test') + span.set_metric('_dd.measured', 1.0) + + responses = exporter._native_send_traces([[span]]) + expect(responses.first.ok?).to be true + end + end + + describe 'when the agent returns an error' do + let(:mock_agent) { MockAgent.new(status: 500, body: '{"error":"server overloaded"}') } + + it 'returns an error response' do + responses = exporter._native_send_traces([[make_span]]) + + expect(responses.length).to eq(1) + resp = responses.first + expect(resp.ok?).to be false + # The error should be classified as server or internal + expect(resp.server_error? || resp.internal_error?).to be true + end + end + + describe 'argument validation' do + it 'raises TypeError for non-array argument' do + expect { exporter._native_send_traces('not an array') }.to raise_error(TypeError) + end + + it 'raises TypeError for non-array inner element' do + expect { exporter._native_send_traces(['not an array']) }.to raise_error(TypeError) + end + end +end diff --git a/spec/datadog/tracing/transport/native/trace_exporter_spec.rb b/spec/datadog/tracing/transport/native/trace_exporter_spec.rb new file mode 100644 index 0000000000..54d2f4a086 --- /dev/null +++ b/spec/datadog/tracing/transport/native/trace_exporter_spec.rb @@ -0,0 +1,89 @@ +# frozen_string_literal: true + +require 'datadog/core' + +RSpec.describe 'Datadog::Tracing::Transport::Native::TraceExporter' do + before do + skip_if_libdatadog_not_supported + end + + let(:native_module) { Datadog::Tracing::Transport::Native } + let(:trace_exporter_class) { native_module::TraceExporter } + + describe '._native_new' do + context 'with all string arguments' do + it 'creates an exporter' do + exporter = trace_exporter_class._native_new( + url: 'http://127.0.0.1:8126', + tracer_version: '1.0.0', + language: 'ruby', + language_version: RUBY_VERSION, + language_interpreter: RUBY_ENGINE, + hostname: 'testhost', + env: 'test', + service: 'testsvc', + version: '1.0', + ) + expect(exporter).to be_a(trace_exporter_class) + end + end + + context 'with nil for all optional arguments' do + it 'creates an exporter' do + exporter = trace_exporter_class._native_new( + url: 'http://127.0.0.1:8126', + tracer_version: nil, language: nil, language_version: nil, + language_interpreter: nil, hostname: nil, env: nil, + service: nil, version: nil, + ) + expect(exporter).to be_a(trace_exporter_class) + end + end + + context 'with a non-string url' do + it 'raises TypeError' do + expect { + trace_exporter_class._native_new( + url: 12345, + tracer_version: nil, language: nil, language_version: nil, + language_interpreter: nil, hostname: nil, env: nil, + service: nil, version: nil, + ) + }.to raise_error(TypeError) + end + end + + context 'with a non-string optional argument' do + it 'raises TypeError' do + expect { + trace_exporter_class._native_new( + url: 'http://127.0.0.1:8126', + tracer_version: 123, # should be String or nil + language: nil, language_version: nil, + language_interpreter: nil, hostname: nil, env: nil, + service: nil, version: nil, + ) + }.to raise_error(TypeError) + end + end + + it 'cannot be allocated directly' do + expect { trace_exporter_class.new }.to raise_error(TypeError) + end + + context 'GC safety' do + it 'does not crash when instances are garbage collected' do + 5.times do + trace_exporter_class._native_new( + url: 'http://127.0.0.1:8126', + tracer_version: nil, language: nil, language_version: nil, + language_interpreter: nil, hostname: nil, env: nil, + service: nil, version: nil, + ) + end + GC.start + GC.start + end + end + end +end diff --git a/spec/datadog/tracing/transport/native/tracer_span_spec.rb b/spec/datadog/tracing/transport/native/tracer_span_spec.rb new file mode 100644 index 0000000000..5a0be4a2d0 --- /dev/null +++ b/spec/datadog/tracing/transport/native/tracer_span_spec.rb @@ -0,0 +1,131 @@ +# frozen_string_literal: true + +require 'datadog/core' +require 'datadog/tracing/span' + +RSpec.describe 'Datadog::Tracing::Transport::Native::TracerSpan' do + before do + skip_if_libdatadog_not_supported + end + + let(:native_module) { Datadog::Tracing::Transport::Native } + let(:tracer_span_class) { native_module::TracerSpan } + + # --------------------------------------------------------------------------- + # Helper: create a populated Ruby span + # --------------------------------------------------------------------------- + + let(:now) { Time.now } + let(:trace_id_128bit) { (1 << 64) | 0xdeadbeef } + + def make_ruby_span(overrides = {}) + defaults = { + service: 'test-service', + resource: 'GET /test', + type: 'web', + id: 12345, + parent_id: 67890, + trace_id: trace_id_128bit, + start_time: now, + duration: 0.025, + status: 0, + meta: { 'http.method' => 'GET', 'http.url' => '/test' }, + metrics: { '_dd.measured' => 1.0, '_sampling_priority_v1' => 2.0 }, + } + Datadog::Tracing::Span.new('web.request', **defaults.merge(overrides)) + end + + # --------------------------------------------------------------------------- + # Tests + # --------------------------------------------------------------------------- + + describe '._native_from_span' do + context 'with a minimal span' do + it 'returns a TracerSpan' do + span = Datadog::Tracing::Span.new('test.op') + result = tracer_span_class._native_from_span(span) + expect(result).to be_a(tracer_span_class) + end + end + + context 'with all fields populated' do + it 'returns a TracerSpan' do + result = tracer_span_class._native_from_span(make_ruby_span) + expect(result).to be_a(tracer_span_class) + end + end + + context 'with nil-able string fields set to nil' do + it 'does not raise' do + span = make_ruby_span(service: nil, type: nil) + expect { tracer_span_class._native_from_span(span) }.not_to raise_error + end + end + + context 'with empty meta and metrics hashes' do + it 'does not raise' do + span = make_ruby_span(meta: {}, metrics: {}) + expect { tracer_span_class._native_from_span(span) }.not_to raise_error + end + end + + context 'with an unstarted span (no start_time or duration)' do + it 'does not raise' do + span = make_ruby_span(start_time: nil, duration: nil) + expect { tracer_span_class._native_from_span(span) }.not_to raise_error + end + end + + context 'with a 128-bit trace ID' do + it 'does not raise' do + big_id = (0xaabbccdd << 64) | 0x11223344 + span = make_ruby_span(trace_id: big_id) + expect { tracer_span_class._native_from_span(span) }.not_to raise_error + end + end + + context 'with a 64-bit trace ID (high bits zero)' do + it 'does not raise' do + span = make_ruby_span(trace_id: 42) + expect { tracer_span_class._native_from_span(span) }.not_to raise_error + end + end + + context 'with non-string meta values (mixed hash)' do + it 'silently skips non-string entries' do + span = make_ruby_span(meta: { 'good' => 'value', 'bad' => 123, nil => 'also_bad' }) + expect { tracer_span_class._native_from_span(span) }.not_to raise_error + end + end + + context 'with non-numeric metrics values (mixed hash)' do + it 'silently skips non-numeric entries' do + span = make_ruby_span(metrics: { '_dd.measured' => 1.0, 'bad' => 'string' }) + expect { tracer_span_class._native_from_span(span) }.not_to raise_error + end + end + + context 'when called multiple times on the same span' do + it 'returns independent instances' do + span = make_ruby_span + r1 = tracer_span_class._native_from_span(span) + r2 = tracer_span_class._native_from_span(span) + expect(r1).to be_a(tracer_span_class) + expect(r2).to be_a(tracer_span_class) + expect(r1).not_to equal(r2) + end + end + + context 'GC safety' do + it 'does not crash when instances are garbage collected' do + 20.times { tracer_span_class._native_from_span(make_ruby_span) } + GC.start + GC.start + end + end + + it 'cannot be allocated directly' do + expect { tracer_span_class.new }.to raise_error(TypeError) + end + end +end