The output and input batchers accumulate upstream transactions while buffering messages, then acknowledge them once the resulting batch has been written. When a flush produced no batch -- because the batching processors filtered every message away, or because they returned an error -- the loop continued without resolving the already accumulated transactions. Those transactions lingered and were later acknowledged with the result of a subsequent, unrelated batch, which could acknowledge data that was never delivered. policy.Flush now returns (batch, error) to distinguish three outcomes: a batch ready to send onwards, an intentionally empty result (the buffered transactions are acked as successfully consumed), and a processing failure (the buffered transactions are nacked so the source can retry them). Both batchers resolve their pending transactions immediately in the latter two cases rather than deferring them to a future batch's result. Adds a regression test covering the misattributed acknowledgement.
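The three flush outcomes described above can be sketched in isolation. This is a minimal model, not the project's code: `ackFn`, `batcher`, `flushOnce`, the `write` callback, and `errProcessing` are hypothetical names introduced for illustration, and the write is treated as synchronous to keep the example short.

```go
package main

import (
	"errors"
	"fmt"
)

// errProcessing is a hypothetical error standing in for a batch processor failure.
var errProcessing = errors.New("batch processors failed")

// ackFn is a hypothetical stand-in for an upstream transaction's
// acknowledgement callback: nil acks, a non-nil error nacks.
type ackFn func(err error)

// batcher holds the transactions accumulated since the last resolution.
type batcher struct{ pending []ackFn }

// resolvePending resolves every accumulated transaction with err and
// clears the list so nothing leaks into a later batch.
func (b *batcher) resolvePending(err error) {
	for _, ack := range b.pending {
		ack(err)
	}
	b.pending = nil
}

// flushOnce handles the three outcomes of a (batch, error) flush.
func (b *batcher) flushOnce(flush func() ([]string, error), write func([]string) error) {
	batch, err := flush()
	switch {
	case err != nil:
		// Processing failure: nack so the source can retry.
		b.resolvePending(err)
	case batch == nil:
		// Intentionally empty result: the data was consumed successfully.
		b.resolvePending(nil)
	default:
		// A real batch: resolve with the result of writing it.
		b.resolvePending(write(batch))
	}
}

func main() {
	cases := []struct {
		name  string
		flush func() ([]string, error)
	}{
		{"batch", func() ([]string, error) { return []string{"a"}, nil }},
		{"filtered", func() ([]string, error) { return nil, nil }},
		{"failed", func() ([]string, error) { return nil, errProcessing }},
	}
	for _, c := range cases {
		b := &batcher{}
		b.pending = append(b.pending, func(err error) {
			fmt.Printf("%s flush resolved with: %v\n", c.name, err)
		})
		b.flushOnce(c.flush, func([]string) error { return nil })
	}
}
```

The point of the sketch is that every branch calls `resolvePending` before returning, so no transaction can outlive the flush it belongs to.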
| flushBatchFn := func() { | ||
| sendMsg := m.batcher.Flush(closeNowCtx) | ||
| sendMsg, err := m.batcher.Flush(closeNowCtx) | ||
| if err != nil { | ||
| // The batching processors failed and the batch has been dropped. | ||
| // Nack the accumulated transactions so the source can retry them. | ||
| resolvePending(err) | ||
| return | ||
| } | ||
| if sendMsg == nil { | ||
| // No batch produced (the policy was empty or every message was | ||
| // intentionally filtered away); the data was consumed successfully. | ||
| resolvePending(nil) | ||
| return | ||
| } |
The input batcher received the same behavioral fix as the output batcher — flushBatchFn now nacks pending transactions on a processor error and acks them on an empty/filtered flush via resolvePending. However, only the output batcher gained a regression test (TestBatcherDroppedBatchMisattributesAck); these new branches in the input batcher have no coverage. Consider adding a mirrored regression test (e.g. a count-triggered batch whose processor filters every message away, followed by a later batch that nacks, asserting the filtered transactions resolve with nil independently) so a future regression in the input path is caught too.
Per the project test patterns, changed code should be accompanied by tests exercising the new behavior.
Added in c03f060: TestBatcherDroppedBatchMisattributesAck in the input batcher test, mirroring the output batcher's. It drives a count-triggered batch whose processor filters every message away, then a later batch that the consumer nacks, and asserts the filtered transactions resolve with nil independently. Verified it fails against the pre-fix code and passes with the fix.
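The scenario the regression test exercises can be replayed against a toy model to show why the assertion matters. This is a sketch under stated assumptions, not the actual test: `tx`, `simulate`, and `errNack` are hypothetical names, and the `fixed` flag switches between the pre-fix behaviour (an empty flush leaves pending transactions untouched) and the fixed behaviour (they resolve with nil immediately).

```go
package main

import (
	"errors"
	"fmt"
)

// errNack is a hypothetical error standing in for the consumer's nack.
var errNack = errors.New("consumer nacked the batch")

// tx is a hypothetical upstream transaction that records how it resolved.
type tx struct {
	resolved bool
	result   error
}

// simulate replays a filtered flush followed by a nacked batch and
// returns the transactions belonging to each.
func simulate(fixed bool) (filtered, later *tx) {
	filtered, later = &tx{}, &tx{}
	var pending []*tx
	resolve := func(err error) {
		for _, t := range pending {
			t.resolved, t.result = true, err
		}
		pending = nil
	}

	// First flush: the processors filter every message away, so no batch.
	pending = append(pending, filtered)
	if fixed {
		resolve(nil) // resolved against this flush, not a later one
	}

	// Second flush: a batch is produced and the consumer nacks it.
	pending = append(pending, later)
	resolve(errNack)
	return filtered, later
}

func main() {
	for _, fixed := range []bool{false, true} {
		f, l := simulate(fixed)
		fmt.Printf("fixed=%v filtered=%v later=%v\n", fixed, f.result, l.result)
	}
}
```

With `fixed` set to false the filtered transaction inherits the later nack, which is the misattribution the test name refers to; with `fixed` set to true it resolves with nil independently.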
When does this affect a pipeline in practice?
The most likely real-world exposure is on shutdown / restart. The unresolved-transaction path is reached whenever a flush yields no batch, and during a graceful shutdown the flush context can be cancelled while the batching processors are still running, leaving transactions accumulated in that codepath. This is most probable when the batching policy is large and the batch processors are slow enough to be the pipeline's bottleneck: at any given instant during the drain there is a high chance messages are sitting mid-processing. When the Steady-state pipelines with cheap, fast batch processing are far less likely to observe this, since the window in which messages occupy that codepath is vanishingly small.
Follow-up correction. On further investigation the data-loss scenario above does not actually trigger in the current code, and I would like to correct the record. For a batch to be dropped (rather than written) due to a failure,
The flush here uses the hard-stop context, so a cancelled flush coincides with a hard stop, at which point the batcher loop exits before any subsequent batch is produced. The accumulated transactions are therefore never acknowledged and are redelivered on restart: at-least-once is preserved, not violated. The only situation in which the loop continues after an empty flush is the intentional-filter case, where acknowledging is correct.
So the real-world symptoms of this bug are an acknowledgement-accounting issue: mis-attributed / delayed acknowledgements for filtered batches, possible spurious redelivery if a later batch is nacked, and, if a pipeline goes idle immediately after a filtered batch, transactions left unacknowledged, holding max_in_flight slots and blocking offset commits.
The data-loss risk becomes real only once batching-processor errors are routed into the dropped-batch path (the separate, upcoming output-error-rejection change). This PR's value is correcting the acknowledgement accounting and staging that nack path correctly; it does not fix a present-day data-loss bug.
| if err != nil { | ||
| p.log.Error("Batch processors resulted in error: %v, the batch has been dropped.", err) | ||
| return nil | ||
| return nil, err |
Would it be worth adding context to this error message so it can be related to the error log above?
Good call — done in c03f060. The returned error is now wrapped as fmt.Errorf("batch processors resulted in error: %w", err), so the nack reason surfaced downstream lines up with the logged message just above it.
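The wrapping mentioned in this reply can be illustrated on its own. A minimal sketch: the format string is the one quoted above, while `wrapFlushErr` and `errProcessor` are hypothetical names used only for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// errProcessor is a hypothetical underlying processor error.
var errProcessor = errors.New("mapping failed")

// wrapFlushErr adds context with %w so the original error stays
// reachable through errors.Is and errors.As.
func wrapFlushErr(err error) error {
	return fmt.Errorf("batch processors resulted in error: %w", err)
}

func main() {
	err := wrapFlushErr(errProcessor)
	fmt.Println(err)
	fmt.Println(errors.Is(err, errProcessor))
}
```

Because `%w` is used rather than `%v`, a caller inspecting the nack reason can still match the underlying cause while the message text matches the log line.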
- Wrap the error returned from the batch policy flush with context so the nack reason surfaced downstream relates to the logged error.
- Add a regression test for the input batcher mirroring the output batcher's, covering the new nack-on-error and ack-on-empty-flush branches.
The fix is sound. LGTM
Summary
The output and input batchers accumulate upstream transactions while buffering messages, then acknowledge them once the resulting batch has been written. When a flush produced no batch — because the batching processors filtered every message away, or because they returned an error — the loop continued without resolving the already-accumulated transactions. Those transactions lingered and were later acknowledged with the result of a subsequent, unrelated batch.
In the current code the practical symptoms are an acknowledgement-accounting problem rather than data loss: transactions belonging to an empty/filtered flush are resolved against a later batch's result (acked late, or spuriously nacked and redelivered if that later batch fails), and, if a pipeline goes idle immediately after a filtered batch, those transactions are left unacknowledged, holding max_in_flight slots and blocking offset commits. A genuine data-loss scenario only arises once batching-processor errors are routed into the dropped-batch path, which is a separate, upcoming change; this PR also stages the correct nack behaviour for it.
Change
policy.Flush now returns (message.Batch, error) to distinguish three outcomes:
- (batch, nil): a batch ready to send onwards.
- (nil, nil): an intentionally empty result; the buffered transactions are acked as successfully consumed.
- (nil, err): a processing failure; the buffered transactions are nacked so the source can retry them.
Both the output and input batchers resolve their pending transactions immediately in the latter two cases rather than deferring them to a future batch's result. The public service.Batcher.Flush wrapper (already (batch, error)) now propagates the error rather than discarding it.
Tests
Adds TestBatcherDroppedBatchMisattributesAck to both the output and input batcher suites, asserting that transactions belonging to a flush that yields no batch are resolved against that flush rather than inheriting a later batch's result. Existing batcher and batch-policy suites updated for the new signature; internal/impl/pure, internal/impl/io, and internal/stream suites pass unchanged.
🤖 Generated with Claude Code