Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Drain the pending tx queue in merged batches with a durable WAL-backed ack, fixing severe queue backlog under heavy tx load. Tx dedup moved from the reaper cache into the sequencer queue [#3351](https://github.com/evstack/ev-node/pull/3351)

## v1.1.2

### Changes
Expand Down
1 change: 0 additions & 1 deletion block/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,6 @@ func newAggregatorComponents(
sequencer,
genesis,
logger,
cacheManager,
config.Node.ScrapeInterval.Duration,
executor.NotifyNewTransactions,
)
Expand Down
44 changes: 0 additions & 44 deletions block/internal/cache/generic_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,6 @@ func (c *Cache) isSeen(hash string) bool {
return c.hashes[hash]
}

// areSeen checks which hashes have been seen. Returns a boolean slice
// parallel to the input where result[i] is true if hashes[i] is in the
// cache. Acquires the read lock once for the entire batch.
func (c *Cache) areSeen(hashes []string) []bool {
c.mu.RLock()
defer c.mu.RUnlock()
result := make([]bool, len(hashes))
for i, h := range hashes {
result[i] = c.hashes[h]
}
return result
}

func (c *Cache) setSeen(hash string, height uint64) {
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -82,37 +69,6 @@ func (c *Cache) setSeen(hash string, height uint64) {
c.hashByHeight[height] = hash
}

func (c *Cache) removeSeen(hash string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.hashes, hash)
}

// setSeenBatch marks all hashes as seen under a single write lock.
// For height 0 (transactions), the hashByHeight bookkeeping is skipped
// since all txs share the same sentinel height — the map lookup and
// overwrite on every entry is pure overhead with no benefit.
func (c *Cache) setSeenBatch(hashes []string, height uint64) {
c.mu.Lock()
defer c.mu.Unlock()
if height == 0 {
for _, h := range hashes {
c.hashes[h] = true
}
return
}

// currently not used, but there for completeness against setSeen
for _, h := range hashes {
if existing, ok := c.hashByHeight[height]; ok && existing == h {
c.hashes[existing] = true
continue
}
c.hashes[h] = true
c.hashByHeight[height] = h
}
}

func (c *Cache) getDAIncluded(hash string) (uint64, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
Expand Down
2 changes: 0 additions & 2 deletions block/internal/cache/generic_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,6 @@ func TestCache_BasicOperations(t *testing.T) {
assert.False(t, c.isSeen("hash1"))
c.setSeen("hash1", 1)
assert.True(t, c.isSeen("hash1"))
c.removeSeen("hash1")
assert.False(t, c.isSeen("hash1"))

_, ok := c.getDAIncluded("hash2")
assert.False(t, ok)
Expand Down
79 changes: 1 addition & 78 deletions block/internal/cache/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/binary"
"fmt"
"sync"
"time"

"github.com/rs/zerolog"

Expand All @@ -24,10 +23,6 @@ const (

// DataDAIncludedPrefix is the store key prefix for data DA inclusion tracking.
DataDAIncludedPrefix = "cache/data-da-included/"

// DefaultTxCacheRetention is the default time to keep transaction hashes in cache.
// Keeping a too high value can lead to OOM during heavy transaction load.
DefaultTxCacheRetention = 30 * time.Minute
)

// CacheManager provides thread-safe cache operations for tracking seen blocks
Expand All @@ -51,13 +46,6 @@ type CacheManager interface {
SetDataDAIncluded(daCommitmentHash string, daHeight uint64, blockHeight uint64)
RemoveDataDAIncluded(hash string)

// Transaction operations
IsTxSeen(hash string) bool
AreTxsSeen(hashes []string) []bool
SetTxSeen(hash string)
SetTxsSeen(hashes []string)
CleanupOldTxs(olderThan time.Duration) int

// Pending events syncing coordination
GetNextPendingEvent(blockHeight uint64) *common.DAHeightEvent
SetPendingEvent(blockHeight uint64, event *common.DAHeightEvent)
Expand Down Expand Up @@ -94,8 +82,6 @@ var _ Manager = (*implementation)(nil)
type implementation struct {
headerCache *Cache
dataCache *Cache
txCache *Cache
txTimestamps *sync.Map // map[string]time.Time
pendingEvents map[uint64]*common.DAHeightEvent
pendingMu sync.Mutex
pendingHeaders *PendingHeaders
Expand All @@ -109,7 +95,6 @@ type implementation struct {
func NewManager(cfg config.Config, st store.Store, logger zerolog.Logger) (Manager, error) {
headerCache := NewCache(st, HeaderDAIncludedPrefix)
dataCache := NewCache(st, DataDAIncludedPrefix)
txCache := NewCache(nil, "")

pendingHeaders, err := NewPendingHeaders(st, logger)
if err != nil {
Expand All @@ -124,8 +109,6 @@ func NewManager(cfg config.Config, st store.Store, logger zerolog.Logger) (Manag
impl := &implementation{
headerCache: headerCache,
dataCache: dataCache,
txCache: txCache,
txTimestamps: new(sync.Map),
pendingEvents: make(map[uint64]*common.DAHeightEvent),
pendingHeaders: pendingHeaders,
pendingData: pendingData,
Expand Down Expand Up @@ -202,59 +185,6 @@ func (m *implementation) RemoveDataDAIncluded(hash string) {
m.dataCache.removeDAIncluded(hash)
}

func (m *implementation) IsTxSeen(hash string) bool {
return m.txCache.isSeen(hash)
}

func (m *implementation) AreTxsSeen(hashes []string) []bool {
return m.txCache.areSeen(hashes)
}

func (m *implementation) SetTxSeen(hash string) {
// Use 0 as height since transactions don't have a block height yet
m.txCache.setSeen(hash, 0)
// Track timestamp for cleanup purposes
m.txTimestamps.Store(hash, time.Now())
}

func (m *implementation) SetTxsSeen(hashes []string) {
m.txCache.setSeenBatch(hashes, 0)
now := time.Now()
for _, hash := range hashes {
m.txTimestamps.Store(hash, now)
}
}

// CleanupOldTxs removes transaction hashes older than olderThan and returns
// the count removed. Defaults to DefaultTxCacheRetention if olderThan <= 0.
func (m *implementation) CleanupOldTxs(olderThan time.Duration) int {
if olderThan <= 0 {
olderThan = DefaultTxCacheRetention
}

cutoff := time.Now().Add(-olderThan)
removed := 0

m.txTimestamps.Range(func(key, value any) bool {
hash, ok := key.(string)
if !ok {
return true
}
timestamp, ok := value.(time.Time)
if !ok {
return true
}
if timestamp.Before(cutoff) {
m.txCache.removeSeen(hash)
m.txTimestamps.Delete(hash)
removed++
}
return true
})

return removed
}

// DeleteHeight removes from all caches the given height.
// This can be done when a height has been da included.
func (m *implementation) DeleteHeight(blockHeight uint64) {
Expand All @@ -263,12 +193,6 @@ func (m *implementation) DeleteHeight(blockHeight uint64) {
m.pendingMu.Lock()
delete(m.pendingEvents, blockHeight)
m.pendingMu.Unlock()

// Note: txCache is intentionally NOT deleted here because:
// 1. Transactions are tracked by hash, not by block height (they use height 0)
// 2. A transaction seen at one height may be resubmitted at a different height
// 3. The cache prevents duplicate submissions across block heights
// 4. Cleanup is handled separately via CleanupOldTxs() based on time, not height
}

// Pending operations
Expand Down Expand Up @@ -363,7 +287,7 @@ func (m *implementation) SaveToStore() error {
return fmt.Errorf("failed to save data cache to store: %w", err)
}

// TX cache and pending events are ephemeral - not persisted
// pending events are ephemeral - not persisted
return nil
}

Expand Down Expand Up @@ -406,7 +330,6 @@ func (m *implementation) ClearFromStore() error {

m.headerCache = NewCache(m.store, HeaderDAIncludedPrefix)
m.dataCache = NewCache(m.store, DataDAIncludedPrefix)
m.txCache = NewCache(nil, "")
m.pendingEvents = make(map[uint64]*common.DAHeightEvent)

// Initialize DA height from store metadata to ensure DaHeight() is never 0.
Expand Down
Loading