Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions internal/batch/policy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,23 @@ func (p *Batcher) Add(part *message.Part) bool {
return p.triggered || (p.period > 0 && time.Since(p.lastBatch) > p.period)
}

// Flush clears all messages stored by this batch policy. Returns nil if the
// policy is currently empty.
func (p *Batcher) Flush(ctx context.Context) message.Batch {
// Flush clears all messages stored by this batch policy and applies any
// batching processors to them. The returned values distinguish three outcomes:
//
// - (batch, nil): a batch is ready to be sent onwards.
// - (nil, nil): the policy is currently empty, or the batching processors
// intentionally filtered every message away. Callers should treat the
// buffered transactions as successfully consumed (ack).
// - (nil, err): the batching processors failed and the batch has been
// dropped. Callers should treat the buffered transactions as failed (nack)
// so the source can retry them, rather than passing unprocessed data on.
func (p *Batcher) Flush(ctx context.Context) (message.Batch, error) {
var newMsg message.Batch

resultMsgs := p.flushAny(ctx)
resultMsgs, err := p.flushAny(ctx)
if err != nil {
return nil, err
}
if len(resultMsgs) == 1 {
newMsg = resultMsgs[0]
} else if len(resultMsgs) > 1 {
Expand All @@ -144,10 +155,10 @@ func (p *Batcher) Flush(ctx context.Context) message.Batch {
})
}
}
return newMsg
return newMsg, nil
}

func (p *Batcher) flushAny(ctx context.Context) []message.Batch {
func (p *Batcher) flushAny(ctx context.Context) ([]message.Batch, error) {
var newMsg message.Batch
if len(p.parts) > 0 {
if !p.triggered && p.period > 0 && time.Since(p.lastBatch) > p.period {
Expand All @@ -162,19 +173,19 @@ func (p *Batcher) flushAny(ctx context.Context) []message.Batch {
p.triggered = false

if newMsg == nil {
return nil
return nil, nil
}

if len(p.procs) > 0 {
resultMsgs, err := iprocessor.ExecuteAll(ctx, p.procs, newMsg)
if err != nil {
p.log.Error("Batch processors resulted in error: %v, the batch has been dropped.", err)
return nil
return nil, err

@josephwoodward josephwoodward Jun 4, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be worth adding context to this error message so it can be related to the error log above?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call — done in c03f060. The returned error is now wrapped as fmt.Errorf("batch processors resulted in error: %w", err), so the nack reason surfaced downstream lines up with the logged message just above it.

}
return resultMsgs
return resultMsgs, nil
}

return []message.Batch{newMsg}
return []message.Batch{newMsg}, nil
}

// Count returns the number of currently buffered message parts within this
Expand Down
45 changes: 32 additions & 13 deletions internal/batch/policy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,18 @@ func TestPolicyBasic(t *testing.T) {
t.Errorf("Wrong count: %v != %v", act, exp)
}

msg := pol.Flush(tCtx)
msg, err := pol.Flush(tCtx)
require.NoError(t, err)
if !reflect.DeepEqual(exp, message.GetAllBytes(msg)) {
t.Errorf("Wrong result: %s != %s", message.GetAllBytes(msg), exp)
}
if exp, act := 0, pol.Count(); exp != act {
t.Errorf("Wrong count: %v != %v", act, exp)
}

if msg = pol.Flush(tCtx); msg != nil {
msg, err = pol.Flush(tCtx)
require.NoError(t, err)
if msg != nil {
t.Error("Non-nil empty flush")
}
}
Expand Down Expand Up @@ -120,7 +123,9 @@ func TestPolicyPeriod(t *testing.T) {
t.Errorf("Wrong period: %v", v)
}

if v := pol.Flush(tCtx); v == nil {
v, err := pol.Flush(tCtx)
require.NoError(t, err)
if v == nil {
t.Error("Nil msgs from flush")
}

Expand Down Expand Up @@ -153,12 +158,15 @@ func TestPolicySize(t *testing.T) {
t.Error("Expected batch")
}

msg := pol.Flush(tCtx)
msg, err := pol.Flush(tCtx)
require.NoError(t, err)
if !reflect.DeepEqual(exp, message.GetAllBytes(msg)) {
t.Errorf("Wrong result: %s != %s", message.GetAllBytes(msg), exp)
}

if msg = pol.Flush(tCtx); msg != nil {
msg, err = pol.Flush(tCtx)
require.NoError(t, err)
if msg != nil {
t.Error("Non-nil empty flush")
}
}
Expand All @@ -185,12 +193,15 @@ func TestPolicyCheck(t *testing.T) {
t.Error("Expected batch")
}

msg := pol.Flush(tCtx)
msg, err := pol.Flush(tCtx)
require.NoError(t, err)
if !reflect.DeepEqual(exp, message.GetAllBytes(msg)) {
t.Errorf("Wrong result: %s != %s", message.GetAllBytes(msg), exp)
}

if msg = pol.Flush(tCtx); msg != nil {
msg, err = pol.Flush(tCtx)
require.NoError(t, err)
if msg != nil {
t.Error("Non-nil empty flush")
}
}
Expand Down Expand Up @@ -220,12 +231,15 @@ func TestPolicyCheckAdvanced(t *testing.T) {
t.Error("Expected batch")
}

msg := pol.Flush(tCtx)
msg, err := pol.Flush(tCtx)
require.NoError(t, err)
if !reflect.DeepEqual(exp, message.GetAllBytes(msg)) {
t.Errorf("Wrong result: %s != %s", message.GetAllBytes(msg), exp)
}

if msg = pol.Flush(tCtx); msg != nil {
msg, err = pol.Flush(tCtx)
require.NoError(t, err)
if msg != nil {
t.Error("Non-nil empty flush")
}
}
Expand Down Expand Up @@ -260,11 +274,13 @@ archive:
assert.True(t, pol.Add(message.NewPart([]byte("bar"))))
assert.Equal(t, 2, pol.Count())

msg := pol.Flush(tCtx)
msg, err := pol.Flush(tCtx)
require.NoError(t, err)
assert.Equal(t, exp, message.GetAllBytes(msg))
assert.Equal(t, 0, pol.Count())

msg = pol.Flush(tCtx)
msg, err = pol.Flush(tCtx)
require.NoError(t, err)
assert.Nil(t, msg)
}

Expand Down Expand Up @@ -302,15 +318,18 @@ func TestPolicySplit(t *testing.T) {
t.Errorf("Wrong count: %v != %v", act, exp)
}

msg := pol.Flush(tCtx)
msg, err := pol.Flush(tCtx)
require.NoError(t, err)
if !reflect.DeepEqual(exp, message.GetAllBytes(msg)) {
t.Errorf("Wrong result: %s != %s", message.GetAllBytes(msg), exp)
}
if exp, act := 0, pol.Count(); exp != act {
t.Errorf("Wrong count: %v != %v", act, exp)
}

if msg = pol.Flush(tCtx); msg != nil {
msg, err = pol.Flush(tCtx)
require.NoError(t, err)
if msg != nil {
t.Error("Non-nil empty flush")
}
}
24 changes: 23 additions & 1 deletion internal/component/input/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,31 @@ func (m *Impl) loop() {
pendingTrans := []*transaction.Tracked{}
pendingAcks := sync.WaitGroup{}

// resolvePending acknowledges every accumulated transaction with the given
// result and clears the slice. Used when a flush yields no batch to send, so
// those transactions are resolved immediately rather than lingering and
// inheriting a future batch's result.
resolvePending := func(ackErr error) {
for _, t := range pendingTrans {
if err := t.Ack(closeNowCtx, ackErr); err != nil {
break
}
}
pendingTrans = nil
}

flushBatchFn := func() {
sendMsg := m.batcher.Flush(closeNowCtx)
sendMsg, err := m.batcher.Flush(closeNowCtx)
if err != nil {
// The batching processors failed and the batch has been dropped.
// Nack the accumulated transactions so the source can retry them.
resolvePending(err)
return
}
if sendMsg == nil {
// No batch produced (the policy was empty or every message was
// intentionally filtered away); the data was consumed successfully.
resolvePending(nil)
return
}
Comment on lines 82 to 95

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The input batcher received the same behavioral fix as the output batcher — flushBatchFn now nacks pending transactions on a processor error and acks them on an empty/filtered flush via resolvePending. However, only the output batcher gained a regression test (TestBatcherDroppedBatchMisattributesAck); these new branches in the input batcher have no coverage. Consider adding a mirrored regression test (e.g. a count-triggered batch whose processor filters every message away, followed by a later batch that nacks, asserting the filtered transactions resolve with nil independently) so a future regression in the input path is caught too.

Per the project test patterns, changed code should be accompanied by tests exercising the new behavior.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in c03f060: TestBatcherDroppedBatchMisattributesAck in the input batcher test, mirroring the output batcher's. It drives a count-triggered batch whose processor filters every message away, then a later batch that the consumer nacks, and asserts the filtered transactions resolve with nil independently. Verified it fails against the pre-fix code and passes with the fix.


Expand Down
33 changes: 32 additions & 1 deletion internal/component/output/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,23 @@ func (m *Impl) loop() {
nextTimedBatchChan = time.After(tNext)
}

// resolvePending acknowledges every accumulated transaction with the given
// result and clears the slice. It is used when a flush does not yield a
// batch to send onwards, so that those transactions are resolved
// immediately rather than lingering and inheriting a future batch's result.
resolvePending := func(pendingTrans []*transaction.Tracked, ackErr error) {
if len(pendingTrans) == 0 {
return
}
closeLeisureCtx, done := m.shutSig.SoftStopCtx(context.Background())
defer done()
for _, t := range pendingTrans {
if err := t.Ack(closeLeisureCtx, ackErr); err != nil {
return
}
}
}

var pendingTrans []*transaction.Tracked
for !m.shutSig.IsSoftStopSignalled() {
if nextTimedBatchChan == nil {
Expand Down Expand Up @@ -129,8 +146,22 @@ func (m *Impl) loop() {
continue
}

sendMsg := m.batcher.Flush(closeNowCtx)
sendMsg, err := m.batcher.Flush(closeNowCtx)
if err != nil {
// The batching processors failed and the batch has been dropped.
// Nack the accumulated transactions so the input can retry them,
// rather than letting them be acked by a later, unrelated batch.
resolvePending(pendingTrans, err)
pendingTrans = nil
continue
}
if sendMsg == nil {
// The flush produced no batch, either because the policy was empty
// or because the batching processors intentionally filtered every
// message away. Either way the data was successfully consumed, so
// ack the accumulated transactions.
resolvePending(pendingTrans, nil)
pendingTrans = nil
continue
}

Expand Down
113 changes: 113 additions & 0 deletions internal/component/output/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ import (
"github.com/redpanda-data/benthos/v4/internal/batch/policy"
"github.com/redpanda-data/benthos/v4/internal/batch/policy/batchconfig"
"github.com/redpanda-data/benthos/v4/internal/component/output/batcher"
"github.com/redpanda-data/benthos/v4/internal/component/testutil"
"github.com/redpanda-data/benthos/v4/internal/manager/mock"
"github.com/redpanda-data/benthos/v4/internal/message"

_ "github.com/redpanda-data/benthos/v4/internal/impl/pure"
)

func TestBatcherEarlyTermination(t *testing.T) {
Expand Down Expand Up @@ -353,6 +356,116 @@ func TestBatcherBatchError(t *testing.T) {
wg.Wait()
}

// TestBatcherDroppedBatchMisattributesAck guards against pending transactions
// being acknowledged with the result of an unrelated, later batch.
//
// The batcher accumulates upstream transactions while buffering messages and
// resolves them once the resulting batch has been written. When a flush yields
// no batch — because the batch policy processors filtered every message away
// (exercised here), or because the flush context was cancelled — those
// transactions must be resolved against that flush rather than left to inherit
// a future batch's result. Otherwise data that was never delivered can be
// acked as if it succeeded.
//
// Scenario: batch one ("drop") is filtered to nothing by the policy processor,
// so its flush yields no batch. Batch two ("keep") forms a real batch which the
// output nacks with errKeepFailed.
//
// Expected: the "drop" transactions resolve with nil (their data was
// successfully consumed and intentionally filtered) independently of batch two,
// rather than receiving batch two's errKeepFailed result.
func TestBatcherDroppedBatchMisattributesAck(t *testing.T) {
tCtx, done := context.WithTimeout(t.Context(), time.Second*20)
defer done()

procConf, err := testutil.ProcessorFromYAML(`
mapping: |
root = if content().string() == "drop" { deleted() }
`)
require.NoError(t, err)

policyConf := batchconfig.NewConfig()
policyConf.Count = 2
policyConf.Processors = append(policyConf.Processors, procConf)
batchPol, err := policy.New(policyConf, mock.NewManager())
require.NoError(t, err)

out := &mock.OutputChanneled{}
b := batcher.New(batchPol, out, mock.NewManager())

tInChan := make(chan message.Transaction)
require.NoError(t, b.Consume(tInChan))
b.TriggerStartConsuming()

errKeepFailed := errors.New("keep batch write failed")

// The only batch that ever reaches the output is the "keep" batch, which we
// nack with errKeepFailed.
wg := sync.WaitGroup{}
wg.Go(func() {
select {
case outTr := <-out.TChan:
assert.Equal(t, [][]byte{[]byte("keep0"), []byte("keep1")}, message.GetAllBytes(outTr.Payload))
require.NoError(t, outTr.Ack(tCtx, errKeepFailed))
case <-tCtx.Done():
t.Error("timed out waiting for keep batch on output")
}
})

// Buffered so the batcher's ack goroutine never blocks regardless of the
// order in which the test reads results.
dropRes := []chan error{make(chan error, 1), make(chan error, 1)}
keepRes := []chan error{make(chan error, 1), make(chan error, 1)}

// Batch one: two messages the policy processor filters away. count=2 fires
// the flush, which yields no batch and is dropped.
for _, rc := range dropRes {
select {
case tInChan <- message.NewTransaction(message.QuickBatch([][]byte{[]byte("drop")}), rc):
case <-tCtx.Done():
t.Fatal("timed out sending drop message")
}
}

// Batch two: two messages that survive the processor and form a real batch.
for i, rc := range keepRes {
select {
case tInChan <- message.NewTransaction(message.QuickBatch([][]byte{fmt.Appendf(nil, "keep%v", i)}), rc):
case <-tCtx.Done():
t.Fatal("timed out sending keep message")
}
}

// The keep transactions belong to the batch that failed, so they are
// correctly nacked.
for _, rc := range keepRes {
select {
case got := <-rc:
assert.Equal(t, errKeepFailed, got)
case <-tCtx.Done():
t.Fatal("timed out waiting for keep ack")
}
}

// The drop transactions were filtered in a separate, earlier flush and must
// NOT inherit batch two's failure. On the buggy code they receive
// errKeepFailed (or never resolve at all), demonstrating the pendingTrans
// leak.
for _, rc := range dropRes {
select {
case got := <-rc:
assert.NoError(t, got, "drop transaction wrongly received the keep batch's result (pendingTrans leak)")
case <-tCtx.Done():
t.Fatal("drop transaction was never acked (leaked in pendingTrans)")
}
}

close(tInChan)
b.TriggerCloseNow()
require.NoError(t, b.WaitForClose(tCtx))
wg.Wait()
}

func TestBatcherTimed(t *testing.T) {
tInChan := make(chan message.Transaction)
resChan := make(chan error)
Expand Down
Loading
Loading