diff --git a/dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/BoundedQueue.java b/dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/BoundedQueue.java index ecfe88eb..ccd3e91d 100644 --- a/dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/BoundedQueue.java +++ b/dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/BoundedQueue.java @@ -54,7 +54,7 @@ public int compareTo(Key o) { final long maxTries; final WhenFull whenFull; - final TreeMap items = new TreeMap<>(); + final TreeMap items = new TreeMap<>(); final Telemetry telemetry; final LongSupplier nanos; @@ -88,14 +88,14 @@ private void recordDrop(long bytes) { droppedBytes += bytes; } - void add(byte[] item) throws InterruptedException { + void add(Payload item) throws InterruptedException { put(null, item, whenFull); } - void requeue(Map.Entry item) throws InterruptedException { + void requeue(Map.Entry item) throws InterruptedException { Key nextKey = item.getKey().next(); if (nextKey.tries > maxTries) { - telemetry.onDrop(1, item.getValue().length); + telemetry.onDrop(1, item.getValue().bytes.length); return; } put(nextKey, item.getValue(), WhenFull.DROP); @@ -107,15 +107,15 @@ private Key newKey() { return new Key(0, clock, nanos.getAsLong()); } - private void put(Key key, byte[] item, WhenFull whenFull) throws InterruptedException { + private void put(Key key, Payload item, WhenFull whenFull) throws InterruptedException { lock.lock(); try { if (key == null) { key = newKey(); } - ensureSpace(item.length, whenFull); + ensureSpace(item.bytes.length, whenFull); items.put(key, item); - bytes += item.length; + bytes += item.bytes.length; notEmpty.signal(); } finally { long droppedPayloads = this.droppedPayloads; @@ -135,9 +135,9 @@ private void ensureSpace(int length, WhenFull whenFull) throws InterruptedExcept while (bytes + length > maxBytes) { switch (whenFull) { case DROP: - Map.Entry last = items.pollLastEntry(); - bytes -= last.getValue().length; - recordDrop(last.getValue().length); + Map.Entry last = items.pollLastEntry(); + recordDrop(last.getValue().bytes.length); + bytes -= last.getValue().bytes.length; break; case BLOCK: notFull.await(); @@ -146,14 +146,14 @@ private void ensureSpace(int length, WhenFull whenFull) throws InterruptedExcept } } - Map.Entry next() throws InterruptedException { + Map.Entry next() throws InterruptedException { lock.lock(); try { while (items.size() == 0) { notEmpty.await(); } - Map.Entry item = items.pollFirstEntry(); - bytes -= item.getValue().length; + Map.Entry item = items.pollFirstEntry(); + bytes -= item.getValue().bytes.length; notFull.signalAll(); return item; } finally { diff --git a/dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/Forwarder.java b/dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/Forwarder.java index 2c7b0844..618babbd 100644 --- a/dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/Forwarder.java +++ b/dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/Forwarder.java @@ -24,14 +24,13 @@ /** * An HTTP forwarder that delivers DogStatsD HTTP payloads to a remote endpoint. * - *

Payloads are enqueued via {@link #send(byte[])} and delivered asynchronously by a background - * thread. Failed requests are retried with exponential back-off up to {@code maxTries} attempts - * before being discarded. + *

Payloads are enqueued via {@link #send(URI, byte[])} and delivered asynchronously by a + * background thread. Failed requests are retried with exponential back-off up to {@code maxTries} + * attempts before being discarded. */ public class Forwarder extends Thread { static final Logger logger = Logger.getLogger(Forwarder.class.getName()); final BoundedQueue queue; - final URI url; final HttpClient client; final Duration requestTimeout; final Random rng = new Random(); @@ -42,9 +41,8 @@ public class Forwarder extends Thread { final Telemetry telemetry; /** - * Creates a new forwarder targeting the given URL. + * Creates a new forwarder. * - * @param url the remote HTTP endpoint to POST payloads to * @param maxRequestsBytes maximum total size of buffered payloads, in bytes * @param maxTries maximum number of delivery attempts per payload * @param whenFull action to take when the queue is at capacity @@ -53,13 +51,11 @@ public class Forwarder extends Thread { * {@code null} disables the request timeout */ public Forwarder( - URI url, long maxRequestsBytes, long maxTries, WhenFull whenFull, Duration connectTimeout, Duration requestTimeout) { - this.url = url; this.telemetry = new Telemetry(); this.queue = new BoundedQueue(maxRequestsBytes, maxTries, whenFull, this.telemetry); this.requestTimeout = requestTimeout; @@ -91,26 +87,30 @@ public void run() { } /** - * Enqueues a payload for delivery to the remote endpoint. + * Enqueues a payload for delivery to the given endpoint. * *

If the queue is full, behaviour is determined by the {@link WhenFull} policy supplied at * construction time. * + * @param url the remote HTTP endpoint to POST the payload to * @param payload the raw bytes to deliver * @throws InterruptedException if the calling thread is interrupted while waiting for space * ({@link WhenFull#BLOCK} mode only) */ - public void send(byte[] payload) throws InterruptedException { - queue.add(payload); + public void send(URI url, byte[] payload) throws InterruptedException { + queue.add(new Payload(url, payload)); telemetry.onEnqueue(payload.length); } - void runOnce(Map.Entry item) throws InterruptedException { - byte[] payload = item.getValue(); - logger.log(Level.INFO, "sending {0} bytes", payload.length); + void runOnce(Map.Entry item) throws InterruptedException { + Payload payload = item.getValue(); + logger.log( + Level.INFO, + "sending {0} bytes to {1}", + new Object[] {payload.bytes.length, payload.url}); HttpRequest.Builder builder = - HttpRequest.newBuilder(url).POST(BodyPublishers.ofByteArray(payload)); + HttpRequest.newBuilder(payload.url).POST(BodyPublishers.ofByteArray(payload.bytes)); if (requestTimeout != null) { builder.timeout(requestTimeout); } @@ -138,9 +138,9 @@ void runOnce(Map.Entry item) throws InterruptedExcepti backoff(); } - void handleResponse(int code, Map.Entry item) + void handleResponse(int code, Map.Entry item) throws InterruptedException { - int len = item.getValue().length; + int len = item.getValue().bytes.length; switch (code) { case 400: telemetry.onResponse(code, len, false); @@ -158,9 +158,9 @@ void handleResponse(int code, Map.Entry item) } } - void handleTransportError(Map.Entry item) + void handleTransportError(Map.Entry item) throws InterruptedException { - telemetry.onTransportError(item.getValue().length); + telemetry.onTransportError(item.getValue().bytes.length); increaseBackoff(); queue.requeue(item); } diff --git a/dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/Payload.java b/dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/Payload.java new file mode 100644 index 00000000..22c3c938 --- /dev/null +++ b/dogstatsd-http-forwarder/src/main/java/com/datadoghq/dogstatsd/http/forwarder/Payload.java @@ -0,0 +1,20 @@ +/* Unless explicitly stated otherwise all files in this repository are + * licensed under the Apache 2.0 License. + * + * This product includes software developed at Datadog + * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc. + */ + +package com.datadoghq.dogstatsd.http.forwarder; + +import java.net.URI; + +class Payload { + final URI url; + final byte[] bytes; + + Payload(URI url, byte[] bytes) { + this.url = url; + this.bytes = bytes; + } +} diff --git a/dogstatsd-http-forwarder/src/test/java/com/datadoghq/dogstatsd/http/forwarder/BoundedQueueTest.java b/dogstatsd-http-forwarder/src/test/java/com/datadoghq/dogstatsd/http/forwarder/BoundedQueueTest.java index 981feeb2..c2ab6f89 100644 --- a/dogstatsd-http-forwarder/src/test/java/com/datadoghq/dogstatsd/http/forwarder/BoundedQueueTest.java +++ b/dogstatsd-http-forwarder/src/test/java/com/datadoghq/dogstatsd/http/forwarder/BoundedQueueTest.java @@ -9,11 +9,14 @@ import static org.junit.Assert.*; +import java.net.URI; import java.util.Arrays; import java.util.Map; import org.junit.Test; public class BoundedQueueTest { + private static final URI URL = URI.create("http://example.invalid/"); + private static byte[] bytes(int n) { return bytes(n, (byte) 0); } @@ -24,14 +27,22 @@ private static byte[] bytes(int n, byte v) { return b; } + private static Payload payload(int n) { + return new Payload(URL, bytes(n)); + } + + private static Payload payload(byte[] b) { + return new Payload(URL, b); + } + // --- Round-trip / bytes tracking --- @Test public void addThenNextReturnsItem() throws InterruptedException { BoundedQueue q = new BoundedQueue(10, 1, WhenFull.DROP, new Telemetry()); - byte[] item = bytes(4); + Payload item = payload(4); q.add(item); - Map.Entry entry = q.next(); + Map.Entry entry = q.next(); assertSame(item, entry.getValue()); assertEquals(0, q.bytes); } @@ -39,9 +50,9 @@ public void addThenNextReturnsItem() throws InterruptedException { @Test public void bytesDecrementedOnNext() throws InterruptedException { BoundedQueue q = new BoundedQueue(30, 1, WhenFull.DROP, new Telemetry()); - q.add(bytes(3)); - q.add(bytes(3)); - q.add(bytes(3)); + q.add(payload(3)); + q.add(payload(3)); + q.add(payload(3)); assertEquals(9, q.bytes); q.next(); assertEquals(6, q.bytes); @@ -54,9 +65,9 @@ public void bytesDecrementedOnNext() throws InterruptedException { @Test public void newestItemDequeuesFirstWithinSameTries() throws InterruptedException { BoundedQueue q = new BoundedQueue(30, 1, WhenFull.DROP, new Telemetry()); - byte[] a = {1}; - byte[] b = {2}; - byte[] c = {3}; + Payload a = payload(new byte[] {1}); + Payload b = payload(new byte[] {2}); + Payload c = payload(new byte[] {3}); q.add(a); q.add(b); q.add(c); @@ -72,11 +83,11 @@ public void newestItemDequeuesFirstWithinSameTries() throws InterruptedException public void dropWhenFullDropsOldestItem() throws InterruptedException { Telemetry t = new Telemetry(); BoundedQueue q = new BoundedQueue(10, 1, WhenFull.DROP, t); - byte[] a = bytes(5); // added first → smallest clock → last in TreeMap - byte[] b = bytes(4); + Payload a = payload(5); // added first → smallest clock → last in TreeMap + Payload b = payload(4); q.add(a); // queue: a(clock=MIN+1) q.add(b); // queue full: a, b - byte[] c = bytes(3); + Payload c = payload(3); q.add(c); // a (oldest, last entry) evicted Telemetry.Snapshot s = t.snapshot(q); assertEquals(1, s.droppedPayloads); @@ -90,9 +101,9 @@ public void dropWhenFullDropsOldestItem() throws InterruptedException { public void dropCountersAccumulate() throws InterruptedException { Telemetry t = new Telemetry(); BoundedQueue q = new BoundedQueue(5, 1, WhenFull.DROP, t); - q.add(bytes(5)); // fills queue (X) - q.add(bytes(5)); // X dropped (Y in) - q.add(bytes(5)); // Y dropped (Z in) + q.add(payload(5)); // fills queue (X) + q.add(payload(5)); // X dropped (Y in) + q.add(payload(5)); // Y dropped (Z in) Telemetry.Snapshot s = t.snapshot(q); assertEquals(2, s.droppedPayloads); assertEquals(10, s.droppedBytes); @@ -101,8 +112,8 @@ public void dropCountersAccumulate() throws InterruptedException { @Test(timeout = 3000) public void dropDoesNotBlock() throws InterruptedException { BoundedQueue q = new BoundedQueue(5, 1, WhenFull.DROP, new Telemetry()); - q.add(bytes(5)); // fill - q.add(bytes(5)); // should return immediately via DROP + q.add(payload(5)); // fill + q.add(payload(5)); // should return immediately via DROP } // --- WhenFull.BLOCK --- @@ -111,13 +122,13 @@ public void dropDoesNotBlock() throws InterruptedException { public void blockUnblocksWhenSpaceFreed() throws InterruptedException { Telemetry t = new Telemetry(); BoundedQueue q = new BoundedQueue(5, 1, WhenFull.BLOCK, t); - q.add(bytes(5)); // queue full + q.add(payload(5)); // queue full Thread producer = new Thread( () -> { try { - q.add(bytes(5)); + q.add(payload(5)); } catch (InterruptedException e) { return; } @@ -140,20 +151,20 @@ public void blockUnblocksWhenSpaceFreed() throws InterruptedException { @Test(expected = IllegalArgumentException.class) public void addThrowsForOversizedItem() throws InterruptedException { BoundedQueue q = new BoundedQueue(4, 1, WhenFull.DROP, new Telemetry()); - q.add(bytes(5)); + q.add(payload(5)); } @Test public void requeueIncrementsTriesPreservesClock() throws InterruptedException { BoundedQueue q = new BoundedQueue(20, 3, WhenFull.DROP, new Telemetry()); - q.add(bytes(4)); - Map.Entry entry = q.next(); + q.add(payload(4)); + Map.Entry entry = q.next(); assertEquals(0, entry.getKey().tries); long originalClock = entry.getKey().clock; long originalEnqueued = entry.getKey().enqueuedAtNanos; q.requeue(entry); - Map.Entry requeued = q.next(); + Map.Entry requeued = q.next(); assertEquals(1, requeued.getKey().tries); assertEquals(originalClock, requeued.getKey().clock); assertEquals(originalEnqueued, requeued.getKey().enqueuedAtNanos); @@ -163,10 +174,10 @@ public void requeueIncrementsTriesPreservesClock() throws InterruptedException { public void requeuedItemDequeuesAfterFreshItems() throws InterruptedException { MockNanos nanos = new MockNanos(0); BoundedQueue q = new BoundedQueue(20, 3, WhenFull.DROP, new Telemetry(), nanos); - byte[] a = {10}; - byte[] b = {20}; + Payload a = payload(new byte[] {10}); + Payload b = payload(new byte[] {20}); q.add(a); // A.enqueuedAtNanos = 0 - Map.Entry entryA = q.next(); + Map.Entry entryA = q.next(); q.requeue(entryA); // A now has tries=1 nanos.advanceMillis(5); @@ -186,8 +197,8 @@ public void requeuedItemDequeuesAfterFreshItems() throws InterruptedException { public void requeueAtMaxTriesIsAccepted() throws InterruptedException { Telemetry t = new Telemetry(); BoundedQueue q = new BoundedQueue(20, 2, WhenFull.DROP, t); - q.add(bytes(3)); - Map.Entry e = q.next(); + q.add(payload(3)); + Map.Entry e = q.next(); q.requeue(e); // tries → 1 e = q.next(); q.requeue(e); // tries → 2 == maxTries, should be accepted @@ -199,9 +210,8 @@ public void requeueAtMaxTriesIsAccepted() throws InterruptedException { public void requeuePastMaxTriesDropsItem() throws InterruptedException { Telemetry t = new Telemetry(); BoundedQueue q = new BoundedQueue(20, 2, WhenFull.DROP, t); - byte[] item = bytes(7); - q.add(item); - Map.Entry e = q.next(); + q.add(payload(7)); + Map.Entry e = q.next(); q.requeue(e); // tries → 1 e = q.next(); q.requeue(e); // tries → 2 == maxTries, accepted @@ -230,8 +240,8 @@ public void snapshotEmptyQueue() { @Test public void snapshotReports() throws InterruptedException { BoundedQueue q = new BoundedQueue(30, 1, WhenFull.DROP, new Telemetry()); - q.add(bytes(3)); - q.add(bytes(5)); + q.add(payload(3)); + q.add(payload(5)); Telemetry.Snapshot s = new Telemetry.Snapshot(0L); q.snapshot(System.nanoTime(), s); @@ -245,8 +255,8 @@ public void snapshotReports() throws InterruptedException { @Test(timeout = 5000) public void nextBlocksUntilItemAdded() throws InterruptedException { BoundedQueue q = new BoundedQueue(100, 1, WhenFull.DROP, new Telemetry()); - byte[] item = bytes(3); - Map.Entry[] result = new Map.Entry[1]; + Payload item = payload(3); + Map.Entry[] result = new Map.Entry[1]; Thread consumer = new Thread( diff --git a/dogstatsd-http-forwarder/src/test/java/com/datadoghq/dogstatsd/http/forwarder/ForwarderTest.java b/dogstatsd-http-forwarder/src/test/java/com/datadoghq/dogstatsd/http/forwarder/ForwarderTest.java index 8b0e194d..4f80b46c 100644 --- a/dogstatsd-http-forwarder/src/test/java/com/datadoghq/dogstatsd/http/forwarder/ForwarderTest.java +++ b/dogstatsd-http-forwarder/src/test/java/com/datadoghq/dogstatsd/http/forwarder/ForwarderTest.java @@ -17,22 +17,17 @@ import org.junit.Test; public class ForwarderTest { + private static final URI URL = URI.create("http://localhost:0/"); private static Forwarder newForwarder(long maxBytes, WhenFull whenFull) { - return new Forwarder( - URI.create("http://localhost:0/"), - maxBytes, - 1, - whenFull, - Duration.ofSeconds(1), - Duration.ofSeconds(1)); + return new Forwarder(maxBytes, 1, whenFull, Duration.ofSeconds(1), Duration.ofSeconds(1)); } @Test public void sendCountsEnqueue() throws InterruptedException { Forwarder f = newForwarder(100, WhenFull.DROP); - f.send(new byte[7]); - f.send(new byte[3]); + f.send(URL, new byte[7]); + f.send(URL, new byte[3]); Telemetry.Snapshot s = f.snapshot(); assertEquals(2, s.enqueuedPayloads); assertEquals(10, s.enqueuedBytes); @@ -41,7 +36,7 @@ public void sendCountsEnqueue() throws InterruptedException { @Test public void oversizedSendDoesNotCount() { Forwarder f = newForwarder(10, WhenFull.DROP); - assertThrows(IllegalArgumentException.class, () -> f.send(new byte[11])); + assertThrows(IllegalArgumentException.class, () -> f.send(URL, new byte[11])); Telemetry.Snapshot s = f.snapshot(); assertEquals(0, s.enqueuedPayloads); assertEquals(0, s.enqueuedBytes); @@ -51,8 +46,8 @@ public void oversizedSendDoesNotCount() { @Test public void handle400() throws Exception { Forwarder f = newForwarder(100, WhenFull.DROP); - f.send(new byte[7]); - Map.Entry item = f.queue.next(); + f.send(URL, new byte[7]); + Map.Entry item = f.queue.next(); f.handleResponse(400, item); Telemetry.Snapshot s = f.snapshot(); assertEquals(1, s.enqueuedPayloads); @@ -65,8 +60,8 @@ public void handle400() throws Exception { @Test public void handle200() throws Exception { Forwarder f = newForwarder(100, WhenFull.DROP); - f.send(new byte[7]); - Map.Entry item = f.queue.next(); + f.send(URL, new byte[7]); + Map.Entry item = f.queue.next(); f.handleResponse(200, item); Telemetry.Snapshot s = f.snapshot(); assertEquals(1, s.deliveredPayloads); @@ -83,8 +78,8 @@ public void handle200() throws Exception { @Test public void handleError() throws Exception { Forwarder f = newForwarder(100, WhenFull.DROP); - f.send(new byte[7]); - Map.Entry item = f.queue.next(); + f.send(URL, new byte[7]); + Map.Entry item = f.queue.next(); f.handleTransportError(item); Telemetry.Snapshot s = f.snapshot(); assertEquals(0, s.deliveredPayloads); @@ -101,8 +96,8 @@ public void handleError() throws Exception { @Test public void handle500() throws Exception { Forwarder f = newForwarder(100, WhenFull.DROP); - f.send(new byte[7]); - Map.Entry item = f.queue.next(); + f.send(URL, new byte[7]); + Map.Entry item = f.queue.next(); f.handleResponse(500, item); Telemetry.Snapshot s = f.snapshot(); assertEquals(0, s.deliveredPayloads); diff --git a/dogstatsd-http-forwarder/src/test/java/com/datadoghq/dogstatsd/http/forwarder/TelemetryTest.java b/dogstatsd-http-forwarder/src/test/java/com/datadoghq/dogstatsd/http/forwarder/TelemetryTest.java index e41edbf2..7b88c94d 100644 --- a/dogstatsd-http-forwarder/src/test/java/com/datadoghq/dogstatsd/http/forwarder/TelemetryTest.java +++ b/dogstatsd-http-forwarder/src/test/java/com/datadoghq/dogstatsd/http/forwarder/TelemetryTest.java @@ -12,11 +12,13 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.net.URI; import java.time.Clock; import java.time.Instant; import org.junit.Test; public class TelemetryTest { + private static final URI URL = URI.create("http://example.invalid/"); private static void assertCounters( long expectedPayloads, long expectedBytes, Telemetry.Snapshot.CodeCounters c) { @@ -80,8 +82,8 @@ public void responses() { public void queueState() throws InterruptedException { Telemetry t = new Telemetry(); BoundedQueue q = new BoundedQueue(100, 1, WhenFull.DROP, t); - q.add(new byte[5]); - q.add(new byte[7]); + q.add(new Payload(URL, new byte[5])); + q.add(new Payload(URL, new byte[7])); Telemetry.Snapshot s = t.snapshot(q); assertEquals(2, s.queuePayloads); assertEquals(12, s.queueBytes);