Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Drain the pending tx queue in merged batches with a durable WAL-backed ack, fixing severe queue backlog under heavy tx load. Tx dedup moved from the reaper cache into the sequencer queue [#3351](https://github.com/evstack/ev-node/pull/3351)

## v1.1.2

### Changes
Expand Down
16 changes: 15 additions & 1 deletion block/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ func newAggregatorComponents(
// error channel for critical failures
errorCh := make(chan error, 1)

// capture raw sequencer before tracing wrap for batch ack interface
rawSequencer := sequencer

// wrap sequencer with tracing if enabled
if config.Instrumentation.IsTracingEnabled() {
sequencer = telemetry.WithTracingSequencer(sequencer)
Expand Down Expand Up @@ -278,14 +281,25 @@ func newAggregatorComponents(
sequencer,
genesis,
logger,
cacheManager,
config.Node.ScrapeInterval.Duration,
executor.NotifyNewTransactions,
)
if err != nil {
return nil, fmt.Errorf("failed to create reaper: %w", err)
}

// wire batch ack callback so drained queue entries are committed after block commit

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could we move this into the executor constructor? We already get the sequencer data there.
This would decrease the public API surface.

// batchAcknowledger is the optional capability probed on the raw (pre-tracing)
// sequencer. When the sequencer satisfies it, AckBatch is registered as the
// executor's batch-committed callback.
type batchAcknowledger interface {
AckBatch(ctx context.Context) error
}
if acker, ok := rawSequencer.(batchAcknowledger); ok {
executor.SetOnBatchCommitted(acker.AckBatch)
} else if !config.Node.BasedSequencer {
// without an ack, drained queue entries are rolled back on every
// retrieval and the same transactions would be re-included each block
logger.Warn().Msg("sequencer does not implement AckBatch; drained batch entries will not be acknowledged after block commit")
}

if config.Node.BasedSequencer { // no submissions needed for based sequencer
return &Components{
Executor: executor,
Expand Down
44 changes: 0 additions & 44 deletions block/internal/cache/generic_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,6 @@ func (c *Cache) isSeen(hash string) bool {
return c.hashes[hash]
}

// areSeen reports, for every entry of hashes, whether that hash is currently
// marked as seen. The returned slice has the same length and ordering as the
// input. The read lock is taken a single time for the whole batch.
func (c *Cache) areSeen(hashes []string) []bool {
	seen := make([]bool, len(hashes))

	c.mu.RLock()
	defer c.mu.RUnlock()

	for idx := range hashes {
		seen[idx] = c.hashes[hashes[idx]]
	}
	return seen
}

func (c *Cache) setSeen(hash string, height uint64) {
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -82,37 +69,6 @@ func (c *Cache) setSeen(hash string, height uint64) {
c.hashByHeight[height] = hash
}

// removeSeen drops hash from the seen set under the write lock.
// Only the hashes map is modified; hashByHeight is left untouched.
func (c *Cache) removeSeen(hash string) {
	c.mu.Lock()
	delete(c.hashes, hash)
	c.mu.Unlock()
}

// setSeenBatch flags every hash in hashes as seen while holding the write
// lock once for the whole batch.
//
// Height 0 is the sentinel shared by all transactions, so the hashByHeight
// index is not touched for it. For any other height, hashByHeight[height] is
// updated per entry and therefore ends up pointing at the last hash of the
// batch.
func (c *Cache) setSeenBatch(hashes []string, height uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()

	for _, hash := range hashes {
		c.hashes[hash] = true
		if height != 0 {
			// not exercised by current callers; mirrors setSeen's bookkeeping
			c.hashByHeight[height] = hash
		}
	}
}

func (c *Cache) getDAIncluded(hash string) (uint64, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
Expand Down
2 changes: 0 additions & 2 deletions block/internal/cache/generic_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,6 @@ func TestCache_BasicOperations(t *testing.T) {
assert.False(t, c.isSeen("hash1"))
c.setSeen("hash1", 1)
assert.True(t, c.isSeen("hash1"))
c.removeSeen("hash1")
assert.False(t, c.isSeen("hash1"))

_, ok := c.getDAIncluded("hash2")
assert.False(t, ok)
Expand Down
79 changes: 1 addition & 78 deletions block/internal/cache/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/binary"
"fmt"
"sync"
"time"

"github.com/rs/zerolog"

Expand All @@ -24,10 +23,6 @@ const (

// DataDAIncludedPrefix is the store key prefix for data DA inclusion tracking.
DataDAIncludedPrefix = "cache/data-da-included/"

// DefaultTxCacheRetention is the default time to keep transaction hashes in cache.
// Keeping a too high value can lead to OOM during heavy transaction load.
DefaultTxCacheRetention = 30 * time.Minute
)

// CacheManager provides thread-safe cache operations for tracking seen blocks
Expand All @@ -51,13 +46,6 @@ type CacheManager interface {
SetDataDAIncluded(daCommitmentHash string, daHeight uint64, blockHeight uint64)
RemoveDataDAIncluded(hash string)

// Transaction operations
IsTxSeen(hash string) bool
AreTxsSeen(hashes []string) []bool
SetTxSeen(hash string)
SetTxsSeen(hashes []string)
CleanupOldTxs(olderThan time.Duration) int

// Pending events syncing coordination
GetNextPendingEvent(blockHeight uint64) *common.DAHeightEvent
SetPendingEvent(blockHeight uint64, event *common.DAHeightEvent)
Expand Down Expand Up @@ -94,8 +82,6 @@ var _ Manager = (*implementation)(nil)
type implementation struct {
headerCache *Cache
dataCache *Cache
txCache *Cache
txTimestamps *sync.Map // map[string]time.Time
pendingEvents map[uint64]*common.DAHeightEvent
pendingMu sync.Mutex
pendingHeaders *PendingHeaders
Expand All @@ -109,7 +95,6 @@ type implementation struct {
func NewManager(cfg config.Config, st store.Store, logger zerolog.Logger) (Manager, error) {
headerCache := NewCache(st, HeaderDAIncludedPrefix)
dataCache := NewCache(st, DataDAIncludedPrefix)
txCache := NewCache(nil, "")

pendingHeaders, err := NewPendingHeaders(st, logger)
if err != nil {
Expand All @@ -124,8 +109,6 @@ func NewManager(cfg config.Config, st store.Store, logger zerolog.Logger) (Manag
impl := &implementation{
headerCache: headerCache,
dataCache: dataCache,
txCache: txCache,
txTimestamps: new(sync.Map),
pendingEvents: make(map[uint64]*common.DAHeightEvent),
pendingHeaders: pendingHeaders,
pendingData: pendingData,
Expand Down Expand Up @@ -202,59 +185,6 @@ func (m *implementation) RemoveDataDAIncluded(hash string) {
m.dataCache.removeDAIncluded(hash)
}

// IsTxSeen reports whether the given transaction hash is marked as seen in
// the tx cache.
func (m *implementation) IsTxSeen(hash string) bool {
	seen := m.txCache.isSeen(hash)
	return seen
}

// AreTxsSeen is the batch form of IsTxSeen: it returns the tx cache's areSeen
// result for hashes.
func (m *implementation) AreTxsSeen(hashes []string) []bool {
	flags := m.txCache.areSeen(hashes)
	return flags
}

// SetTxSeen marks a transaction hash as seen and records the time at which it
// was marked.
func (m *implementation) SetTxSeen(hash string) {
	// transactions have no block height yet, so height 0 is used
	const txHeight uint64 = 0
	m.txCache.setSeen(hash, txHeight)

	// remember when the hash was stored so it can be cleaned up later
	m.txTimestamps.Store(hash, time.Now())
}

// SetTxsSeen marks every hash in hashes as seen in one batch call and stamps
// all of them with the same timestamp.
func (m *implementation) SetTxsSeen(hashes []string) {
	m.txCache.setSeenBatch(hashes, 0)

	// a single clock reading is shared by the whole batch
	stamp := time.Now()
	for i := range hashes {
		m.txTimestamps.Store(hashes[i], stamp)
	}
}

// CleanupOldTxs evicts every transaction hash whose recorded timestamp is
// older than olderThan and returns how many were evicted. A non-positive
// olderThan falls back to DefaultTxCacheRetention.
func (m *implementation) CleanupOldTxs(olderThan time.Duration) int {
	retention := olderThan
	if retention <= 0 {
		retention = DefaultTxCacheRetention
	}
	deadline := time.Now().Add(-retention)

	var evicted int
	m.txTimestamps.Range(func(k, v any) bool {
		hash, isString := k.(string)
		seenAt, isTime := v.(time.Time)
		// entries of an unexpected type, or not yet expired, are left alone
		if !isString || !isTime || !seenAt.Before(deadline) {
			return true
		}

		m.txCache.removeSeen(hash)
		m.txTimestamps.Delete(hash)
		evicted++
		return true
	})

	return evicted
}

// DeleteHeight removes from all caches the given height.
// This can be done when a height has been da included.
func (m *implementation) DeleteHeight(blockHeight uint64) {
Expand All @@ -263,12 +193,6 @@ func (m *implementation) DeleteHeight(blockHeight uint64) {
m.pendingMu.Lock()
delete(m.pendingEvents, blockHeight)
m.pendingMu.Unlock()

// Note: txCache is intentionally NOT deleted here because:
// 1. Transactions are tracked by hash, not by block height (they use height 0)
// 2. A transaction seen at one height may be resubmitted at a different height
// 3. The cache prevents duplicate submissions across block heights
// 4. Cleanup is handled separately via CleanupOldTxs() based on time, not height
}

// Pending operations
Expand Down Expand Up @@ -363,7 +287,7 @@ func (m *implementation) SaveToStore() error {
return fmt.Errorf("failed to save data cache to store: %w", err)
}

// TX cache and pending events are ephemeral - not persisted
// pending events are ephemeral - not persisted
return nil
}

Expand Down Expand Up @@ -406,7 +330,6 @@ func (m *implementation) ClearFromStore() error {

m.headerCache = NewCache(m.store, HeaderDAIncludedPrefix)
m.dataCache = NewCache(m.store, DataDAIncludedPrefix)
m.txCache = NewCache(nil, "")
m.pendingEvents = make(map[uint64]*common.DAHeightEvent)

// Initialize DA height from store metadata to ensure DaHeight() is never 0.
Expand Down
Loading
Loading