Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .changeset/stream-ping-timeout-noise.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
"@vercel/flags-core": patch
---

Fix stream reconnect path producing noisy `AbortError` spans on instrumented
fetches.

When the ping watchdog fires (no ping received within the timeout), the SDK now
cancels the body reader instead of aborting the fetch signal. The read loop
exits via `{ done: true }` and the fetch span closes cleanly, so APM/error
tracking (e.g. `@vercel/otel/fetch`) no longer reports the expected reconnect
as `AbortError: This operation was aborted`. Reconnect behavior is unchanged.
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,57 @@ describe('connectStream', () => {
abortController.abort();
});

it('should not abort the fetch signal when ping timeout fires', async () => {
// Regression: aborting the fetch signal surfaced as AbortError on
// instrumented fetch spans (e.g. @vercel/otel/fetch) and got reported
// by APM/error tracking even though this is an expected reconnect.
// The ping timeout must instead cancel the body reader, leaving the
// fetch span to close cleanly.
let requestCount = 0;
const signals: AbortSignal[] = [];

fetchMock.mockImplementation((_input, init) => {
requestCount++;
if (init?.signal) signals.push(init.signal);
return streamResponse(
new ReadableStream({
start(controller) {
controller.enqueue(
new TextEncoder().encode(`${JSON.stringify(datafileMsg())}\n`),
);
// Simulate a zombie connection (server stops sending pings).
init?.signal?.addEventListener('abort', () => {
controller.close();
});
},
}),
);
});

const abortController = new AbortController();

await connectStream(
{ host: HOST, sdkKey: 'vf_test', abortController, fetch: fetchMock },
{ onDatafile: vi.fn() },
);

expect(requestCount).toBe(1);

// Trigger the ping timeout.
await vi.advanceTimersByTimeAsync(90_000);
await vi.advanceTimersByTimeAsync(0);
// Advance past the reconnection backoff (min 1s gap).
await vi.advanceTimersByTimeAsync(1_000);
await vi.advanceTimersByTimeAsync(0);

// Reconnected, but the first request's signal was never aborted —
// only the body reader was cancelled.
expect(requestCount).toBeGreaterThanOrEqual(2);
expect(signals[0]?.aborted).toBe(false);

abortController.abort();
});

it('should reset timeout on each ping', async () => {
let streamController: ReadableStreamDefaultController<Uint8Array>;

Expand Down
54 changes: 27 additions & 27 deletions packages/vercel-flags-core/src/controller/stream-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,24 +91,29 @@ export async function connectStream(
break;
}

// Per-connection abort controller — allows ping timeout to abort a single
// connection without stopping the entire retry loop.
const connectionAbort = new AbortController();
const onMainAbort = (): void => connectionAbort.abort();
// Per-connection abort controller for the fetch signal. Forwarded from
// the external main abort so a hung fetch (e.g. before headers arrive)
// can still be cancelled on shutdown.
const fetchAbort = new AbortController();
const onMainAbort = (): void => fetchAbort.abort();
abortController.signal.addEventListener('abort', onMainAbort, {
once: true,
});

// Tracks the in-flight body reader so the ping timeout can cancel reads
// gracefully. Cancelling the reader breaks the read loop via
// `{ done: true }` without aborting the fetch signal — the fetch span
// ends cleanly. Aborting the signal would surface as AbortError on
// instrumented fetch spans (e.g. via @vercel/otel/fetch) and get
// reported by APM/error tracking even though this is an expected
// reconnect path.
let reader: ReadableStreamDefaultReader<Uint8Array> | undefined;
let pingTimeoutId: ReturnType<typeof setTimeout> | undefined;
// Reference to the response body so the ping timeout can cancel it
// to break out of the for-await loop.
let responseBody: ReadableStream<Uint8Array> | undefined;
const resetPingTimeout = (): void => {
if (pingTimeoutId !== undefined) clearTimeout(pingTimeoutId);
if (!initialDataReceived) return;
pingTimeoutId = setTimeout(() => {
responseBody?.cancel().catch(() => {});
connectionAbort.abort();
reader?.cancel().catch(() => {});
}, PING_TIMEOUT_MS);
};

Expand All @@ -129,7 +134,7 @@ export async function connectStream(
}
const response = await fetchFn(`${host}/v1/stream`, {
headers,
signal: connectionAbort.signal,
signal: fetchAbort.signal,
});

if (!response.ok) {
Expand All @@ -148,17 +153,18 @@ export async function connectStream(
throw new Error('stream body was not present');
}

responseBody = response.body;
const reader = response.body.getReader();
reader = response.body.getReader();
const decoder = new TextDecoder();
const bufferChunks: string[] = [];

// Allow the ping timeout (or main abort) to cancel the reader,
// which breaks the read loop immediately even on a zombie connection.
const onConnectionAbort = (): void => {
reader.cancel().catch(() => {});
// Ensure the read loop exits promptly when the fetch signal is
// aborted, even on zombie connections where the body doesn't observe
// the abort. Registered after getReader() so any listeners attached
// by the underlying source (e.g. test mocks) fire first.
const onFetchAbort = (): void => {
reader?.cancel().catch(() => {});
};
connectionAbort.signal.addEventListener('abort', onConnectionAbort, {
fetchAbort.signal.addEventListener('abort', onFetchAbort, {
once: true,
});

Expand Down Expand Up @@ -218,13 +224,11 @@ export async function connectStream(
}
}
} finally {
connectionAbort.signal.removeEventListener(
'abort',
onConnectionAbort,
);
fetchAbort.signal.removeEventListener('abort', onFetchAbort);
}

// Stream ended normally (server closed connection) - reconnect
// Stream ended (server closed, ping timeout cancelled the reader,
// or external abort). Either reconnect or exit the outer loop.
clearTimeout(pingTimeoutId);
abortController.signal.removeEventListener('abort', onMainAbort);
if (!abortController.signal.aborted) {
Expand All @@ -241,11 +245,7 @@ export async function connectStream(
if (abortController.signal.aborted) {
break;
}
// Ping timeout aborts only the per-connection controller; this is
// an expected reconnect, not a real error — skip the noisy log.
if (!connectionAbort.signal.aborted) {
console.error('@vercel/flags-core: Stream error', error);
}
console.error('@vercel/flags-core: Stream error', error);
onDisconnect?.();
retryCount++;
const elapsed = Date.now() - lastAttemptTime;
Expand Down
Loading