[SPARK-57242][CORE] Avoid unbounded page allocation retries after allocator OOM #56293
sunchao wants to merge 10 commits into
Question (non-blocking): If a consumer's In that scenario the consumer did free physical memory (which is what we need for the allocator retry), but also re-acquired tracked memory in the same callback. The method would undercount released bytes and potentially give up before the allocator has enough headroom. Is this intentional best-effort behavior? If so, a brief comment here noting that progress tracking is conservative (may undercount when spill callbacks re-acquire) would help future readers understand why the bounded fallback paths exist even when spillable consumers remain. |
|
@shrirangmhalgi Thanks for calling this out. Yes, progress here is intentionally measured conservatively as the net reduction in the consumer's tracked memory. I added a short clarification next to the calculation in |
dongjoon-hyun
left a comment
+1, this looks reasonable to me, @sunchao. You can ignore the previous tungstenMemoryAllocator comment if it's difficult to write test code without it.
viirya
left a comment
I traced the new allocatePage recovery loop's memory accounting through every branch against the test cases, and verified the ShuffleExternalSorter/ShuffleInMemorySorter lazy-pointer-array interplay with the re-entrancy guard. The core logic is correct and the change is well-motivated — this is an independent confirmation on top of the existing approval, plus a few maintainability notes on the ThreadLocal guard.
The memory accounting holds across all branches. This is the riskiest part, so I checked the invariant that acquired always equals the task's currently-held-but-unreleased grant for this allocation, and that acquiredButNotUsed += acquired - page.size() stays non-negative:
- Same-grant retry after spill (`released > 0`): `allocationSize` stays at the original `acquired`; spilled bytes belong to other consumers and return to the pool, so they don't inflate this allocation's grant.
- Combine-spill-with-free-tail (`partialAllocationSize > 0`): the `overlap = min(partialAllocationSize, additionalAcquired)` subtraction is the crux. Since spilled memory returns to the pool and `acquireAdditional` may re-grant that same physical memory, adding the full `additionalAcquired` would double-count. I verified `pageAllocationFailureCombinesSpillWithFreeTail` end-to-end (acquired lands at 5120, page 2048, `acquiredButNotUsed` 3072, cleanup nets to 0).
- `allocationSize <= acquired` holds at the success point on every path (`max(partial, additional) <= oldAcquired + additional - overlap` because `overlap <= partial <= oldAcquired`), so `acquired - page.size()` is never negative. The `Math.addExact` guards and the idempotent `finally` cleanup are good defensive touches.
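The overlap arithmetic above can be checked with a small model. This is only a sketch, not Spark code: `GrantModel` and its method names are hypothetical, and the inputs used in the usage note (4096, 1024, 2048) are illustrative values chosen so the result lines up with the quoted figures, not values taken from the test itself.

```java
// Hypothetical model of the combine-spill-with-free-tail accounting; not Spark code.
final class GrantModel {
  /** Grant held after combining a partial retry with an additional grant. */
  static long combinedGrant(long oldAcquired, long partial, long additional) {
    // Spilled bytes return to the pool and may be re-granted, so the part of
    // `additional` that overlaps the partial allocation must not be counted twice.
    long overlap = Math.min(partial, additional);
    return Math.addExact(oldAcquired, additional - overlap);
  }

  /** Surplus to record as acquired-but-unused once a page of pageSize is allocated. */
  static long surplus(long acquired, long pageSize) {
    long unused = acquired - pageSize;
    if (unused < 0) {
      throw new IllegalStateException("page larger than grant");
    }
    return unused;
  }
}
```

With an old grant of 4096, a partial size of 1024, and an additional grant of 2048, the overlap is 1024, the combined grant is 5120, and a 2048-byte page leaves a surplus of 3072.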
Moving acquiredButNotUsed from volatile to @GuardedBy("this") is a correctness improvement, not just cosmetic — the old += was never atomic under volatile, and all accesses are now under the lock.
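To illustrate why the lock matters for `+=`, here is a minimal sketch of a counter whose every access holds the object's monitor. `GuardedCounter` and `hammer` are hypothetical names for illustration; the real field lives in `TaskMemoryManager` and is not reproduced here.

```java
// Minimal sketch, not Spark code: a counter whose every access holds the object's monitor.
final class GuardedCounter {
  // Guarded by "this": read and written only inside synchronized methods.
  private long acquiredButNotUsed = 0L;

  synchronized void add(long bytes) {
    // The read-modify-write is atomic because the monitor is held for the whole update.
    acquiredButNotUsed = Math.addExact(acquiredButNotUsed, bytes);
  }

  synchronized long get() {
    return acquiredButNotUsed;
  }

  /** Runs `threads` workers that each add 1 byte `perThread` times; returns the total. */
  static long hammer(int threads, int perThread) {
    GuardedCounter counter = new GuardedCounter();
    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      workers[i] = new Thread(() -> {
        for (int j = 0; j < perThread; j++) {
          counter.add(1L);
        }
      });
      workers[i].start();
    }
    try {
      for (Thread w : workers) {
        w.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return counter.get();
  }
}
```

A `volatile long` updated with `+=` from several threads can lose updates, because the read and the write are separate operations; the synchronized version cannot.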
The re-entrancy guard + lazy pointer array is a coherent, necessary pair. inPageAllocationRecovery makes nested allocatePage return null during recovery, which is only safe because consumers' spill() must not allocate; the ShuffleInMemorySorter.reset() lazy re-allocation is what makes ShuffleExternalSorter's spill honor that. The extra growPointerArrayIfNecessary() after acquireNewPageIfNecessary() in insertRecord is genuinely necessary (a data-page allocation can spill-and-reset this sorter, leaving the array lazily-null), and the getSortedIterator() pos == 0 early return prevents an NPE on the null array (the sort paths dereference array). All of this hangs together.
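The lazy pointer-array lifecycle described above can be sketched as follows. `LazyPointerArray` is a hypothetical stand-in, not `ShuffleInMemorySorter`; it only models the three behaviours mentioned: no allocation inside `reset()`, lazy re-allocation on the next insert, and an early return when nothing has been inserted.

```java
// Hypothetical sketch of a lazily re-allocated pointer array; not the Spark sorter.
final class LazyPointerArray {
  private final int initialCapacity;
  private long[] array;   // null after reset() until the next insert
  private int pos = 0;

  LazyPointerArray(int initialCapacity) {
    if (initialCapacity <= 0) {
      throw new IllegalArgumentException("initialCapacity must be positive");
    }
    this.initialCapacity = initialCapacity;
    this.array = new long[initialCapacity];
  }

  /** Called from the spill path: frees the array and allocates nothing. */
  void reset() {
    array = null;
    pos = 0;
  }

  /** Called on the insert path, outside any spill callback. */
  void insert(long pointer) {
    if (array == null) {
      array = new long[initialCapacity];
    } else if (pos == array.length) {
      array = java.util.Arrays.copyOf(array, array.length * 2);
    }
    array[pos++] = pointer;
  }

  /** Returns the inserted pointers in sorted order; the early return never touches a null array. */
  long[] sortedPointers() {
    if (pos == 0) {
      return new long[0];
    }
    long[] out = java.util.Arrays.copyOf(array, pos);
    java.util.Arrays.sort(out);
    return out;
  }

  boolean hasArray() {
    return array != null;
  }
}
```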
On the inPageAllocationRecovery ThreadLocal — the protection rests on an unenforced threading assumption. The guard's correctness depends entirely on the whole chain allocatePage -> recover -> consumer.spill() -> nested allocatePage running synchronously on one thread. That's the right scope (a shared boolean field would wrongly block other tasks' allocations during cooperative spilling, since spill() can be triggered cross-task), and the leak-prone parts are handled well — try/finally with remove() (not set(false)) avoids stale values on pooled threads. But two things are emergent rather than enforced:
- If a consumer's `spill()` ever hands the nested `allocatePage` call to a different thread (async flush, executor-pool compaction, IO completion callback), that thread's ThreadLocal is `false` and the re-entrancy protection silently disappears, degrading back to the unbounded stacking this PR fixes, with no exception to signal it. The `MemoryConsumer.spill()` contract today only says "should not call acquireMemory() from spill()"; it says nothing about threading. All three built-in spills (`ShuffleExternalSorter`, `UnsafeExternalSorter`, `BytesToBytesMap`) are synchronous and same-thread, so this is safe today, but `MemoryConsumer` is a public extension point.
- `recoverFromPageAllocationFailure` and `acquireAdditionalExecutionMemoryForPageAllocation` each do `set(true)` ... `finally remove()`. Because `remove()` resets unconditionally rather than restoring the prior value, nesting these two would let the inner `finally` clear the outer's flag early. It doesn't happen today (each checks the flag and returns before re-setting, and the loop takes one path per OOM), but it relies on the call graph rather than being locally safe.
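The thread-hop concern is easy to reproduce in isolation. The sketch below is hypothetical (`ThreadLocalScopeDemo` and `inRecovery` are illustrative names, not Spark's): it sets a `ThreadLocal` flag on the calling thread and then reads it from a helper thread, which sees the initial value instead.

```java
// Hypothetical sketch: a ThreadLocal flag set by the caller is not seen by a helper thread.
final class ThreadLocalScopeDemo {
  private static final ThreadLocal<Boolean> inRecovery = ThreadLocal.withInitial(() -> false);

  /** Returns {flag seen on the caller thread, flag seen on a helper thread}. */
  static boolean[] observe() {
    boolean[] seen = new boolean[2];
    inRecovery.set(true);
    try {
      seen[0] = inRecovery.get();
      Thread helper = new Thread(() -> seen[1] = inRecovery.get());
      helper.start();
      try {
        helper.join();  // join() makes the helper's write visible to this thread
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    } finally {
      inRecovery.remove();
    }
    return seen;
  }
}
```

The caller observes `true` while the helper observes `false`, which is exactly the situation in which a nested allocation on another thread would bypass the guard.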
A suggestion that addresses both at once, if you think it's worth it: instead of a per-thread flag, track the set of consumers currently in spill-driven recovery on the TMM itself:

```java
@GuardedBy("this")
private final Set<MemoryConsumer> consumersInSpillRecovery = new HashSet<>();
```

Add the consumer being spilled (and likely the requesting consumer, to match the current guard's breadth) around the `spill()` call, and have `allocatePage` bail when its consumer is in that set. Because the state is keyed on consumer + TMM rather than the thread, it stays correct even if a `spill()` implementation hops threads, it still won't block other tasks' allocations, and add/remove of the same key is naturally idempotent so the nested-`remove()` sharp edge goes away. The one thing to pin down is exactly which consumers to block during recovery so the breadth matches today's ThreadLocal behavior. This is more invasive than the current approach, so it's a judgment call, but it converts the "must be same thread" assumption from implicit-and-unchecked into something the data model enforces.
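A rough sketch of how such a consumer-keyed guard might be used, under the assumption that the set is only touched while holding the owner's monitor. `RecoveryGuard`, `enter`, `exit`, and `shouldBail` are hypothetical names for illustration, and the generic parameter stands in for `MemoryConsumer`.

```java
// Hypothetical sketch of a consumer-keyed recovery guard; names are illustrative, not Spark's.
final class RecoveryGuard<C> {
  // Guarded by "this".
  private final java.util.Set<C> consumersInSpillRecovery = new java.util.HashSet<>();

  /** Marks a consumer as being spilled; returns false if it was already marked. */
  synchronized boolean enter(C consumer) {
    return consumersInSpillRecovery.add(consumer);
  }

  /** Clears the mark; removing an absent key is a no-op, so repeated calls are harmless. */
  synchronized void exit(C consumer) {
    consumersInSpillRecovery.remove(consumer);
  }

  /** An allocation request from a marked consumer should bail, whatever thread it runs on. */
  synchronized boolean shouldBail(C consumer) {
    return consumersInSpillRecovery.contains(consumer);
  }
}
```

A caller would wrap the spill in `enter(...)` / `try` / `finally exit(...)`. Which consumers to mark (only the one being spilled, or the requester too) is left open here, as it is in the suggestion.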
If you keep the ThreadLocal, I'd at least add a line to the MemoryConsumer.spill() Javadoc (and/or recoverFromPageAllocationFailure) stating that spill() must complete synchronously on the caller thread and that any nested page allocation must occur on the same thread, else allocator-OOM recovery's re-entrancy protection is bypassed — alongside the existing "should not call acquireMemory() from spill()" note.
Both of the above are non-blocking — the current code is correct given the built-in consumers.
A couple of smaller things: the second growPointerArrayIfNecessary() adds one pos < usableCapacity check per record on the insert hot path (effectively free, but the PR description's "successful page allocations are unchanged" is slightly understated). And on @dongjoon-hyun's open question about the new tungstenMemoryAllocator field — my read is it's there solely to inject TestAllocator, which is a legitimate need (the deterministic allocator-OOM tests are the strongest part of this PR); if you'd rather not widen the constructor, a @VisibleForTesting setter or an overridable accessor would keep the public surface untouched.
Overall: correct, well-tested, and the cross-file invariant between the re-entrancy guard and the lazy pointer array is sound. I concur with the approval; the ThreadLocal threading contract is the one thing I'd most like to see either documented or designed out.
|
@viirya Thanks for the careful review. I addressed the two concrete maintainability points in
I kept the This update also fixes an adversarial shuffle regression found during follow-up validation: with |
f72fa82 to
9ade5a7
cloud-fan
left a comment
0 blocking, 1 non-blocking, 0 nits.
Correct, well-motivated, and unusually well-tested; I concur with the existing approvals. One non-blocking idiom note below.
Design / architecture (1)
- TaskMemoryManager.java:126: `inPageAllocationRecovery` is a raw `ThreadLocal` with manual `set(true)` / `finally remove()`; prefer the lexically-scoped idiom (see inline)
Verification
Traced the recovery-loop memory accounting across every branch: acquiredButNotUsed += acquired - page.size() is never negative because acquired >= allocationSize == page.size() on each success path (initial grant; same-grant spill retry; spill+free-tail combine, where overlap = min(partialAllocationSize, additionalAcquired) removes the double-count; free-tail-only), and the failure finally retains the full grant — so the grant reconciles to 0 at cleanup. Separately confirmed the lazy-pointer-array design matches the existing UnsafeInMemorySorter.freeMemory() precedent (array nulled during spill, allocated lazily on next insert), so the UnsafeExternalSorter path driven by the same direct-spill recovery is already safe and needs no parallel fix.
```java
/**
 * Prevent nested page allocations while spilling from recursively entering allocator recovery.
 */
private final ThreadLocal<Boolean> inPageAllocationRecovery =
```
Non-blocking idiom note. This is a raw ThreadLocal managed manually with set(true) ... finally remove() in both recoverFromPageAllocationFailure and acquireAdditionalExecutionMemoryForPageAllocation. remove() resets to the initial false unconditionally rather than restoring the prior value, so nesting the two would clear an outer scope's flag early — the same sharp edge @viirya raised above.
Spark's idiom for a lexically-scoped thread local is org.apache.spark.util.LexicalThreadLocal, whose runWith does save-old / set / finally restore-old and is nesting-safe by construction. The catch is that it's a Scala trait carrying state (private val tl) with no Java users, so it can't be cleanly extended from this Java class (the trait field initializer wouldn't run). Given that, the pragmatic in-language fix is to capture the prior boolean at entry and restore it in finally rather than calling remove():
```java
final boolean prev = inPageAllocationRecovery.get();
inPageAllocationRecovery.set(true);
try {
  ...
} finally {
  inPageAllocationRecovery.set(prev);
}
```

Current code is correct for the three built-in consumers, so this is robustness/idiom only, not a blocker.
There was a problem hiding this comment.
Thanks for checking this. Both guarded helpers return immediately when the flag is already true, so any invocation that reaches set(true) necessarily has a prior value of false. A nested invocation cannot enter its own set/finally scope and therefore cannot clear the outer flag. I prefer keeping remove() here because it also removes the ThreadLocal entry from the pooled thread rather than retaining a false value. If the call structure changes to permit nesting in the future, we should introduce a truly scoped helper or depth counter then.
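The argument in this reply can be sketched as a guarded helper that returns before touching the flag when it is already set. `NonNestingGuard` is a hypothetical class for illustration and does not mirror the actual `TaskMemoryManager` helpers.

```java
// Hypothetical sketch of a guarded helper that refuses to nest; not the Spark implementation.
final class NonNestingGuard {
  private final ThreadLocal<Boolean> inRecovery = ThreadLocal.withInitial(() -> false);
  private int entered = 0;

  /** Runs body under the flag, or returns false immediately if the flag is already set. */
  boolean runGuarded(Runnable body) {
    if (inRecovery.get()) {
      return false;  // nested call: never reaches set(true), so it cannot clear the outer flag
    }
    inRecovery.set(true);
    try {
      entered++;
      body.run();
      return true;
    } finally {
      inRecovery.remove();  // the prior value was necessarily false; this also drops the entry
    }
  }

  boolean flagSet() {
    return inRecovery.get();
  }

  int entered() {
    return entered;
  }
}
```

A nested `runGuarded` call made from inside the body returns `false` without entering its own `try`/`finally`, so the outer flag is still set after the nested call returns.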
9ade5a7 to
f4bcd7d
|
Updated the PR and addressed most of the comments except the one on |
|
LGTM, also cc @jiangxb1987 @Ngone51 |
|
Do we need more review, @sunchao ? |
cloud-fan
left a comment
Re-review (commit 4a1cee9): 1 addressed, 0 remaining, 0 new.
0 blocking, 0 non-blocking, 0 nits.
Re-reviewed the delta since my prior round — the minimumSize / allocatePageWithMinimum mechanism, the second-level insertRecord pointer-array recovery, and the added tests. Correct and very thoroughly tested; I concur with the existing approvals.
My earlier non-blocking idiom note on inPageAllocationRecovery is resolved — agreed that the two guarded helpers cannot nest (each early-returns when the flag is already set), so remove() always restores false, and the new pageAllocationRequest thread-local already uses save/restore.
Verification
Traced the acquiredButNotUsed accounting across every recovery branch, including the new minimum-size path: the surplus is released before the partial retry so acquired == minimumSize == allocationSize == page.size() on that success path, and across all branches acquired >= page.size() holds, so acquiredButNotUsed += acquired - page.size() is never negative and the grant reconciles to 0 at cleanup — cross-checked against the arithmetic asserted in pageAllocationFailureCanRetryWithPartialPage / CombinesSpillWithFreeTail / ReleasesSurplusGrantAfterFullSpill. Separately confirmed the new lazy pointer-array lifecycle in ShuffleInMemorySorter (null on reset, lazy realloc, null-guarded expandPointerArray / getMemoryUsage) matches the established UnsafeInMemorySorter precedent.
Thanks again for the stamp @viirya @dongjoon-hyun @cloud-fan! Through further testing I found a correctness gap in the new allocator-OOM recovery path: certain exact-fit retry pages can cause a false OOM instead of a safe fallback. I have therefore pushed another update to the PR, which should be narrowly scoped. Please take a look again when you have a chance 🙏.
cloud-fan
left a comment
Re-review (commit d51c821): 0 addressed, 0 remaining, 2 new. (2 newly introduced by this commit, 0 late catches.)
1 blocking, 1 non-blocking, 0 nits.
Re-reviewed the delta d51c821 only (the previously-approved iterative-recovery mechanism is unchanged). The two-tier marker design — distinguishing self-limiting exact-fit pages (safe to accept) from pathological ones (reject and fall back) — is sound and well-tested for the branch it covers. One gap: the marker is set on only one of the two branches that produce pathological exact-fit pages.
Correctness (1)
- TaskMemoryManager.java:621 (and BytesToBytesMap.java:847): the pre-existing minimum-drop branch produces a pathological exact-fit page but never sets the `pagesAllocatedFromMinimumRetry` marker, so B2B accepts it and the targeted "false OOM" stays reachable via the spill-active path (see inline). Blocking.
Design / architecture (1)
- ShuffleExternalSorter.java:449 / UnsafeExternalSorter.java:459: other `allocatePage` consumers pad to pageSize and accept any returned page with no exact-fit guard. Are they not equally exposed to the same pathological tiny-page loop, or is the guard intentionally limited to B2B/HybridQueue? Non-blocking, needs confirmation.
Verification
Traced both minimum-drop branches in allocatePageInternal: branch L573-581 sets minimumRetryAfterPartialAllocationFailure (recorded at L639); the pre-existing branch L619-627 sets allocationSize = minimumSize and returns an identical exact-fit page but does not set the flag. Both fire while execution memory is available (surplus is released), so both pages are pathological. Confirmed the accounting is otherwise sound — surplus release is non-negative and acquiredButNotUsed += acquired - page.size() is 0 on the minimum-retry success path — and that the self-limiting (accept) vs. pathological (reject) distinction the tests assert is correct; the gap is solely that branch L619 is left unmarked.
PR description suggestions
- Document: the delta's design, namely the exact-fit / minimum-retry marker (`pagesAllocatedFromMinimumRetry` / `isPageAllocationFromMinimumRetry`) and the consumer-side rejection in `BytesToBytesMap` / `HybridQueue` that falls back instead of accepting a pathological exact-fit page. The current "What changes were proposed" list predates this commit.
```java
  tryingPartialAllocation = true;
} else if (minimumSize > 0 && minimumSize < allocationSize) {
  // The original grant is still reserved. If the caller padded a smaller allocation to
  // the configured page size, make one bounded attempt at the minimum usable size without
```
This minimum-drop branch returns an exact-fit page (allocationSize = minimumSize) just like the new branch at L573-581, but it doesn't set minimumRetryAfterPartialAllocationFailure, so the page is never recorded in pagesAllocatedFromMinimumRetry (set only at L639). It fires after spilling made progress yet the allocator still OOMs and the surplus is released — i.e. execution memory is available — so its page is pathological in exactly the same way as the branch it does mark.
Because BytesToBytesMap.acquireNewPage rejects only marked pages (L847), it will accept a page from this branch and re-enter the per-record page-table-slot exhaustion this delta is meant to prevent, reachable whenever a second spillable consumer is present (so recovery spilling makes progress). Consider setting the flag in both minimum-drop branches, e.g. via a shared helper. (No test currently drives this path for B2B — the new B2B tests inject allocator OOM but register no spillable peer, so they only exercise L573.)
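One way to picture the shared-helper suggestion is the sketch below, where every branch that shrinks the request to the minimum size goes through the same method, so no branch can forget the marker. `MinimumRetryMarker`, `dropToMinimum`, and `onPageAllocated` are hypothetical names, and pages are identified by a plain integer for brevity; this is an assumption-laden illustration, not the actual fix.

```java
// Hypothetical sketch: both minimum-drop branches go through one helper,
// so neither can skip the marker. Not the Spark implementation.
final class MinimumRetryMarker {
  private final java.util.Set<Integer> pagesAllocatedFromMinimumRetry = new java.util.HashSet<>();
  private boolean minimumRetryPending = false;

  /** Shared by every branch that shrinks the request to minimumSize. */
  long dropToMinimum(long minimumSize) {
    minimumRetryPending = true;
    return minimumSize;
  }

  /** Called once the allocator returns a page; records the marker if a minimum retry produced it. */
  void onPageAllocated(int pageNumber) {
    if (minimumRetryPending) {
      pagesAllocatedFromMinimumRetry.add(pageNumber);
      minimumRetryPending = false;
    }
  }

  boolean isPageAllocationFromMinimumRetry(int pageNumber) {
    return pagesAllocatedFromMinimumRetry.contains(pageNumber);
  }
}
```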
```java
}
// Retaining exact-fit minimum-retry pages would consume one page-table slot per map entry.
if (required < pageSizeBytes && page.size() == required &&
    isPageAllocationFromMinimumRetry(page)) {
```
This rejection is gated on isPageAllocationFromMinimumRetry, which TaskMemoryManager sets only for the new branch (L573-581), not for the pre-existing minimum-drop at L619-627 — yet both yield pathological exact-fit pages. As written, an exact-fit page from the L619 path slips through and is accepted, re-introducing the page-table-slot exhaustion in spill-active scenarios. See the comment on TaskMemoryManager.java:621 for the full trace.
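The gating condition quoted in the diff can be isolated as a pure predicate, which makes the gap visible: an unmarked exact-fit page is accepted. `ExactFitCheck.shouldReject` is a hypothetical helper that simply restates the three conjuncts from the snippet above.

```java
// Hypothetical restatement of the rejection condition as a pure function; not Spark code.
final class ExactFitCheck {
  /**
   * Rejects a page only when the request was smaller than the configured page size,
   * the page is an exact fit for the request, and the page carries the minimum-retry marker.
   */
  static boolean shouldReject(long required, long pageSizeBytes, long pageSize,
      boolean fromMinimumRetry) {
    return required < pageSizeBytes && pageSize == required && fromMinimumRetry;
  }
}
```

For a 64-byte request against a 4096-byte page size, a marked 64-byte page is rejected, while the same page without the marker passes the check.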
d51c821 to
d08d28d
|
@cloud-fan Good catch, you're right. I missed marking the second minimum-drop branch. Fixed in The external sorters are different: after accepting an exact-fit page, the next failed full-page allocation makes each sorter self-spill and free its data pages, so they cannot accumulate one page-table slot per record like |
Why are the changes needed?
Spark page allocation has two distinct steps:

- `MemoryManager` grants the task permission to use execution memory.
- `MemoryAllocator` physically allocates the on-heap or off-heap page.

An execution-memory grant does not guarantee that the physical allocator can create the page. The allocator may still throw `OutOfMemoryError`, for example because of memory pressure outside Spark's execution-memory accounting or because a sufficiently large allocation cannot be satisfied.

When that happens, `TaskMemoryManager.allocatePage()` currently retains the grant as acquired-but-unused memory and recursively calls `allocatePage()`:

Each retry asks for another execution-memory grant even though the task has not physically allocated the previous one. There is no check that retrying made progress. Under a persistent allocator/accounting mismatch, the task can pin an increasing amount of execution memory, recurse repeatedly, and eventually block waiting for more execution memory or fail far away from the original allocator OOM.
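The general shape of a retry that requires progress can be sketched as below. This is a simplified illustration, not the PR's recovery loop: `BoundedRetry`, `Allocator`, and `Spiller` are hypothetical names, execution-memory grants are not modelled, and the loop is bounded only under the assumption that the spiller eventually reports zero bytes released.

```java
// Hypothetical sketch of a progress-bounded retry loop; the interfaces are illustrative, not Spark's.
final class BoundedRetry {
  interface Allocator {
    long[] allocate(int words);  // may throw OutOfMemoryError
  }

  interface Spiller {
    long spill();  // returns bytes released, 0 if nothing could be freed
  }

  /** Retries only while spilling makes progress; returns null instead of recursing forever. */
  static long[] allocate(Allocator allocator, Spiller spiller, int words) {
    while (true) {
      try {
        return allocator.allocate(words);
      } catch (OutOfMemoryError e) {
        long released = spiller.spill();
        if (released <= 0) {
          return null;  // no progress: give up so the caller can fall back
        }
      }
    }
  }
}
```

Unlike a recursive retry, this loop keeps a flat stack and stops as soon as an attempt frees nothing.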
This is the generic `TaskMemoryManager` failure path underlying the long-running allocation retry described by SPARK-54354. That issue bounded temporary memory managers used by hashed relations, while SPARK-54818 improved allocator OOM diagnostics. The recursive retry behavior remains for other page-allocation consumers.

Recovery should remain bounded while preserving the useful behavior of the old implementation:
What changes were proposed in this PR?
For `TaskMemoryManager`:

The direct spill path can reset `ShuffleExternalSorter` while record insertion is in progress, so this PR also makes its pointer-array lifecycle safe for that recovery path:

- `ShuffleInMemorySorter.reset()` frees its pointer array and allocates the replacement lazily, outside the spill callback.
- `ShuffleExternalSorter` reuses successful pointer-array growth after a spill instead of collapsing back to a one-record array.

For consumers that cannot safely retain one exact-fit page per record:

- `BytesToBytesMap` rejects marked minimum-retry pages and falls back instead of consuming one page-table slot per record.
- `HybridQueue` frees exact-fit partial pages and falls back to its disk queue.

Normal page allocation does not enter the allocator-OOM recovery loop. Shuffle record insertion adds one inexpensive pointer-capacity check after data-page allocation so it can recover if that allocation spilled and reset the sorter. This introduces no new configuration or public API.
How was this PR tested?
Added deterministic `TaskMemoryManagerSuite` coverage for:

Added Python row-queue coverage for exact-fit partial-page fallback to disk.

Added shuffle sorter coverage for:

Validation performed:

- `TaskMemoryManagerSuite`
- `BytesToBytesMapOnHeapSuite`
- `BytesToBytesMapOffHeapSuite`
- `RowQueueSuite`
- `ShuffleInMemorySorterSuite`
- `ShuffleInMemoryRadixSorterSuite`
- `ShuffleExternalSorterSuite`
- `UnsafeShuffleWriterSuite`
- `UnsafeExternalSorterSuite`
- `git diff --check`