Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions internal/batch/policy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,23 @@ func (p *Batcher) Add(part *message.Part) bool {
return p.triggered || (p.period > 0 && time.Since(p.lastBatch) > p.period)
}

// Flush clears all messages stored by this batch policy. Returns nil if the
// policy is currently empty.
func (p *Batcher) Flush(ctx context.Context) message.Batch {
// Flush clears all messages stored by this batch policy and applies any
// batching processors to them. The returned values distinguish three outcomes:
//
// - (batch, nil): a batch is ready to be sent onwards.
// - (nil, nil): the policy is currently empty, or the batching processors
// intentionally filtered every message away. Callers should treat the
// buffered transactions as successfully consumed (ack).
// - (nil, err): the batching processors failed and the batch has been
// dropped. Callers should treat the buffered transactions as failed (nack)
// so the source can retry them, rather than passing unprocessed data on.
func (p *Batcher) Flush(ctx context.Context) (message.Batch, error) {
var newMsg message.Batch

resultMsgs := p.flushAny(ctx)
resultMsgs, err := p.flushAny(ctx)
if err != nil {
return nil, err
}
if len(resultMsgs) == 1 {
newMsg = resultMsgs[0]
} else if len(resultMsgs) > 1 {
Expand All @@ -144,10 +155,10 @@ func (p *Batcher) Flush(ctx context.Context) message.Batch {
})
}
}
return newMsg
return newMsg, nil
}

func (p *Batcher) flushAny(ctx context.Context) []message.Batch {
func (p *Batcher) flushAny(ctx context.Context) ([]message.Batch, error) {
var newMsg message.Batch
if len(p.parts) > 0 {
if !p.triggered && p.period > 0 && time.Since(p.lastBatch) > p.period {
Expand All @@ -162,19 +173,19 @@ func (p *Batcher) flushAny(ctx context.Context) []message.Batch {
p.triggered = false

if newMsg == nil {
return nil
return nil, nil
}

if len(p.procs) > 0 {
resultMsgs, err := iprocessor.ExecuteAll(ctx, p.procs, newMsg)
if err != nil {
p.log.Error("Batch processors resulted in error: %v, the batch has been dropped.", err)
return nil
return nil, fmt.Errorf("batch processors resulted in error: %w", err)
}
return resultMsgs
return resultMsgs, nil
}

return []message.Batch{newMsg}
return []message.Batch{newMsg}, nil
}

// Count returns the number of currently buffered message parts within this
Expand Down
45 changes: 32 additions & 13 deletions internal/batch/policy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,18 @@ func TestPolicyBasic(t *testing.T) {
t.Errorf("Wrong count: %v != %v", act, exp)
}

msg := pol.Flush(tCtx)
msg, err := pol.Flush(tCtx)
require.NoError(t, err)
if !reflect.DeepEqual(exp, message.GetAllBytes(msg)) {
t.Errorf("Wrong result: %s != %s", message.GetAllBytes(msg), exp)
}
if exp, act := 0, pol.Count(); exp != act {
t.Errorf("Wrong count: %v != %v", act, exp)
}

if msg = pol.Flush(tCtx); msg != nil {
msg, err = pol.Flush(tCtx)
require.NoError(t, err)
if msg != nil {
t.Error("Non-nil empty flush")
}
}
Expand Down Expand Up @@ -120,7 +123,9 @@ func TestPolicyPeriod(t *testing.T) {
t.Errorf("Wrong period: %v", v)
}

if v := pol.Flush(tCtx); v == nil {
v, err := pol.Flush(tCtx)
require.NoError(t, err)
if v == nil {
t.Error("Nil msgs from flush")
}

Expand Down Expand Up @@ -153,12 +158,15 @@ func TestPolicySize(t *testing.T) {
t.Error("Expected batch")
}

msg := pol.Flush(tCtx)
msg, err := pol.Flush(tCtx)
require.NoError(t, err)
if !reflect.DeepEqual(exp, message.GetAllBytes(msg)) {
t.Errorf("Wrong result: %s != %s", message.GetAllBytes(msg), exp)
}

if msg = pol.Flush(tCtx); msg != nil {
msg, err = pol.Flush(tCtx)
require.NoError(t, err)
if msg != nil {
t.Error("Non-nil empty flush")
}
}
Expand All @@ -185,12 +193,15 @@ func TestPolicyCheck(t *testing.T) {
t.Error("Expected batch")
}

msg := pol.Flush(tCtx)
msg, err := pol.Flush(tCtx)
require.NoError(t, err)
if !reflect.DeepEqual(exp, message.GetAllBytes(msg)) {
t.Errorf("Wrong result: %s != %s", message.GetAllBytes(msg), exp)
}

if msg = pol.Flush(tCtx); msg != nil {
msg, err = pol.Flush(tCtx)
require.NoError(t, err)
if msg != nil {
t.Error("Non-nil empty flush")
}
}
Expand Down Expand Up @@ -220,12 +231,15 @@ func TestPolicyCheckAdvanced(t *testing.T) {
t.Error("Expected batch")
}

msg := pol.Flush(tCtx)
msg, err := pol.Flush(tCtx)
require.NoError(t, err)
if !reflect.DeepEqual(exp, message.GetAllBytes(msg)) {
t.Errorf("Wrong result: %s != %s", message.GetAllBytes(msg), exp)
}

if msg = pol.Flush(tCtx); msg != nil {
msg, err = pol.Flush(tCtx)
require.NoError(t, err)
if msg != nil {
t.Error("Non-nil empty flush")
}
}
Expand Down Expand Up @@ -260,11 +274,13 @@ archive:
assert.True(t, pol.Add(message.NewPart([]byte("bar"))))
assert.Equal(t, 2, pol.Count())

msg := pol.Flush(tCtx)
msg, err := pol.Flush(tCtx)
require.NoError(t, err)
assert.Equal(t, exp, message.GetAllBytes(msg))
assert.Equal(t, 0, pol.Count())

msg = pol.Flush(tCtx)
msg, err = pol.Flush(tCtx)
require.NoError(t, err)
assert.Nil(t, msg)
}

Expand Down Expand Up @@ -302,15 +318,18 @@ func TestPolicySplit(t *testing.T) {
t.Errorf("Wrong count: %v != %v", act, exp)
}

msg := pol.Flush(tCtx)
msg, err := pol.Flush(tCtx)
require.NoError(t, err)
if !reflect.DeepEqual(exp, message.GetAllBytes(msg)) {
t.Errorf("Wrong result: %s != %s", message.GetAllBytes(msg), exp)
}
if exp, act := 0, pol.Count(); exp != act {
t.Errorf("Wrong count: %v != %v", act, exp)
}

if msg = pol.Flush(tCtx); msg != nil {
msg, err = pol.Flush(tCtx)
require.NoError(t, err)
if msg != nil {
t.Error("Non-nil empty flush")
}
}
24 changes: 23 additions & 1 deletion internal/component/input/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,31 @@ func (m *Impl) loop() {
pendingTrans := []*transaction.Tracked{}
pendingAcks := sync.WaitGroup{}

// resolvePending settles every transaction gathered in pendingTrans with
// ackErr and then empties the slice. It runs when a flush yields no batch to
// send onwards, so those transactions are resolved against that flush instead
// of lingering until a later batch and taking on its result.
resolvePending := func(ackErr error) {
	for i := 0; i < len(pendingTrans); i++ {
		// Stop at the first failed Ack; the slice is still cleared below.
		if pendingTrans[i].Ack(closeNowCtx, ackErr) != nil {
			break
		}
	}
	pendingTrans = nil
}

flushBatchFn := func() {
sendMsg := m.batcher.Flush(closeNowCtx)
sendMsg, err := m.batcher.Flush(closeNowCtx)
if err != nil {
// The batching processors failed and the batch has been dropped.
// Nack the accumulated transactions so the source can retry them.
resolvePending(err)
return
}
if sendMsg == nil {
// No batch produced (the policy was empty or every message was
// intentionally filtered away); the data was consumed successfully.
resolvePending(nil)
return
}
Comment on lines 82 to 95

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The input batcher received the same behavioral fix as the output batcher — flushBatchFn now nacks pending transactions on a processor error and acks them on an empty/filtered flush via resolvePending. However, only the output batcher gained a regression test (TestBatcherDroppedBatchMisattributesAck); these new branches in the input batcher have no coverage. Consider adding a mirrored regression test (e.g. a count-triggered batch whose processor filters every message away, followed by a later batch that nacks, asserting the filtered transactions resolve with nil independently) so a future regression in the input path is caught too.

Per the project test patterns, changed code should be accompanied by tests exercising the new behavior.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in c03f060: TestBatcherDroppedBatchMisattributesAck in the input batcher test, mirroring the output batcher's. It drives a count-triggered batch whose processor filters every message away, then a later batch that the consumer nacks, and asserts the filtered transactions resolve with nil independently. Verified it fails against the pre-fix code and passes with the fix.


Expand Down
103 changes: 103 additions & 0 deletions internal/component/input/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ import (
"github.com/redpanda-data/benthos/v4/internal/batch/policy"
"github.com/redpanda-data/benthos/v4/internal/batch/policy/batchconfig"
"github.com/redpanda-data/benthos/v4/internal/component/input/batcher"
"github.com/redpanda-data/benthos/v4/internal/component/testutil"
"github.com/redpanda-data/benthos/v4/internal/log"
"github.com/redpanda-data/benthos/v4/internal/manager/mock"
"github.com/redpanda-data/benthos/v4/internal/message"

_ "github.com/redpanda-data/benthos/v4/internal/impl/pure"
)

func TestBatcherStandard(t *testing.T) {
Expand Down Expand Up @@ -165,6 +168,106 @@ func TestBatcherStandard(t *testing.T) {
}
}

// TestBatcherDroppedBatchMisattributesAck guards against pending transactions
// being acknowledged with the result of an unrelated, later batch. It mirrors
// the output batcher's regression test for the same accounting hazard.
//
// The batcher accumulates upstream transactions while buffering messages and
// resolves them once the resulting batch has been written. When a flush yields
// no batch — because the batch policy processors filtered every message away
// (exercised here), or because they returned an error — those transactions must
// be resolved against that flush rather than left to inherit a future batch's
// result.
//
// Scenario: batch one ("drop") is filtered to nothing by the policy processor,
// so its flush yields no batch. Batch two ("keep") forms a real batch which the
// downstream consumer nacks with errKeepFailed.
//
// Expected: the "drop" transactions resolve with nil (their data was
// successfully consumed and intentionally filtered) independently of batch two,
// rather than receiving batch two's errKeepFailed result.
func TestBatcherDroppedBatchMisattributesAck(t *testing.T) {
// Bound the whole test: every blocking step below also selects on
// tCtx.Done(), so a missing send, batch or ack fails the test instead of
// hanging it.
tCtx, done := context.WithTimeout(t.Context(), time.Second*20)
defer done()

// Batch policy processor: a mapping that deletes any message whose content
// is exactly "drop". Messages with other content are expected to pass
// through unchanged, which the payload assertion on the keep batch confirms.
procConf, err := testutil.ProcessorFromYAML(`
mapping: |
root = if content().string() == "drop" { deleted() }
`)
require.NoError(t, err)

mockInput := &mock.Input{TChan: make(chan message.Transaction)}

// A count of two means each pair of buffered messages triggers a flush, and
// the processor above runs as part of that flush.
batchConf := batchconfig.NewConfig()
batchConf.Count = 2
batchConf.Processors = append(batchConf.Processors, procConf)
batchPol, err := policy.New(batchConf, mock.NewManager())
require.NoError(t, err)

// The batcher under test wraps the mock input with the policy built above.
b := batcher.New(batchPol, mockInput, log.Noop())
b.TriggerStartConsuming()

// Sentinel the downstream consumer uses to nack the keep batch; the drop
// transactions must never observe it.
errKeepFailed := errors.New("keep batch write failed")

// Buffered so the inline acknowledgement of filtered transactions never
// blocks the batcher loop.
dropRes := []chan error{make(chan error, 1), make(chan error, 1)}
keepRes := []chan error{make(chan error, 1), make(chan error, 1)}

// Batch one: two messages the policy processor filters away. Reaching
// count=2 triggers the flush, which produces no batch to send onwards.
for _, rc := range dropRes {
select {
case mockInput.TChan <- message.NewTransaction(message.QuickBatch([][]byte{[]byte("drop")}), rc):
case <-tCtx.Done():
t.Fatal("timed out sending drop message")
}
}

// Batch two: two messages that survive the processor and form a real batch.
for i, rc := range keepRes {
select {
case mockInput.TChan <- message.NewTransaction(message.QuickBatch([][]byte{fmt.Appendf(nil, "keep%v", i)}), rc):
case <-tCtx.Done():
t.Fatal("timed out sending keep message")
}
}

// Receive the keep batch and nack it. The payload check also proves that
// none of the "drop" messages leaked into this batch.
select {
case tran := <-b.TransactionChan():
assert.Equal(t, [][]byte{[]byte("keep0"), []byte("keep1")}, message.GetAllBytes(tran.Payload))
require.NoError(t, tran.Ack(tCtx, errKeepFailed))
case <-tCtx.Done():
t.Fatal("timed out waiting for keep batch")
}

// The keep transactions belong to the batch that failed, so they are
// correctly nacked.
for _, rc := range keepRes {
select {
case got := <-rc:
assert.Equal(t, errKeepFailed, got)
case <-tCtx.Done():
t.Fatal("timed out waiting for keep ack")
}
}

// The drop transactions were filtered in a separate, earlier flush and must
// NOT inherit batch two's failure. Checking them after the keep batch has
// been nacked is deliberate: a regression would surface here either as
// errKeepFailed or as an ack that never arrives.
for _, rc := range dropRes {
select {
case got := <-rc:
assert.NoError(t, got, "drop transaction wrongly received the keep batch's result")
case <-tCtx.Done():
t.Fatal("drop transaction was never acked")
}
}

// Signal the mock input to stop, then require that the batcher closes
// before the test context expires.
mockInput.TriggerStopConsuming()
require.NoError(t, b.WaitForClose(tCtx))
}

func TestBatcherErrorTracking(t *testing.T) {
tCtx, done := context.WithTimeout(t.Context(), time.Second*5)
defer done()
Expand Down
33 changes: 32 additions & 1 deletion internal/component/output/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,23 @@ func (m *Impl) loop() {
nextTimedBatchChan = time.After(tNext)
}

// resolvePending settles each transaction in pendingTrans with ackErr. It is
// called when a flush yields no batch to send onwards, so the transactions
// are resolved straight away instead of lingering until a later batch and
// taking on its result. The slice itself is not modified here; callers reset
// their own copy afterwards.
resolvePending := func(pendingTrans []*transaction.Tracked, ackErr error) {
	if len(pendingTrans) > 0 {
		// Acks are issued under a context obtained from the shutdown
		// signaller's SoftStopCtx.
		ackCtx, release := m.shutSig.SoftStopCtx(context.Background())
		defer release()
		for i := range pendingTrans {
			// Give up on the remainder as soon as one Ack fails.
			if pendingTrans[i].Ack(ackCtx, ackErr) != nil {
				break
			}
		}
	}
}

var pendingTrans []*transaction.Tracked
for !m.shutSig.IsSoftStopSignalled() {
if nextTimedBatchChan == nil {
Expand Down Expand Up @@ -129,8 +146,22 @@ func (m *Impl) loop() {
continue
}

sendMsg := m.batcher.Flush(closeNowCtx)
sendMsg, err := m.batcher.Flush(closeNowCtx)
if err != nil {
// The batching processors failed and the batch has been dropped.
// Nack the accumulated transactions so the input can retry them,
// rather than letting them be acked by a later, unrelated batch.
resolvePending(pendingTrans, err)
pendingTrans = nil
continue
}
if sendMsg == nil {
// The flush produced no batch, either because the policy was empty
// or because the batching processors intentionally filtered every
// message away. Either way the data was successfully consumed, so
// ack the accumulated transactions.
resolvePending(pendingTrans, nil)
pendingTrans = nil
continue
}

Expand Down
Loading
Loading