From 9d30286a3b936882614970f60af09747955db384 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Mon, 15 Jun 2026 18:38:32 +0900 Subject: [PATCH 1/2] [SPARK-56887][SQL][FOLLOW-UP] Extend ShuffledJoin + spill support + forward scan unification Refactor SortMergeAsOfJoinExec per review feedback: 1. Extend ShuffledJoin instead of BaseJoinExec - Inherits standard requiredChildDistribution, outputPartitioning, and output for all join types - Override requiredChildDistribution only for the AllTuples case (no equi-keys) - supportCodegen = false (codegen is future work) 2. Replace ArrayBuffer with ExternalAppendOnlyUnsafeRowArray - Right-side group buffer now spills to disk when the in-memory threshold is exceeded, eliminating OOM for skewed equi-key groups - Uses the same spark.sql.sortMergeJoinExecBuffer.* configs as SortMergeJoinExec 3. Unify scan to forward-only (remove findBestRightToLeft) - Backward joins: forward scan keeping the last as-of-satisfying row (monotone condition guarantees correctness + early termination on true->false transition) - Forward/Nearest joins: forward scan with distance-based early termination (unchanged logic) - Both are compatible with ExternalAppendOnlyUnsafeRowArray's forward-only generateIterator() --- .../joins/SortMergeAsOfJoinExec.scala | 252 ++++++++---------- 1 file changed, 111 insertions(+), 141 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeAsOfJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeAsOfJoinExec.scala index e5fd7b6403149..955fc7a185c82 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeAsOfJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeAsOfJoinExec.scala @@ -17,14 +17,12 @@ package org.apache.spark.sql.execution.joins -import scala.collection.mutable.ArrayBuffer - import org.apache.spark.{SparkException, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReference -import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, GenerateOrdering} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.util.TypeUtils @@ -34,14 +32,15 @@ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} /** * Performs an AS-OF join using sort-merge. Both sides are co-partitioned * by the equi-join keys and sorted by (equi-join keys, as-of key). - * For each left row, we scan the right side to find the nearest match - * satisfying the as-of condition. + * For each left row, we scan the right-side group buffer (forward-only) + * to find the nearest match satisfying the as-of condition. + * + * The right-side buffer uses [[ExternalAppendOnlyUnsafeRowArray]] which + * spills to disk when the in-memory threshold is exceeded, avoiding OOM + * for skewed equi-key groups. * * Note: When there are no equi-keys, both sides are collected into a - * single partition (AllTuples). The right side is fully buffered in - * memory, so this operator is not suitable for large right-side tables - * without equi-keys. For each equi-key group, all right rows with that - * key are also buffered in memory; skewed equi-key groups can OOM. + * single partition (AllTuples) and the entire right side is buffered. */ case class SortMergeAsOfJoinExec( leftKeys: Seq[Expression], @@ -53,32 +52,29 @@ case class SortMergeAsOfJoinExec( joinType: JoinType, condition: Option[Expression], left: SparkPlan, - right: SparkPlan) extends BaseJoinExec { + right: SparkPlan, + isSkewJoin: Boolean = false) extends ShuffledJoin { + + require(Seq(Inner, LeftOuter).exists(joinType == _), + s"$nodeName does not support join type: $joinType") override lazy val metrics: Map[String, SQLMetric] = Map( - "numOutputRows" -> SQLMetrics.createMetric(sparkContext, - "number of output rows")) - - override def output: Seq[Attribute] = joinType match { - case LeftOuter => - left.output ++ right.output.map(_.withNullability(true)) - case _: InnerLike => - left.output ++ right.output - case other => - throw SparkException.internalError( - s"$nodeName does not support join type: $other") - } + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size")) - override def outputOrdering: Seq[SortOrder] = { - // Output preserves left-side ordering (equi-keys + as-of key) - left.outputOrdering - } + override def supportCodegen: Boolean = false + + // Codegen stubs (not called since supportCodegen = false) + override def inputRDDs(): Seq[RDD[InternalRow]] = + left.execute() :: right.execute() :: Nil + override protected def doProduce(ctx: CodegenContext): String = + throw SparkException.internalError(s"$nodeName does not support codegen") override def requiredChildDistribution: Seq[Distribution] = { if (leftKeys.isEmpty) { AllTuples :: AllTuples :: Nil } else { - ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil + super.requiredChildDistribution } } @@ -90,15 +86,7 @@ case class SortMergeAsOfJoinExec( leftOrdering :: rightOrdering :: Nil } - override def outputPartitioning: Partitioning = joinType match { - case _: InnerLike => - PartitioningCollection( - Seq(left.outputPartitioning, right.outputPartitioning)) - case LeftOuter => left.outputPartitioning - case other => - throw SparkException.internalError( - s"$nodeName does not support join type: $other") - } + override def outputOrdering: Seq[SortOrder] = left.outputOrdering // Determine scan direction based on the order expression (distance metric). // This is a performance heuristic only -- if it misclassifies, the scan @@ -106,56 +94,56 @@ case class SortMergeAsOfJoinExec( // is lost. // // orderExpression is direction-unique by construction: - // Backward: Subtract(leftAsOf, rightAsOf) -> right-to-left - // Forward: Subtract(rightAsOf, leftAsOf) -> left-to-right - // Nearest: If(...) -> left-to-right - private val scanRightToLeft: Boolean = orderExpression match { + // Backward: Subtract(leftAsOf, rightAsOf) -> backward mode + // Forward: Subtract(rightAsOf, leftAsOf) -> forward mode + // Nearest: If(...) -> forward mode + private val isBackwardJoin: Boolean = orderExpression match { case Subtract(l, _, _) if l.semanticEquals(leftAsOfExpr) => true case _ => false } protected override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") - val scanFromRight = scanRightToLeft + val spillSize = longMetric("spillSize") + val isBackward = isBackwardJoin + val inMemoryThreshold = conf.sortMergeJoinExecBufferInMemoryThreshold + val sizeInBytesSpillThreshold = conf.sortMergeJoinExecBufferSpillSizeThreshold + val spillThreshold = conf.sortMergeJoinExecBufferSpillThreshold left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) => val scanner = new SortMergeAsOfJoinScanner( - leftIter, - rightIter, - left.output, - right.output, - leftKeys, - rightKeys, - asOfCondition, - orderExpression, - joinType, - condition, - numOutputRows, - scanFromRight + leftIter, rightIter, + left.output, right.output, + leftKeys, rightKeys, + asOfCondition, orderExpression, + joinType, condition, + numOutputRows, spillSize, isBackward, + inMemoryThreshold, sizeInBytesSpillThreshold, spillThreshold ) - // Register cleanup to release the right-side buffer on task completion TaskContext.get().addTaskCompletionListener[Unit](_ => scanner.close()) scanner.iterator } } override protected def withNewChildrenInternal( - newLeft: SparkPlan, - newRight: SparkPlan): SortMergeAsOfJoinExec = { + newLeft: SparkPlan, newRight: SparkPlan): SortMergeAsOfJoinExec = { copy(left = newLeft, right = newRight) } } /** - * Performs the sort-merge AS-OF join scan. + * Performs the sort-merge AS-OF join scan using forward-only iteration. * * Both inputs are sorted by (equi-keys, as-of key) ascending. For each - * left row within an equi-key group, we find the right row that satisfies - * the as-of condition and minimizes the order expression (distance). + * left row within an equi-key group, we scan the right-side buffer + * forward to find the best match. * - * Since the right side is sorted by as-of key within each group, for - * backward joins we scan right-to-left and stop at the first match - * (exploiting sort order for early termination). + * For Backward joins (left.t >= right.t), the forward scan keeps the + * last as-of-satisfying row as the best match (since the buffer is + * sorted ascending, the last satisfying row is the closest). + * + * For Forward/Nearest joins, the forward scan uses distance-based + * early termination (stop when distance starts increasing). */ private[joins] class SortMergeAsOfJoinScanner( leftIter: Iterator[InternalRow], @@ -169,20 +157,22 @@ private[joins] class SortMergeAsOfJoinScanner( joinType: JoinType, residualCondition: Option[Expression], numOutputRows: SQLMetric, - scanRightToLeft: Boolean) { + spillSize: SQLMetric, + isBackwardJoin: Boolean, + inMemoryThreshold: Int, + sizeInBytesSpillThreshold: Long, + spillThreshold: Int) { private val joinedOutput = leftOutput ++ rightOutput private val joinedRow = new JoinedRow() private val resultProjection = UnsafeProjection.create(joinedOutput, joinedOutput) - // Bound expressions for evaluating conditions on joined rows private val boundAsOfCond = bindReference(asOfCondition, joinedOutput) private val boundOrderExpr = bindReference(orderExpression, joinedOutput) private val boundResidualCond = residualCondition.map(bindReference(_, joinedOutput)) - // Key ordering for equi-join keys private val equiKeyOrdering: Option[BaseOrdering] = if (leftKeys.nonEmpty) { val keyAttributes = leftKeys.zipWithIndex.map { case (key, i) => @@ -194,33 +184,32 @@ private[joins] class SortMergeAsOfJoinScanner( None } - // Projections to extract equi-keys for comparison private val leftKeyProj = UnsafeProjection.create(leftKeys, leftOutput) private val rightKeyProj = UnsafeProjection.create(rightKeys, rightOutput) - // Ordering for the distance metric private val distanceOrdering = TypeUtils.getInterpretedOrdering(orderExpression.dataType) - // Null row for LeftOuter when no match is found private val nullRightRow = new GenericInternalRow(rightOutput.length) - // Right-side buffer: holds right rows for the current equi-key group. - // Rows are sorted by as-of key ascending (guaranteed by requiredChildOrdering). - private val rightGroupBuffer = new ArrayBuffer[InternalRow]() + // Spill-backed right-side buffer + private val rightGroupBuffer = new ExternalAppendOnlyUnsafeRowArray( + inMemoryThreshold, sizeInBytesSpillThreshold, spillThreshold, sizeInBytesSpillThreshold) + private var rightGroupKey: UnsafeRow = _ - private var rightPeek: InternalRow = _ + private var rightPeek: UnsafeRow = _ private var rightDone: Boolean = !rightIter.hasNext - // Initialize: read first right row + // Projection to convert right rows to UnsafeRow for the buffer + private val rightToUnsafe = UnsafeProjection.create(rightOutput, rightOutput) + if (!rightDone) { - rightPeek = rightIter.next().copy() + rightPeek = rightToUnsafe(rightIter.next()).copy() } - /** Release resources held by this scanner. */ def close(): Unit = { + spillSize += rightGroupBuffer.spillSize rightGroupBuffer.clear() - rightGroupBuffer.trimToSize() } def iterator: Iterator[InternalRow] = new Iterator[InternalRow] { @@ -254,10 +243,8 @@ private[joins] class SortMergeAsOfJoinScanner( return resultProjection(joinedRow).copy() } } else { - // Advance right side to the matching equi-key group advanceRightTo(leftKey) - // Search for best match exploiting sort order val bestMatch = findBestInGroup(leftRow) if (bestMatch != null) { @@ -269,39 +256,30 @@ private[joins] class SortMergeAsOfJoinScanner( joinedRow.withLeft(leftRow).withRight(nullRightRow) return resultProjection(joinedRow).copy() } - // Inner join: no match, skip } } null } } - /** - * Advance the right side so that rightGroupBuffer contains all right - * rows whose equi-key matches `leftKey`. - */ private def advanceRightTo(leftKey: UnsafeRow): Unit = { equiKeyOrdering match { case None => - // No equi-keys: buffer all right rows once. - // WARNING: This loads the entire right partition into memory. if (rightGroupBuffer.isEmpty && !rightDone) { bufferAllRight() } case Some(ordering) => - // Check if current buffer already matches if (rightGroupKey != null && ordering.compare(leftKey, rightGroupKey) == 0) { return } - // Skip right rows with keys < leftKey while (!rightDone && rightPeek != null) { val rightKey = rightKeyProj(rightPeek) val cmp = ordering.compare(leftKey, rightKey) if (cmp > 0) { rightPeek = if (rightIter.hasNext) { - rightIter.next().copy() + rightToUnsafe(rightIter.next()).copy() } else { rightDone = true; null } @@ -319,7 +297,6 @@ private[joins] class SortMergeAsOfJoinScanner( } } - /** Buffer all right rows with the same equi-key as leftKey. */ private def bufferRightGroup( leftKey: UnsafeRow, ordering: BaseOrdering): Unit = { rightGroupBuffer.clear() @@ -328,9 +305,9 @@ private[joins] class SortMergeAsOfJoinScanner( while (!rightDone && rightPeek != null) { val rightKey = rightKeyProj(rightPeek) if (ordering.compare(leftKey, rightKey) == 0) { - rightGroupBuffer += rightPeek + rightGroupBuffer.add(rightPeek) rightPeek = if (rightIter.hasNext) { - rightIter.next().copy() + rightToUnsafe(rightIter.next()).copy() } else { rightDone = true; null } @@ -340,46 +317,49 @@ private[joins] class SortMergeAsOfJoinScanner( } } - /** Buffer all remaining right rows (no equi-keys case). */ private def bufferAllRight(): Unit = { rightGroupBuffer.clear() if (rightPeek != null) { - rightGroupBuffer += rightPeek + rightGroupBuffer.add(rightPeek) rightPeek = null } while (rightIter.hasNext) { - rightGroupBuffer += rightIter.next().copy() + rightGroupBuffer.add(rightToUnsafe(rightIter.next())) } rightDone = true } /** - * Find the best matching right row for the given left row within the - * current group buffer. + * Find the best matching right row using forward-only scan. + * + * For Backward joins: keeps the last as-of-satisfying row (since the + * buffer is sorted ascending, the last satisfying row has the largest + * right.t <= left.t, which is the closest match). Early-terminates + * when as-of condition transitions from true to false (monotone). * - * The buffer is sorted by as-of key ascending. The scan direction is - * chosen based on where the best match is expected: - * - Backward (left >= right): best match near the end -> right-to-left - * - Forward (left <= right): best match near the start -> left-to-right - * - Nearest: full scan needed (left-to-right, stop when distance - * increases after finding a match) + * For Forward/Nearest joins: uses distance-based early termination + * (stop when distance starts increasing past the minimum). */ private def findBestInGroup(leftRow: InternalRow): InternalRow = { - if (scanRightToLeft) { - findBestRightToLeft(leftRow) + if (isBackwardJoin) { + findBestBackwardForward(leftRow) } else { - findBestLeftToRight(leftRow) + findBestForwardNearest(leftRow) } } - /** Scan from end to start (optimal for Backward joins). */ - private def findBestRightToLeft(leftRow: InternalRow): InternalRow = { + /** + * Forward scan for Backward joins: last-match-wins. + * Buffer is sorted ascending by as-of key. For left.t >= right.t, + * as-of condition is monotone: true for right.t <= left.t, then false. + * The last satisfying row is the closest match. + */ + private def findBestBackwardForward(leftRow: InternalRow): InternalRow = { var bestMatch: InternalRow = null - var bestDistance: Any = null + val iter = rightGroupBuffer.generateIterator() - var i = rightGroupBuffer.size - 1 - while (i >= 0) { - val rightRow = rightGroupBuffer(i) + while (iter.hasNext) { + val rightRow = iter.next() joinedRow.withLeft(leftRow).withRight(rightRow) val asOfSatisfied = boundAsOfCond.eval(joinedRow) @@ -389,35 +369,29 @@ private[joins] class SortMergeAsOfJoinScanner( result != null && result.asInstanceOf[Boolean] } if (residualSatisfied) { - val distance = boundOrderExpr.eval(joinedRow) - if (distance != null) { - if (bestMatch == null) { - bestMatch = rightRow - bestDistance = distance - } else if (distanceOrdering.lt(distance, bestDistance)) { - bestMatch = rightRow - bestDistance = distance - } else { - return bestMatch - } - } + // Last match wins (closest right.t to left.t) + bestMatch = rightRow.copy() } } else if (bestMatch != null) { + // as-of condition transitioned true -> false (monotone for Backward). + // No further rows can satisfy it. return bestMatch } - i -= 1 } bestMatch } - /** Scan from start to end (optimal for Forward/Nearest joins). */ - private def findBestLeftToRight(leftRow: InternalRow): InternalRow = { + /** + * Forward scan for Forward/Nearest joins: distance-based termination. + * Stop when distance starts increasing past the minimum found so far. + */ + private def findBestForwardNearest(leftRow: InternalRow): InternalRow = { var bestMatch: InternalRow = null var bestDistance: Any = null + val iter = rightGroupBuffer.generateIterator() - var i = 0 - while (i < rightGroupBuffer.size) { - val rightRow = rightGroupBuffer(i) + while (iter.hasNext) { + val rightRow = iter.next() joinedRow.withLeft(leftRow).withRight(rightRow) val asOfSatisfied = boundAsOfCond.eval(joinedRow) @@ -429,27 +403,23 @@ private[joins] class SortMergeAsOfJoinScanner( if (residualSatisfied) { val distance = boundOrderExpr.eval(joinedRow) if (distance != null) { - if (bestMatch == null) { - bestMatch = rightRow - bestDistance = distance - } else if (distanceOrdering.lt(distance, bestDistance)) { - bestMatch = rightRow + if (bestMatch == null || distanceOrdering.lt(distance, bestDistance)) { + bestMatch = rightRow.copy() bestDistance = distance } else { - // Distance is increasing; for Forward the as-of condition - // guarantees no closer row exists further right. For - // Nearest the distance is V-shaped so once past the - // minimum no later row can beat it. + // Distance is increasing past the minimum. For Forward, + // the as-of condition guarantees no closer row exists + // further right. For Nearest, distance is V-shaped so + // once past the minimum no later row can beat it. return bestMatch } } } } - // Note: we do NOT early-terminate on as-of condition failure here. + // Do NOT early-terminate on as-of condition failure here. // For Nearest + !allowExactMatches, the condition is false at a // single interior point (right == left) with valid matches on - // both sides. The distance-based termination above is sufficient. - i += 1 + // both sides. Distance-based termination above is sufficient. } bestMatch } From 240151742a9ae2eab414e0673334539134d3a16b Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Tue, 16 Jun 2026 03:07:15 +0900 Subject: [PATCH 2/2] Benchmark results for AsOfJoinBenchmark (JDK 17/21/25, AMD EPYC 7763) --- .../benchmarks/AsOfJoinBenchmark-jdk21-results.txt | 12 ++++++------ .../benchmarks/AsOfJoinBenchmark-jdk25-results.txt | 12 ++++++------ sql/core/benchmarks/AsOfJoinBenchmark-results.txt | 12 ++++++------ 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/sql/core/benchmarks/AsOfJoinBenchmark-jdk21-results.txt b/sql/core/benchmarks/AsOfJoinBenchmark-jdk21-results.txt index 8e21b5ef31de7..2fc4e230fd603 100644 --- a/sql/core/benchmarks/AsOfJoinBenchmark-jdk21-results.txt +++ b/sql/core/benchmarks/AsOfJoinBenchmark-jdk21-results.txt @@ -2,18 +2,18 @@ AS-OF Join Benchmark ================================================================================================ -OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure +OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1018-azure AMD EPYC 7763 64-Core Processor AS-OF Join (left=10000, right=10000, groups=100): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------------- -Correlated subquery (baseline) 37494 37697 287 0.0 3749357.9 1.0X -Sort-merge AS-OF join 62 85 19 0.2 6236.4 601.2X +Correlated subquery (baseline) 38299 38496 279 0.0 3829864.5 1.0X +Sort-merge AS-OF join 68 92 13 0.1 6813.3 562.1X -OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure +OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1018-azure AMD EPYC 7763 64-Core Processor AS-OF Join no equi-key (left=10000, right=10000): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------------- -Correlated subquery (baseline) 23664 23684 27 0.0 2366425.3 1.0X -Sort-merge AS-OF join 1773 1795 32 0.0 177269.8 13.3X +Correlated subquery (baseline) 24733 24827 133 0.0 2473327.2 1.0X +Sort-merge AS-OF join 1570 1581 16 0.0 156990.6 15.8X diff --git a/sql/core/benchmarks/AsOfJoinBenchmark-jdk25-results.txt b/sql/core/benchmarks/AsOfJoinBenchmark-jdk25-results.txt index d2c13cfe69955..46fcdaeeb7715 100644 --- a/sql/core/benchmarks/AsOfJoinBenchmark-jdk25-results.txt +++ b/sql/core/benchmarks/AsOfJoinBenchmark-jdk25-results.txt @@ -2,18 +2,18 @@ AS-OF Join Benchmark ================================================================================================ -OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure +OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1018-azure AMD EPYC 7763 64-Core Processor AS-OF Join (left=10000, right=10000, groups=100): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------------- -Correlated subquery (baseline) 37549 37690 199 0.0 3754903.0 1.0X -Sort-merge AS-OF join 56 73 14 0.2 5550.4 676.5X +Correlated subquery (baseline) 40197 40231 47 0.0 4019717.4 1.0X +Sort-merge AS-OF join 61 89 15 0.2 6093.4 659.7X -OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure +OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1018-azure AMD EPYC 7763 64-Core Processor AS-OF Join no equi-key (left=10000, right=10000): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------------- -Correlated subquery (baseline) 22100 22128 40 0.0 2209969.1 1.0X -Sort-merge AS-OF join 1614 1624 13 0.0 161417.4 13.7X +Correlated subquery (baseline) 23803 23818 21 0.0 2380348.1 1.0X +Sort-merge AS-OF join 1585 1602 24 0.0 158493.5 15.0X diff --git a/sql/core/benchmarks/AsOfJoinBenchmark-results.txt b/sql/core/benchmarks/AsOfJoinBenchmark-results.txt index 55794a9296017..60edcb2743e50 100644 --- a/sql/core/benchmarks/AsOfJoinBenchmark-results.txt +++ b/sql/core/benchmarks/AsOfJoinBenchmark-results.txt @@ -2,18 +2,18 @@ AS-OF Join Benchmark ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure +OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1018-azure AMD EPYC 7763 64-Core Processor AS-OF Join (left=10000, right=10000, groups=100): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------------- -Correlated subquery (baseline) 37391 37469 110 0.0 3739110.1 1.0X -Sort-merge AS-OF join 59 70 9 0.2 5918.1 631.8X +Correlated subquery (baseline) 37910 37980 99 0.0 3790977.0 1.0X +Sort-merge AS-OF join 53 65 9 0.2 5264.7 720.1X -OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure +OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1018-azure AMD EPYC 7763 64-Core Processor AS-OF Join no equi-key (left=10000, right=10000): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------------- -Correlated subquery (baseline) 24013 24033 28 0.0 2401313.2 1.0X -Sort-merge AS-OF join 1713 1719 9 0.0 171343.3 14.0X +Correlated subquery (baseline) 23421 23429 11 0.0 2342088.5 1.0X +Sort-merge AS-OF join 1399 1402 4 0.0 139932.2 16.7X