fix: drain pending tx queue in batches with durable ack #3351
Resolves slow pending-tx drain caused by one-queue-entry-per-block consumption and seen-on-enqueue dedup poisoning:
- single sequencer drains multiple queue entries per block up to MaxBytes; WAL entries are deleted only on ack after block commit, with retry before the next batch and startup reconciliation against the last committed block (DropIncluded)
- tx dedup moves from the 30-min cache-manager tx cache into the batch queue itself, keyed by sha256, released on ack; dead cache tx-seen subsystem removed
- postponed txs are requeued durably during ack and stay deduped
- Load cleans duplicate/stale WAL entries so restarts cannot resurrect committed txs
- reaper submits one scrape per interval and notifies the executor only when new entries were actually queued (explicit pending count wiring, immune to tracing wrappers)
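The startup reconciliation mentioned above can be pictured roughly as follows. This is only a sketch: the function name `dropIncluded` and its signature are hypothetical and are not taken from the PR's `DropIncluded`. It assumes pending txs are matched against the last committed block by sha256, and it returns a fresh slice so the caller's input is not aliased.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// dropIncluded (hypothetical helper) returns the pending txs that do not
// appear in the last committed block. Matching is by sha256 of the raw tx.
func dropIncluded(pending, committed [][]byte) [][]byte {
	included := make(map[[sha256.Size]byte]struct{}, len(committed))
	for _, tx := range committed {
		included[sha256.Sum256(tx)] = struct{}{}
	}
	// Build a new slice instead of filtering in place so the input stays intact.
	kept := make([][]byte, 0, len(pending))
	for _, tx := range pending {
		if _, ok := included[sha256.Sum256(tx)]; !ok {
			kept = append(kept, tx)
		}
	}
	return kept
}

func main() {
	pending := [][]byte{[]byte("tx-1"), []byte("tx-2"), []byte("tx-3")}
	committed := [][]byte{[]byte("tx-2")}
	for _, tx := range dropIncluded(pending, committed) {
		fmt.Println(string(tx)) // tx-1 and tx-3 survive the reload
	}
}
```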
📝 Walkthrough
Migrates tx dedup into a WAL-backed BatchQueue with Drain/Ack and in-flight bookkeeping, removes the tx-cache subsystem, simplifies reaper submission (no cache dedup), adds sequencer ack APIs and executor retry-before-next-block for post-commit batch acknowledgement, and updates tests/benches and component wiring.
Changes: Batch queue drain/ack with in-memory dedup and cache removal
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
- defer postponed entry requeue until ack fully succeeds so a drain rollback after a failed ack neither loses nor duplicates postponed txs
- replace fmt.Printf with structured logging in BatchQueue
- use monotonic enqueue counter in reaper to detect new submissions race-free against concurrent drain/ack
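To illustrate why a monotonic counter is race-free where a pending-count gauge is not, here is a small sketch. The `queue` type and its fields are hypothetical stand-ins, not the PR's types; the only assumption is that drain/ack can lower the pending count between the reaper's two reads.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// queue is a hypothetical stand-in exposing a pending gauge and a
// monotonic counter of accepted batches.
type queue struct {
	pending       atomic.Int64
	totalEnqueued atomic.Uint64
}

func (q *queue) enqueue() { q.pending.Add(1); q.totalEnqueued.Add(1) }
func (q *queue) drain()   { q.pending.Add(-1) }

func main() {
	q := &queue{}
	q.enqueue() // an older entry is already waiting

	beforePending, beforeTotal := q.pending.Load(), q.totalEnqueued.Load()
	q.enqueue() // the reaper's submission is accepted...
	q.drain()   // ...while the executor drains an entry concurrently

	// The gauge is unchanged, so comparing it would miss the new submission.
	fmt.Println(q.pending.Load() > beforePending)
	// The counter only grows, so the comparison still detects it.
	fmt.Println(q.totalEnqueued.Load() > beforeTotal)
}
```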
Codecov Report
@@ Coverage Diff @@
## main #3351 +/- ##
==========================================
+ Coverage 60.72% 61.00% +0.28%
==========================================
Files 127 127
Lines 13781 13879 +98
==========================================
+ Hits 8368 8467 +99
+ Misses 4500 4488 -12
- Partials 913 924 +11
🧹 Nitpick comments (1)
pkg/sequencers/single/queue.go (1)
135-138: ⚡ Quick win: Consider wrapping the persistence error for better context.
The error from `persistBatch` is returned directly. Wrapping it would provide better context for debugging failures.
📝 Suggested improvement
```diff
 key := seqToKey(bq.nextAddSeq)
 if err := bq.persistBatch(ctx, batch, key); err != nil {
 	bq.rollbackSeenLocked(hashes)
-	return err
+	return fmt.Errorf("failed to persist batch to WAL: %w", err)
 }
```
Source: Coding guidelines
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@pkg/sequencers/single/queue.go`:
- Around line 135-138: The call to bq.persistBatch returns an error that is
propagated directly; wrap that error with contextual information before
returning so callers know what operation failed. Update the if block that calls
bq.persistBatch(ctx, batch, key) to wrap the returned err (e.g., include the key
and operation name) when returning, and ensure the package imports fmt (or
errors) to use error wrapping via %w (or errors.Wrap) so the original error is
preserved; keep the rollback call to bq.rollbackSeenLocked(hashes) unchanged.
📒 Files selected for processing (14)
- block/components.go
- block/internal/cache/generic_cache.go
- block/internal/cache/generic_cache_test.go
- block/internal/cache/manager.go
- block/internal/cache/manager_test.go
- block/internal/executing/executor.go
- block/internal/reaping/bench_test.go
- block/internal/reaping/reaper.go
- block/internal/reaping/reaper_test.go
- pkg/sequencers/single/queue.go
- pkg/sequencers/single/queue_migration_test.go
- pkg/sequencers/single/queue_test.go
- pkg/sequencers/single/sequencer.go
- pkg/sequencers/single/sequencer_test.go
💤 Files with no reviewable changes (3)
- block/internal/cache/manager_test.go
- block/internal/cache/generic_cache_test.go
- block/internal/cache/generic_cache.go
- fail Load on datastore read errors instead of dropping txs
- detach rollback WAL cleanup from drain context so it runs on shutdown
- raise queue load timeout for large WAL backlogs
- merge postponed-tx collection into the filter loop
- document DropIncluded aliasing, SetPostponed contract, txSeen bound
- add tests: executor ack retry, reconcile crash recovery, bulk-prepend rollback
🧹 Nitpick comments (1)
pkg/sequencers/single/queue.go (1)
472-474: WAL datastore read errors hard-fail node startup (no internal retry/backoff)
WAL read failure returns error instead of continuing
```go
// a datastore read failure means the WAL cannot be trusted as
// loaded — fail startup rather than silently dropping txs.
return fmt.Errorf("failed to read WAL entry from datastore: %w", result.Error)
```
- `pkg/sequencers/single/queue.go` returns an error on any datastore read failure while loading the WAL; only protobuf decode errors are logged and skipped (queue.go:483).
- That error propagates through `pkg/sequencers/single/sequencer.go` (failed to load batch queue from DB) back to the app `RunE` handlers (apps/*/cmd/run.go), which then bubbles up to `rootCmd.Execute()` and terminates the process (apps/*/main.go calls `os.Exit(1)`).

Consider adding a bounded retry/backoff around the WAL `db.Query`/load path if startup availability during transient datastore issues matters.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@pkg/sequencers/single/queue.go`:
- Around line 472-474: The WAL loader in pkg/sequencers/single/queue.go
currently returns immediately on any datastore read failure (the path that
returns fmt.Errorf("failed to read WAL entry...": %w, result.Error)), causing
hard-fail startup; instead, wrap the datastore read (the db.Query / WAL load
block inside the queue/WAL load function) in a bounded retry with exponential
backoff and jitter (configurable max attempts and total timeout) so transient
datastore errors are retried before failing, while preserving the existing
behavior of logging and skipping protobuf decode errors; ensure the retry only
surrounds the datastore read/result.Error check and still returns an error if
retries are exhausted so callers like the sequencer load caller (the
loadBatchQueue/loadQueue code path used by pkg/sequencers/single/sequencer.go)
observe a final failure.
📒 Files selected for processing (5)
- block/internal/executing/executor_logic_test.go
- pkg/sequencers/single/queue.go
- pkg/sequencers/single/queue_test.go
- pkg/sequencers/single/sequencer.go
- pkg/sequencers/single/sequencer_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
- pkg/sequencers/single/sequencer.go
- pkg/sequencers/single/queue_test.go
julienrbrt left a comment
One thing I don't like about this is the coupling, even though loose, between the single sequencer queue and the reaper.
Can we add a changelog?
```go
r.cache.SetTxsSeen(newHashes)
totalSubmitted += len(newTxs)
var before uint64
if r.totalEnqueuedBatches != nil {
```
This does not look strictly needed, to be fair, apart from the metrics, and it adds the loose coupling.
It also brings in some hacks, like L158, for the based/solo sequencers, which do not implement this.
Can we just simplify here with a simple len(txs), which would be OK for 99% of the cases on all sequencers?
The only thing we risk is onTxsSubmitted being called early, which could at worst lead to an empty block.
```go
	return nil, fmt.Errorf("failed to create reaper: %w", err)
}

// wire batch ack callback so drained queue entries are committed after block commit
```
nit: could we move this into the executor constructor? We get the sequencer data there.
This would decrease the public API surface.
Overview
Fixes the pending tx drain bottlenecks we hit on testnet.
The queue used to hand out one entry per block, so a backlog of small entries took forever to drain. Drain now merges entries into a single batch up to the block's byte limit.
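A simplified sketch of the merge step, for illustration only. The `entry` type and `drain` function are hypothetical; this version works at whole-entry granularity and always takes the head entry, whereas the PR itself filters individual txs and postpones the ones that do not fit.

```go
package main

import "fmt"

// entry is a hypothetical queue entry: one enqueued group of raw txs.
type entry struct {
	seq uint64
	txs [][]byte
}

// drain merges whole entries from the head of pending into one batch until
// the next entry would exceed maxBytes. It returns the merged txs and the
// sequence numbers consumed, which would stay in the WAL until acked.
func drain(pending []entry, maxBytes int) (batch [][]byte, drained []uint64) {
	size := 0
	for _, e := range pending {
		entrySize := 0
		for _, tx := range e.txs {
			entrySize += len(tx)
		}
		// Simplification of this sketch: always take the head entry so an
		// oversized one cannot stall the queue.
		if len(drained) > 0 && size+entrySize > maxBytes {
			break
		}
		batch = append(batch, e.txs...)
		drained = append(drained, e.seq)
		size += entrySize
	}
	return batch, drained
}

func main() {
	pending := []entry{
		{seq: 1, txs: [][]byte{[]byte("aaaa")}},
		{seq: 2, txs: [][]byte{[]byte("bbbb"), []byte("cc")}},
		{seq: 3, txs: [][]byte{[]byte("dddddddd")}},
	}
	// Entries 1 and 2 total 10 bytes; entry 3 would push the batch to 18.
	batch, drained := drain(pending, 12)
	fmt.Println(len(batch), drained)
}
```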
WAL entries were also deleted as soon as they were dequeued, so txs could be lost if the block never committed. Entries are now kept until the block is committed and explicitly acked. Failed acks are retried before the next block, and on restart anything already included in the last block is dropped from the reloaded queue.
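The ack-after-commit flow with retry before the next block might be sketched like this. The `wal` and `producer` types, and the `failNext` hook, are hypothetical and exist only to make the example runnable; they are not the sequencer or executor APIs from the PR.

```go
package main

import (
	"errors"
	"fmt"
)

// wal is a hypothetical in-memory stand-in for the durable queue store.
type wal struct {
	entries  map[uint64][]byte
	failNext bool // demo hook: makes the next delete fail once
}

func (w *wal) delete(seq uint64) error {
	if w.failNext {
		w.failNext = false
		return errors.New("datastore unavailable")
	}
	delete(w.entries, seq)
	return nil
}

// producer tracks entries drained into a block that has not been acked yet.
type producer struct {
	wal        *wal
	pendingAck []uint64
}

// ack deletes drained entries; on failure the rest is kept for a later retry.
func (p *producer) ack() error {
	for len(p.pendingAck) > 0 {
		if err := p.wal.delete(p.pendingAck[0]); err != nil {
			return fmt.Errorf("ack seq %d: %w", p.pendingAck[0], err)
		}
		p.pendingAck = p.pendingAck[1:]
	}
	return nil
}

// produceBlock retries any outstanding ack before handling a new batch.
func (p *producer) produceBlock(drained []uint64) error {
	if err := p.ack(); err != nil {
		return err // do not drain on top of an unacked batch
	}
	// ... the block would be executed and committed here ...
	p.pendingAck = drained
	return p.ack()
}

func main() {
	w := &wal{entries: map[uint64][]byte{1: nil, 2: nil, 3: nil}, failNext: true}
	p := &producer{wal: w}
	fmt.Println(p.produceBlock([]uint64{1, 2}), len(w.entries)) // ack fails, WAL untouched
	fmt.Println(p.produceBlock([]uint64{3}), len(w.entries))    // retry succeeds, then the new batch is acked
}
```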
Dedup moved from the reaper's in-memory cache into the queue itself, since the cache didn't survive restarts and let pending txs get re-submitted and included twice. The seen-set is rebuilt from the WAL on load and hashes are only released once their txs are committed.
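As a rough picture of queue-owned dedup, the sketch below keys a seen-set by sha256 and releases hashes only on ack. The `seenSet` type and its method names are hypothetical, not the PR's identifiers; rebuilding on load would amount to running `filterNew` over the txs read back from the WAL.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

type txHash [sha256.Size]byte

// seenSet is a hypothetical dedup index held by the queue itself.
type seenSet struct {
	seen map[txHash]struct{}
}

func newSeenSet() *seenSet { return &seenSet{seen: make(map[txHash]struct{})} }

// filterNew returns the txs not yet tracked and marks them as seen.
func (s *seenSet) filterNew(txs [][]byte) [][]byte {
	var fresh [][]byte
	for _, tx := range txs {
		h := txHash(sha256.Sum256(tx))
		if _, ok := s.seen[h]; ok {
			continue // still pending or in flight: reject the duplicate
		}
		s.seen[h] = struct{}{}
		fresh = append(fresh, tx)
	}
	return fresh
}

// release forgets hashes once their txs are committed and acked.
func (s *seenSet) release(txs [][]byte) {
	for _, tx := range txs {
		delete(s.seen, txHash(sha256.Sum256(tx)))
	}
}

func main() {
	s := newSeenSet()
	a, b := []byte("tx-a"), []byte("tx-b")
	fmt.Println(len(s.filterNew([][]byte{a, b}))) // both are new
	fmt.Println(len(s.filterNew([][]byte{a})))    // duplicate while pending
	s.release([][]byte{a})
	fmt.Println(len(s.filterNew([][]byte{a}))) // accepted again after ack
}
```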
Postponed txs are also handled by the queue now: persisted before the source entries are deleted, so ack retries can't lose or duplicate them.
Tested with a burst of 500K transactions. The important result of the fix is the sub-1s queue wait, which was 20+ minutes before this change.