diff --git a/internal/batch/policy/policy.go b/internal/batch/policy/policy.go index d526acea7..f24152e2b 100644 --- a/internal/batch/policy/policy.go +++ b/internal/batch/policy/policy.go @@ -128,12 +128,23 @@ func (p *Batcher) Add(part *message.Part) bool { return p.triggered || (p.period > 0 && time.Since(p.lastBatch) > p.period) } -// Flush clears all messages stored by this batch policy. Returns nil if the -// policy is currently empty. -func (p *Batcher) Flush(ctx context.Context) message.Batch { +// Flush clears all messages stored by this batch policy and applies any +// batching processors to them. The returned values distinguish three outcomes: +// +// - (batch, nil): a batch is ready to be sent onwards. +// - (nil, nil): the policy is currently empty, or the batching processors +// intentionally filtered every message away. Callers should treat the +// buffered transactions as successfully consumed (ack). +// - (nil, err): the batching processors failed and the batch has been +// dropped. Callers should treat the buffered transactions as failed (nack) +// so the source can retry them, rather than passing unprocessed data on. +func (p *Batcher) Flush(ctx context.Context) (message.Batch, error) { var newMsg message.Batch - resultMsgs := p.flushAny(ctx) + resultMsgs, err := p.flushAny(ctx) + if err != nil { + return nil, err + } if len(resultMsgs) == 1 { newMsg = resultMsgs[0] } else if len(resultMsgs) > 1 { @@ -144,10 +155,10 @@ func (p *Batcher) Flush(ctx context.Context) message.Batch { }) } } - return newMsg + return newMsg, nil } -func (p *Batcher) flushAny(ctx context.Context) []message.Batch { +func (p *Batcher) flushAny(ctx context.Context) ([]message.Batch, error) { var newMsg message.Batch if len(p.parts) > 0 { if !p.triggered && p.period > 0 && time.Since(p.lastBatch) > p.period { @@ -162,19 +173,19 @@ func (p *Batcher) flushAny(ctx context.Context) []message.Batch { p.triggered = false if newMsg == nil { - return nil + return nil, nil } if len(p.procs) > 0 { resultMsgs, err := iprocessor.ExecuteAll(ctx, p.procs, newMsg) if err != nil { p.log.Error("Batch processors resulted in error: %v, the batch has been dropped.", err) - return nil + return nil, fmt.Errorf("batch processors resulted in error: %w", err) } - return resultMsgs + return resultMsgs, nil } - return []message.Batch{newMsg} + return []message.Batch{newMsg}, nil } // Count returns the number of currently buffered message parts within this diff --git a/internal/batch/policy/policy_test.go b/internal/batch/policy/policy_test.go index bb6734504..c981c20d4 100644 --- a/internal/batch/policy/policy_test.go +++ b/internal/batch/policy/policy_test.go @@ -79,7 +79,8 @@ func TestPolicyBasic(t *testing.T) { t.Errorf("Wrong count: %v != %v", act, exp) } - msg := pol.Flush(tCtx) + msg, err := pol.Flush(tCtx) + require.NoError(t, err) if !reflect.DeepEqual(exp, message.GetAllBytes(msg)) { t.Errorf("Wrong result: %s != %s", message.GetAllBytes(msg), exp) } @@ -87,7 +88,9 @@ func TestPolicyBasic(t *testing.T) { t.Errorf("Wrong count: %v != %v", act, exp) } - if msg = pol.Flush(tCtx); msg != nil { + msg, err = pol.Flush(tCtx) + require.NoError(t, err) + if msg != nil { t.Error("Non-nil empty flush") } } @@ -120,7 +123,9 @@ func TestPolicyPeriod(t *testing.T) { t.Errorf("Wrong period: %v", v) } - if v := pol.Flush(tCtx); v == nil { + v, err := pol.Flush(tCtx) + require.NoError(t, err) + if v == nil { t.Error("Nil msgs from flush") } @@ -153,12 +158,15 @@ func TestPolicySize(t *testing.T) { t.Error("Expected batch") } - msg := pol.Flush(tCtx) + msg, err := pol.Flush(tCtx) + require.NoError(t, err) if !reflect.DeepEqual(exp, message.GetAllBytes(msg)) { t.Errorf("Wrong result: %s != %s", message.GetAllBytes(msg), exp) } - if msg = pol.Flush(tCtx); msg != nil { + msg, err = pol.Flush(tCtx) + require.NoError(t, err) + if msg != nil { t.Error("Non-nil empty flush") } } @@ -185,12 +193,15 @@ func TestPolicyCheck(t *testing.T) { t.Error("Expected batch") } - msg := pol.Flush(tCtx) + msg, err := pol.Flush(tCtx) + require.NoError(t, err) if !reflect.DeepEqual(exp, message.GetAllBytes(msg)) { t.Errorf("Wrong result: %s != %s", message.GetAllBytes(msg), exp) } - if msg = pol.Flush(tCtx); msg != nil { + msg, err = pol.Flush(tCtx) + require.NoError(t, err) + if msg != nil { t.Error("Non-nil empty flush") } } @@ -220,12 +231,15 @@ func TestPolicyCheckAdvanced(t *testing.T) { t.Error("Expected batch") } - msg := pol.Flush(tCtx) + msg, err := pol.Flush(tCtx) + require.NoError(t, err) if !reflect.DeepEqual(exp, message.GetAllBytes(msg)) { t.Errorf("Wrong result: %s != %s", message.GetAllBytes(msg), exp) } - if msg = pol.Flush(tCtx); msg != nil { + msg, err = pol.Flush(tCtx) + require.NoError(t, err) + if msg != nil { t.Error("Non-nil empty flush") } } @@ -260,11 +274,13 @@ archive: assert.True(t, pol.Add(message.NewPart([]byte("bar")))) assert.Equal(t, 2, pol.Count()) - msg := pol.Flush(tCtx) + msg, err := pol.Flush(tCtx) + require.NoError(t, err) assert.Equal(t, exp, message.GetAllBytes(msg)) assert.Equal(t, 0, pol.Count()) - msg = pol.Flush(tCtx) + msg, err = pol.Flush(tCtx) + require.NoError(t, err) assert.Nil(t, msg) } @@ -302,7 +318,8 @@ func TestPolicySplit(t *testing.T) { t.Errorf("Wrong count: %v != %v", act, exp) } - msg := pol.Flush(tCtx) + msg, err := pol.Flush(tCtx) + require.NoError(t, err) if !reflect.DeepEqual(exp, message.GetAllBytes(msg)) { t.Errorf("Wrong result: %s != %s", message.GetAllBytes(msg), exp) } @@ -310,7 +327,9 @@ func TestPolicySplit(t *testing.T) { t.Errorf("Wrong count: %v != %v", act, exp) } - if msg = pol.Flush(tCtx); msg != nil { + msg, err = pol.Flush(tCtx) + require.NoError(t, err) + if msg != nil { t.Error("Non-nil empty flush") } } diff --git a/internal/component/input/batcher/batcher.go b/internal/component/input/batcher/batcher.go index 410e3e67c..b5feb2ae1 100644 --- a/internal/component/input/batcher/batcher.go +++ b/internal/component/input/batcher/batcher.go @@ -66,9 +66,31 @@ func (m *Impl) loop() { pendingTrans := []*transaction.Tracked{} pendingAcks := sync.WaitGroup{} + // resolvePending acknowledges every accumulated transaction with the given + // result and clears the slice. Used when a flush yields no batch to send, so + // those transactions are resolved immediately rather than lingering and + // inheriting a future batch's result. + resolvePending := func(ackErr error) { + for _, t := range pendingTrans { + if err := t.Ack(closeNowCtx, ackErr); err != nil { + break + } + } + pendingTrans = nil + } + flushBatchFn := func() { - sendMsg := m.batcher.Flush(closeNowCtx) + sendMsg, err := m.batcher.Flush(closeNowCtx) + if err != nil { + // The batching processors failed and the batch has been dropped. + // Nack the accumulated transactions so the source can retry them. + resolvePending(err) + return + } if sendMsg == nil { + // No batch produced (the policy was empty or every message was + // intentionally filtered away); the data was consumed successfully. + resolvePending(nil) return } diff --git a/internal/component/input/batcher/batcher_test.go b/internal/component/input/batcher/batcher_test.go index e69912352..04ca4f629 100644 --- a/internal/component/input/batcher/batcher_test.go +++ b/internal/component/input/batcher/batcher_test.go @@ -16,9 +16,12 @@ import ( "github.com/redpanda-data/benthos/v4/internal/batch/policy" "github.com/redpanda-data/benthos/v4/internal/batch/policy/batchconfig" "github.com/redpanda-data/benthos/v4/internal/component/input/batcher" + "github.com/redpanda-data/benthos/v4/internal/component/testutil" "github.com/redpanda-data/benthos/v4/internal/log" "github.com/redpanda-data/benthos/v4/internal/manager/mock" "github.com/redpanda-data/benthos/v4/internal/message" + + _ "github.com/redpanda-data/benthos/v4/internal/impl/pure" ) func TestBatcherStandard(t *testing.T) { @@ -165,6 +168,106 @@ func TestBatcherStandard(t *testing.T) { } } +// TestBatcherDroppedBatchMisattributesAck guards against pending transactions +// being acknowledged with the result of an unrelated, later batch. It mirrors +// the output batcher's regression test for the same accounting hazard. +// +// The batcher accumulates upstream transactions while buffering messages and +// resolves them once the resulting batch has been written. When a flush yields +// no batch — because the batch policy processors filtered every message away +// (exercised here), or because they returned an error — those transactions must +// be resolved against that flush rather than left to inherit a future batch's +// result. +// +// Scenario: batch one ("drop") is filtered to nothing by the policy processor, +// so its flush yields no batch. Batch two ("keep") forms a real batch which the +// downstream consumer nacks with errKeepFailed. +// +// Expected: the "drop" transactions resolve with nil (their data was +// successfully consumed and intentionally filtered) independently of batch two, +// rather than receiving batch two's errKeepFailed result. +func TestBatcherDroppedBatchMisattributesAck(t *testing.T) { + tCtx, done := context.WithTimeout(t.Context(), time.Second*20) + defer done() + + procConf, err := testutil.ProcessorFromYAML(` +mapping: | + root = if content().string() == "drop" { deleted() } +`) + require.NoError(t, err) + + mockInput := &mock.Input{TChan: make(chan message.Transaction)} + + batchConf := batchconfig.NewConfig() + batchConf.Count = 2 + batchConf.Processors = append(batchConf.Processors, procConf) + batchPol, err := policy.New(batchConf, mock.NewManager()) + require.NoError(t, err) + + b := batcher.New(batchPol, mockInput, log.Noop()) + b.TriggerStartConsuming() + + errKeepFailed := errors.New("keep batch write failed") + + // Buffered so the inline acknowledgement of filtered transactions never + // blocks the batcher loop. + dropRes := []chan error{make(chan error, 1), make(chan error, 1)} + keepRes := []chan error{make(chan error, 1), make(chan error, 1)} + + // Batch one: two messages the policy processor filters away. count=2 fires + // the flush, which yields no batch and is dropped. + for _, rc := range dropRes { + select { + case mockInput.TChan <- message.NewTransaction(message.QuickBatch([][]byte{[]byte("drop")}), rc): + case <-tCtx.Done(): + t.Fatal("timed out sending drop message") + } + } + + // Batch two: two messages that survive the processor and form a real batch. + for i, rc := range keepRes { + select { + case mockInput.TChan <- message.NewTransaction(message.QuickBatch([][]byte{fmt.Appendf(nil, "keep%v", i)}), rc): + case <-tCtx.Done(): + t.Fatal("timed out sending keep message") + } + } + + // Receive the keep batch and nack it. + select { + case tran := <-b.TransactionChan(): + assert.Equal(t, [][]byte{[]byte("keep0"), []byte("keep1")}, message.GetAllBytes(tran.Payload)) + require.NoError(t, tran.Ack(tCtx, errKeepFailed)) + case <-tCtx.Done(): + t.Fatal("timed out waiting for keep batch") + } + + // The keep transactions belong to the batch that failed, so they are + // correctly nacked. + for _, rc := range keepRes { + select { + case got := <-rc: + assert.Equal(t, errKeepFailed, got) + case <-tCtx.Done(): + t.Fatal("timed out waiting for keep ack") + } + } + + // The drop transactions were filtered in a separate, earlier flush and must + // NOT inherit batch two's failure. + for _, rc := range dropRes { + select { + case got := <-rc: + assert.NoError(t, got, "drop transaction wrongly received the keep batch's result") + case <-tCtx.Done(): + t.Fatal("drop transaction was never acked") + } + } + + mockInput.TriggerStopConsuming() + require.NoError(t, b.WaitForClose(tCtx)) +} + func TestBatcherErrorTracking(t *testing.T) { tCtx, done := context.WithTimeout(t.Context(), time.Second*5) defer done() diff --git a/internal/component/output/batcher/batcher.go b/internal/component/output/batcher/batcher.go index 1d6290c3b..c8b4ae52d 100644 --- a/internal/component/output/batcher/batcher.go +++ b/internal/component/output/batcher/batcher.go @@ -85,6 +85,23 @@ func (m *Impl) loop() { nextTimedBatchChan = time.After(tNext) } + // resolvePending acknowledges every accumulated transaction with the given + // result and clears the slice. It is used when a flush does not yield a + // batch to send onwards, so that those transactions are resolved + // immediately rather than lingering and inheriting a future batch's result. + resolvePending := func(pendingTrans []*transaction.Tracked, ackErr error) { + if len(pendingTrans) == 0 { + return + } + closeLeisureCtx, done := m.shutSig.SoftStopCtx(context.Background()) + defer done() + for _, t := range pendingTrans { + if err := t.Ack(closeLeisureCtx, ackErr); err != nil { + return + } + } + } + var pendingTrans []*transaction.Tracked for !m.shutSig.IsSoftStopSignalled() { if nextTimedBatchChan == nil { @@ -129,8 +146,22 @@ func (m *Impl) loop() { continue } - sendMsg := m.batcher.Flush(closeNowCtx) + sendMsg, err := m.batcher.Flush(closeNowCtx) + if err != nil { + // The batching processors failed and the batch has been dropped. + // Nack the accumulated transactions so the input can retry them, + // rather than letting them be acked by a later, unrelated batch. + resolvePending(pendingTrans, err) + pendingTrans = nil + continue + } if sendMsg == nil { + // The flush produced no batch, either because the policy was empty + // or because the batching processors intentionally filtered every + // message away. Either way the data was successfully consumed, so + // ack the accumulated transactions. + resolvePending(pendingTrans, nil) + pendingTrans = nil continue } diff --git a/internal/component/output/batcher/batcher_test.go b/internal/component/output/batcher/batcher_test.go index fb8244b40..ddd781d81 100644 --- a/internal/component/output/batcher/batcher_test.go +++ b/internal/component/output/batcher/batcher_test.go @@ -18,8 +18,11 @@ import ( "github.com/redpanda-data/benthos/v4/internal/batch/policy" "github.com/redpanda-data/benthos/v4/internal/batch/policy/batchconfig" "github.com/redpanda-data/benthos/v4/internal/component/output/batcher" + "github.com/redpanda-data/benthos/v4/internal/component/testutil" "github.com/redpanda-data/benthos/v4/internal/manager/mock" "github.com/redpanda-data/benthos/v4/internal/message" + + _ "github.com/redpanda-data/benthos/v4/internal/impl/pure" ) func TestBatcherEarlyTermination(t *testing.T) { @@ -353,6 +356,116 @@ func TestBatcherBatchError(t *testing.T) { wg.Wait() } +// TestBatcherDroppedBatchMisattributesAck guards against pending transactions +// being acknowledged with the result of an unrelated, later batch. +// +// The batcher accumulates upstream transactions while buffering messages and +// resolves them once the resulting batch has been written. When a flush yields +// no batch — because the batch policy processors filtered every message away +// (exercised here), or because the flush context was cancelled — those +// transactions must be resolved against that flush rather than left to inherit +// a future batch's result. Otherwise data that was never delivered can be +// acked as if it succeeded. +// +// Scenario: batch one ("drop") is filtered to nothing by the policy processor, +// so its flush yields no batch. Batch two ("keep") forms a real batch which the +// output nacks with errKeepFailed. +// +// Expected: the "drop" transactions resolve with nil (their data was +// successfully consumed and intentionally filtered) independently of batch two, +// rather than receiving batch two's errKeepFailed result. +func TestBatcherDroppedBatchMisattributesAck(t *testing.T) { + tCtx, done := context.WithTimeout(t.Context(), time.Second*20) + defer done() + + procConf, err := testutil.ProcessorFromYAML(` +mapping: | + root = if content().string() == "drop" { deleted() } +`) + require.NoError(t, err) + + policyConf := batchconfig.NewConfig() + policyConf.Count = 2 + policyConf.Processors = append(policyConf.Processors, procConf) + batchPol, err := policy.New(policyConf, mock.NewManager()) + require.NoError(t, err) + + out := &mock.OutputChanneled{} + b := batcher.New(batchPol, out, mock.NewManager()) + + tInChan := make(chan message.Transaction) + require.NoError(t, b.Consume(tInChan)) + b.TriggerStartConsuming() + + errKeepFailed := errors.New("keep batch write failed") + + // The only batch that ever reaches the output is the "keep" batch, which we + // nack with errKeepFailed. + wg := sync.WaitGroup{} + wg.Go(func() { + select { + case outTr := <-out.TChan: + assert.Equal(t, [][]byte{[]byte("keep0"), []byte("keep1")}, message.GetAllBytes(outTr.Payload)) + require.NoError(t, outTr.Ack(tCtx, errKeepFailed)) + case <-tCtx.Done(): + t.Error("timed out waiting for keep batch on output") + } + }) + + // Buffered so the batcher's ack goroutine never blocks regardless of the + // order in which the test reads results. + dropRes := []chan error{make(chan error, 1), make(chan error, 1)} + keepRes := []chan error{make(chan error, 1), make(chan error, 1)} + + // Batch one: two messages the policy processor filters away. count=2 fires + // the flush, which yields no batch and is dropped. + for _, rc := range dropRes { + select { + case tInChan <- message.NewTransaction(message.QuickBatch([][]byte{[]byte("drop")}), rc): + case <-tCtx.Done(): + t.Fatal("timed out sending drop message") + } + } + + // Batch two: two messages that survive the processor and form a real batch. + for i, rc := range keepRes { + select { + case tInChan <- message.NewTransaction(message.QuickBatch([][]byte{fmt.Appendf(nil, "keep%v", i)}), rc): + case <-tCtx.Done(): + t.Fatal("timed out sending keep message") + } + } + + // The keep transactions belong to the batch that failed, so they are + // correctly nacked. + for _, rc := range keepRes { + select { + case got := <-rc: + assert.Equal(t, errKeepFailed, got) + case <-tCtx.Done(): + t.Fatal("timed out waiting for keep ack") + } + } + + // The drop transactions were filtered in a separate, earlier flush and must + // NOT inherit batch two's failure. On the buggy code they receive + // errKeepFailed (or never resolve at all), demonstrating the pendingTrans + // leak. + for _, rc := range dropRes { + select { + case got := <-rc: + assert.NoError(t, got, "drop transaction wrongly received the keep batch's result (pendingTrans leak)") + case <-tCtx.Done(): + t.Fatal("drop transaction was never acked (leaked in pendingTrans)") + } + } + + close(tInChan) + b.TriggerCloseNow() + require.NoError(t, b.WaitForClose(tCtx)) + wg.Wait() +} + func TestBatcherTimed(t *testing.T) { tInChan := make(chan message.Transaction) resChan := make(chan error) diff --git a/public/service/config_batch_policy.go b/public/service/config_batch_policy.go index f6bc2bd19..0abeca9be 100644 --- a/public/service/config_batch_policy.go +++ b/public/service/config_batch_policy.go @@ -79,7 +79,10 @@ func (b *Batcher) UntilNext() (time.Duration, bool) { // Flush pending messages into a batch, apply any batching processors that are // part of the batching policy, and then return the result. func (b *Batcher) Flush(ctx context.Context) (batch MessageBatch, err error) { - m := b.p.Flush(ctx) + m, err := b.p.Flush(ctx) + if err != nil { + return nil, err + } if m == nil || m.Len() == 0 { return }