
[SPARK-56887][SQL][FOLLOWUP] Add spill support and extend ShuffledJoin in SortMergeAsOfJoinExec#56595

Open
sarutak wants to merge 2 commits into apache:master from sarutak:asof-join-spill-refactor

sarutak (Member) commented Jun 18, 2026

Follow-up to #55912. Refactors SortMergeAsOfJoinExec to address review feedback:

  1. Extend ShuffledJoin instead of BaseJoinExec: Inherits standard requiredChildDistribution, outputPartitioning, and output handling consistent with other shuffled join operators.

  2. Replace ArrayBuffer with ExternalAppendOnlyUnsafeRowArray: The right-side group buffer now spills to disk when the in-memory threshold is exceeded, eliminating OOM for skewed equi-key groups. Uses the same spark.sql.sortMergeJoinExec.buffer.* configs as SortMergeJoinExec.

  3. Unify to forward-only scan: Removes findBestRightToLeft (which required indexed random access incompatible with spill-backed storage). Backward joins now use a forward scan keeping the last as-of-satisfying row, which produces the same result because the as-of predicate is monotone over the sorted buffer. Early termination is preserved (true→false transition for Backward, distance increase for Forward/Nearest).

  4. Add spillSize metric: Reports spill bytes at task completion, matching SortMergeJoinExec's observability.
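
For reviewers, the forward-only Backward scan from item 3 can be sketched roughly as below. This is a simplified illustration, not the code in this PR: `BackwardAsOfScan`, `lastMatch`, and the `satisfies` predicate are hypothetical names, and rows are modelled as plain timestamps rather than `UnsafeRow`s. It assumes the predicate is false, then true, then false over the sorted buffer (either false run may be empty), which is what makes the early exit safe.

```scala
object BackwardAsOfScan {
  /**
   * Scans `rows` front to back and returns the last row that satisfies the
   * as-of predicate. Needs only a forward iterator, so it would also work
   * over a spill-backed buffer.
   */
  def lastMatch[A](rows: Iterator[A])(satisfies: A => Boolean): Option[A] = {
    var best: Option[A] = None
    var done = false
    while (!done && rows.hasNext) {
      val row = rows.next()
      if (satisfies(row)) {
        best = Some(row)   // keep overwriting: the last match wins
      } else if (best.isDefined) {
        done = true        // true -> false transition: no later row can match
      }
    }
    best
  }
}
```

For example, with right-side timestamps 1, 3, 7 and a left timestamp of 5, `BackwardAsOfScan.lastMatch(Iterator(1L, 3L, 7L))(_ <= 5L)` yields `Some(3L)` and stops as soon as it reads 7.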

Why are the changes needed?

The original operator used an in-memory ArrayBuffer that could OOM on skewed equi-key groups (flagged in #55912 review). The reverse scan for Backward joins prevented using ExternalAppendOnlyUnsafeRowArray (forward-only iterator). As noted in the review, the reverse scan is unnecessary since a forward scan keeping the last match produces the same result for monotone predicates, and extending ShuffledJoin provides maintenance parity with SortMergeJoinExec.
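
To make the constraint concrete, here is a toy model of an append-only buffer that spills past a row threshold and exposes only a forward iterator. It is a sketch under loud assumptions: `SpillableLongBuffer` and all of its members are hypothetical, it stores `Long`s instead of `UnsafeRow`s, and its spill strategy (overflow rows go to a temp file) is not how ExternalAppendOnlyUnsafeRowArray is implemented. It only shows why indexed random access is awkward once some rows live on disk.

```scala
import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream,
  DataOutputStream, File, FileInputStream, FileOutputStream}
import scala.collection.mutable.ArrayBuffer

/** Toy append-only buffer: the first `inMemoryThreshold` rows stay in memory, the rest spill. */
final class SpillableLongBuffer(inMemoryThreshold: Int) {
  private val inMemory = new ArrayBuffer[Long]()
  private var spillFile: Option[File] = None
  private var spillOut: Option[DataOutputStream] = None
  private var spilledCount = 0

  def add(v: Long): Unit = {
    if (inMemory.length < inMemoryThreshold) {
      inMemory += v
    } else {
      // Lazily create the spill file the first time the threshold is exceeded.
      val out = spillOut.getOrElse {
        val f = File.createTempFile("asof-spill", ".bin")
        f.deleteOnExit()
        val o = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(f)))
        spillFile = Some(f)
        spillOut = Some(o)
        o
      }
      out.writeLong(v)
      spilledCount += 1
    }
  }

  def length: Int = inMemory.length + spilledCount

  /** Bytes written to disk so far (8 bytes per spilled Long). */
  def spillSizeBytes: Long = spilledCount.toLong * java.lang.Long.BYTES

  /** Forward-only iterator: in-memory rows first, then spilled rows in append order. */
  def iterator: Iterator[Long] = {
    spillOut.foreach(_.flush())
    val spilled: Iterator[Long] = spillFile match {
      case Some(f) =>
        val in = new DataInputStream(new BufferedInputStream(new FileInputStream(f)))
        new Iterator[Long] {
          private var remaining = spilledCount
          def hasNext: Boolean = remaining > 0
          def next(): Long = {
            val v = in.readLong()
            remaining -= 1
            if (remaining == 0) in.close() // release the file handle once drained
            v
          }
        }
      case None => Iterator.empty
    }
    inMemory.iterator ++ spilled
  }
}
```

A reverse scan over this structure would have to either re-read the file from the start or seek backwards record by record, whereas a forward scan reads each row exactly once.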

Does this PR introduce any user-facing change?

No. Query results are unchanged; the operator now spills to disk instead of OOMing for large groups.

How was this patch tested?

  • SortMergeAsOfJoinSuite: all 26 tests pass
  • DataFrameAsOfJoinSuite: all 11 tests pass
  • AsOfJoinBenchmark: no performance regression (AMD EPYC 7763, 10K×10K rows, 100 groups):

| JDK | Before (ms) | After (ms) | Speedup vs baseline |
|-----|-------------|------------|---------------------|
| 17  | 59          | 53         | 720×                |
| 21  | 62          | 68         | 562×                |
| 25  | 56          | 61         | 660×                |

Was this patch authored or co-authored using generative AI tooling?

Kiro CLI / Claude

sarutak added 2 commits June 18, 2026 20:22
…orward scan unification

Refactor SortMergeAsOfJoinExec per review feedback:

1. Extend ShuffledJoin instead of BaseJoinExec
   - Inherits standard requiredChildDistribution, outputPartitioning,
     and output for all join types
   - Override requiredChildDistribution only for the AllTuples case
     (no equi-keys)
   - supportCodegen = false (codegen is future work)

2. Replace ArrayBuffer with ExternalAppendOnlyUnsafeRowArray
   - Right-side group buffer now spills to disk when the in-memory
     threshold is exceeded, eliminating OOM for skewed equi-key groups
   - Uses the same spark.sql.sortMergeJoinExec.buffer.* configs as
     SortMergeJoinExec

3. Unify scan to forward-only (remove findBestRightToLeft)
   - Backward joins: forward scan keeping the last as-of-satisfying
     row (monotone condition guarantees correctness + early termination
     on true->false transition)
   - Forward/Nearest joins: forward scan with distance-based early
     termination (unchanged logic)
   - Both are compatible with ExternalAppendOnlyUnsafeRowArray's
     forward-only generateIterator()
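
The distance-based early termination for Nearest joins in item 3 can be illustrated with a small sketch. It is not the operator's code: `NearestAsOfScan` and `nearest` are hypothetical names, rows are reduced to ascending `Long` timestamps, and the tie-breaking rule (keep the earlier row on equal distance) is an assumption made only for this example.

```scala
object NearestAsOfScan {
  /**
   * Returns the timestamp closest to `leftTs` from an ascending iterator.
   * Over sorted input the distance first shrinks and then grows, so the scan
   * can stop at the first row whose distance is larger than the best so far.
   */
  def nearest(leftTs: Long, sortedTs: Iterator[Long]): Option[Long] = {
    var best: Option[Long] = None
    var bestDist = Long.MaxValue
    var done = false
    while (!done && sortedTs.hasNext) {
      val ts = sortedTs.next()
      val dist = math.abs(ts - leftTs)
      if (dist < bestDist) {
        best = Some(ts)
        bestDist = dist
      } else if (dist > bestDist) {
        done = true // distance started increasing: later rows are only farther away
      }
      // dist == bestDist: keep the earlier row (assumed tie-break) and continue
    }
    best
  }
}
```

With timestamps 1, 4, 7, 20 and a left timestamp of 5, the scan picks 4 and stops after reading 7, so 20 is never consumed from the iterator.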