From 2c7eb4b811dd20509c14c6101e97f8f18f3a4f0d Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Tue, 12 May 2026 23:45:11 -0700 Subject: [PATCH] stream: preserve toReadableSync batch after backpressure Keep the current batch and index across _read() calls so chunks that remain after push() returns false are emitted on later reads. Fixes: https://github.com/nodejs/node/issues/63275 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 --- lib/internal/streams/iter/classic.js | 22 +++++++++++++++---- test/parallel/test-stream-iter-to-readable.js | 16 ++++++++++++++ 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/lib/internal/streams/iter/classic.js b/lib/internal/streams/iter/classic.js index 18d1733d6ad648..854f761d071b1c 100644 --- a/lib/internal/streams/iter/classic.js +++ b/lib/internal/streams/iter/classic.js @@ -363,23 +363,37 @@ function toReadableSync(source, options = kNullPrototype) { const ReadableCtor = lazyReadable(); const iterator = source[SymbolIterator](); + let hasBatch = false; + let batch; + let batchIndex = 0; return new ReadableCtor({ __proto__: null, highWaterMark, read() { for (;;) { - const { value: batch, done } = iterator.next(); + if (hasBatch) { + while (batchIndex < batch.length) { + if (!this.push(batch[batchIndex++])) return; + } + batch = undefined; + hasBatch = false; + batchIndex = 0; + } + + const result = iterator.next(); + const { done } = result; if (done) { this.push(null); return; } - for (let i = 0; i < batch.length; i++) { - if (!this.push(batch[i])) return; - } + batch = result.value; + hasBatch = true; } }, destroy(err, cb) { + batch = undefined; + hasBatch = false; if (typeof iterator.return === 'function') iterator.return(); cb(err); }, diff --git a/test/parallel/test-stream-iter-to-readable.js b/test/parallel/test-stream-iter-to-readable.js index 4cb5600e3ba424..3f03090e30960c 100644 --- a/test/parallel/test-stream-iter-to-readable.js +++ b/test/parallel/test-stream-iter-to-readable.js @@ -439,6 +439,21 @@ async function testBackpressureSync() { assert.strictEqual(chunks.length, 10); } +// ============================================================================= +// fromStreamIterSync: backpressure within a batch +// ============================================================================= + +async function testBackpressureSyncMultiChunkBatch() { + function* gen() { + yield [Buffer.from('a'), Buffer.from('b'), Buffer.from('c')]; + } + + const readable = toReadableSync(gen(), { highWaterMark: 1 }); + const result = await collect(readable); + + assert.strictEqual(result.toString(), 'abc'); +} + // ============================================================================= // fromStreamIterSync: source error // ============================================================================= @@ -613,6 +628,7 @@ Promise.all([ testWithTransformAsync(), testBasicSync(), testBackpressureSync(), + testBackpressureSyncMultiChunkBatch(), testErrorSync(), testDestroySync(), testRoundTrip(),