@@ -35,7 +35,7 @@ Spark 4.1 <spark-4.1/index>
Expressions that are not 100% Spark-compatible fall back to Spark by default, except those
with a JVM codegen-dispatch path, which stay in Comet's native pipeline and match Spark
exactly. Set `spark.comet.expression.EXPRNAME.allowIncompatible=true`, where `EXPRNAME` is
the Spark expression class name, to run Comet's faster native implementation despite its
the Spark expression class name, to run Comet's native implementation despite its
differences from Spark. See the
[Comet Supported Expressions Guide](../../expressions.md) for more information on this
configuration setting.
@@ -23,7 +23,7 @@ Compatibility notes for Comet running on Apache Spark 3.4. Expressions that are
Spark-compatible fall back to Spark by default, except those with a JVM codegen-dispatch
path, which stay in Comet's native pipeline and match Spark exactly. Set
`spark.comet.expression.EXPRNAME.allowIncompatible=true`, where `EXPRNAME` is the Spark
expression class name, to run Comet's faster native implementation despite its differences
expression class name, to run Comet's native implementation despite its differences
from Spark. See the [Comet Supported Expressions Guide](../../../expressions.md)
for more information on this configuration setting.

@@ -23,7 +23,7 @@ Compatibility notes for Comet running on Apache Spark 3.5. Expressions that are
Spark-compatible fall back to Spark by default, except those with a JVM codegen-dispatch
path, which stay in Comet's native pipeline and match Spark exactly. Set
`spark.comet.expression.EXPRNAME.allowIncompatible=true`, where `EXPRNAME` is the Spark
expression class name, to run Comet's faster native implementation despite its differences
expression class name, to run Comet's native implementation despite its differences
from Spark. See the [Comet Supported Expressions Guide](../../../expressions.md)
for more information on this configuration setting.

@@ -23,7 +23,7 @@ Compatibility notes for Comet running on Apache Spark 4.0. Expressions that are
Spark-compatible fall back to Spark by default, except those with a JVM codegen-dispatch
path, which stay in Comet's native pipeline and match Spark exactly. Set
`spark.comet.expression.EXPRNAME.allowIncompatible=true`, where `EXPRNAME` is the Spark
expression class name, to run Comet's faster native implementation despite its differences
expression class name, to run Comet's native implementation despite its differences
from Spark. See the [Comet Supported Expressions Guide](../../../expressions.md)
for more information on this configuration setting.

@@ -23,7 +23,7 @@ Compatibility notes for Comet running on Apache Spark 4.1. Expressions that are
Spark-compatible fall back to Spark by default, except those with a JVM codegen-dispatch
path, which stay in Comet's native pipeline and match Spark exactly. Set
`spark.comet.expression.EXPRNAME.allowIncompatible=true`, where `EXPRNAME` is the Spark
expression class name, to run Comet's faster native implementation despite its differences
expression class name, to run Comet's native implementation despite its differences
from Spark. See the [Comet Supported Expressions Guide](../../../expressions.md)
for more information on this configuration setting.

8 changes: 4 additions & 4 deletions docs/source/user-guide/latest/compatibility/index.md
@@ -34,7 +34,7 @@ This guide documents areas where Comet's behavior is known to differ from Spark.
## Compatible by default, opt in to native

Comet runs a Spark-compatible implementation of every supported expression by default. Some
expressions also have a faster native implementation that can differ from Spark for certain
expressions also have a native implementation that can differ from Spark for certain
inputs. These are not used unless you opt in by setting the relevant
`spark.comet.expression.<Name>.allowIncompatible=true` config (a few use a dedicated config, noted
per expression below), after which you accept the documented differences.
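
As a rough illustration of the opt-in described above, the per-expression key can be derived from the Spark expression class name. This is only a sketch: `AllowIncompatibleKeys`, `keyFor`, and `optInSettings` are hypothetical helper names rather than Comet API (the diff itself uses `CometConf.getExprAllowIncompatConfigKey` for this), and the sketch ignores the few expressions that use a dedicated config.

```scala
// Hypothetical helper, not part of Comet: builds the documented per-expression opt-in key.
object AllowIncompatibleKeys {

  /** `exprClassName` is the Spark expression class name, e.g. "SortArray". */
  def keyFor(exprClassName: String): String =
    s"spark.comet.expression.$exprClassName.allowIncompatible"

  /** Settings that opt each listed expression into Comet's native implementation. */
  def optInSettings(exprClassNames: Seq[String]): Map[String, String] =
    exprClassNames.map(name => keyFor(name) -> "true").toMap
}
```

The resulting pairs are ordinary Spark configuration entries, so they could be supplied with `--conf` on the command line or through `SparkSession.builder().config(key, value)`.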
@@ -53,11 +53,11 @@ Some Spark expressions have two implementations in Comet:
produces byte-exact Spark results at the cost of one JNI round-trip per batch. It is gated
globally by `spark.comet.exec.scalaUDF.codegen.enabled` (enabled by default); when the
dispatcher is disabled, these expressions fall back to Spark.
- A **native** (Rust / DataFusion) implementation that is faster, with no JNI overhead, but
has known semantic differences from Spark for some inputs or patterns.
- A **native** (Rust / DataFusion) implementation that avoids the JNI round-trip but has
known semantic differences from Spark for some inputs or patterns.

Because the codegen-dispatch path matches Spark exactly, Comet uses it by **default**. The
faster native path is **opt-in per expression** via that expression's
native path is **opt-in per expression** via that expression's
`spark.comet.expression.<ExprClassName>.allowIncompatible=true` flag, which declares that you
accept its differences from Spark. There is no global opt-in. When the native path is enabled
but a specific input or pattern has no native implementation, Comet routes that case back
32 changes: 23 additions & 9 deletions spark/src/main/scala/org/apache/comet/GenerateDocs.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.Cast

import org.apache.comet.CometConf.COMET_ONHEAP_MEMORY_OVERHEAD
import org.apache.comet.expressions.{CometCast, CometEvalMode}
import org.apache.comet.serde.{CometAggregateExpressionSerde, CometExpressionSerde, Compatible, Incompatible, NativeOptInAvailable, QueryPlanSerde, Unsupported}
import org.apache.comet.serde.{CodegenDispatchFallback, CometAggregateExpressionSerde, CometExpressionSerde, Compatible, Incompatible, NativeOptInAvailable, QueryPlanSerde, Unsupported}

/**
* Utility for generating markdown documentation from the configs.
@@ -49,20 +49,24 @@ object GenerateDocs {
* @param incompatibleReasons
* reasons the native implementation is incompatible with Spark
* @param unsupportedReasons
* cases that Comet does not support
* cases that Comet's native implementation does not handle
* @param nativeOptIn
* whether the serde implements `NativeOptInAvailable`, meaning the expression runs a
* Spark-compatible path by default and the user can opt into a faster native path
* Spark-compatible path by default and the user can opt into a native path
* @param nativeOptInConfigKey
* the config key the user sets to opt into the native path
* @param codegenDispatchFallback
* whether the serde mixes in `CodegenDispatchFallback`, meaning `unsupportedReasons` cases
* route through the JVM codegen dispatcher instead of falling back to Spark
*/
private case class ExprNotes(
name: String,
compatibleNotes: Seq[String],
incompatibleReasons: Seq[String],
unsupportedReasons: Seq[String],
nativeOptIn: Boolean,
nativeOptInConfigKey: String)
nativeOptInConfigKey: String,
codegenDispatchFallback: Boolean)

private type CategoryNotes = Seq[ExprNotes]

@@ -80,7 +84,8 @@ object GenerateDocs {
serde.getIncompatibleReasons(),
serde.getUnsupportedReasons(),
optIn,
key)
key,
codegenDispatchFallback = serde.isInstanceOf[CodegenDispatchFallback])
}

/** Build the documentation notes for a single aggregate expression serde. */
@@ -92,7 +97,8 @@ object GenerateDocs {
serde.getUnsupportedReasons(),
// Aggregate serdes do not have a native opt-in path.
nativeOptIn = false,
nativeOptInConfigKey = CometConf.getExprAllowIncompatConfigKey(cls))
nativeOptInConfigKey = CometConf.getExprAllowIncompatConfigKey(cls),
codegenDispatchFallback = false)

/**
* Mapping from expression category to the compatibility guide filename where that category's
@@ -277,8 +283,9 @@ object GenerateDocs {
}
if (n.incompatibleReasons.nonEmpty) {
val header = if (n.nativeOptIn) {
s"\nBy default, Comet runs a Spark-compatible implementation of `$name`. Set" +
s" `${n.nativeOptInConfigKey}=true` to use Comet's faster native implementation" +
s"\nBy default, `$name` is evaluated in the JVM using Spark's own code-generated" +
" implementation (run inside the Comet pipeline), which matches Spark exactly." +
s" Set `${n.nativeOptInConfigKey}=true` to opt into Comet's native implementation" +
" instead, which has the following differences from Spark:\n\n"
} else {
s"\nThe following incompatibilities cause `$name` to fall back to Spark by default." +
@@ -291,7 +298,14 @@ object GenerateDocs {
}
}
if (n.unsupportedReasons.nonEmpty) {
w.write("\nThe following cases are not supported by Comet:\n\n".getBytes)
val header = if (n.codegenDispatchFallback) {
"\nThe following cases have no native implementation and always run in the JVM using" +
" Spark's code-generated implementation (inside the Comet pipeline):\n\n"
} else {
"\nThe following cases are not supported by Comet and always fall back to Spark," +
" regardless of any `allowIncompatible` setting:\n\n"
}
w.write(header.getBytes)
for (reason <- n.unsupportedReasons) {
w.write(s"- $reason\n".getBytes)
}
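
The header selection in this hunk can be tried in isolation. The following is a standalone sketch, assuming the section is built as a `String` instead of being written to the generator's output stream; `UnsupportedSection` and `render` are hypothetical names, while the header text is copied from the hunk.

```scala
// Standalone sketch of the "unsupported cases" section; names are hypothetical.
object UnsupportedSection {

  /** Returns the markdown for the section, or an empty string when there are no reasons. */
  def render(codegenDispatchFallback: Boolean, reasons: Seq[String]): String = {
    if (reasons.isEmpty) {
      ""
    } else {
      val header = if (codegenDispatchFallback) {
        // The serde mixes in CodegenDispatchFallback: these cases stay in the Comet pipeline.
        "\nThe following cases have no native implementation and always run in the JVM using" +
          " Spark's code-generated implementation (inside the Comet pipeline):\n\n"
      } else {
        // Any other serde: these cases fall back to Spark.
        "\nThe following cases are not supported by Comet and always fall back to Spark," +
          " regardless of any `allowIncompatible` setting:\n\n"
      }
      // One markdown bullet per reason, matching the loop in the hunk.
      header + reasons.map(reason => s"- $reason\n").mkString
    }
  }
}
```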
@@ -63,9 +63,13 @@ trait CometExpressionSerde[T <: Expression] {
def getIncompatibleReasons(): Seq[String] = Seq.empty

/**
* Get documentation for usages where this expression is unsupported with Spark. This is called
* from GenerateDocs when generating the Compatibility Guide. Each reason should be written in
* Markdown and may span multiple lines.
* Get documentation for usages of this expression that Comet's native implementation does not
* support. Cases listed here normally fall back to Spark, regardless of any `allowIncompatible`
* setting. When the serde mixes in `CodegenDispatchFallback` they are instead routed through
* the JVM codegen dispatcher (Spark's own `doGenCode` inside the Comet pipeline), so they stay
* in the Comet pipeline while still matching Spark exactly. This is called from GenerateDocs
* when generating the Compatibility Guide. Each reason should be written in Markdown and may
* span multiple lines.
*
* @return
* List of reasons, defaulting to an empty list.
@@ -101,14 +105,22 @@ trait CometExpressionSerde[T <: Expression] {
}

/**
* Opt-in marker for expression serdes that have a native implementation which is `Incompatible`
* with Spark for some inputs. When such an expression reports `Incompatible` and the user has not
* enabled `allowIncompatible` for it, mixing in this trait routes it through the JVM codegen
* dispatcher (running Spark's own `doGenCode` inside the Comet pipeline) instead of falling the
* projection back to Spark, so it stays native while still matching Spark exactly.
* Mixin for serdes whose native implementation cannot match Spark for some inputs. When
* `getSupportLevel` returns `Incompatible` and the user has not enabled `allowIncompatible`, the
* expression routes through the JVM codegen dispatcher (Spark's own `doGenCode` inside the Comet
* pipeline) instead of falling the projection back to Spark. When `getSupportLevel` returns
* `Unsupported`, the expression always routes through the dispatcher -- the serde is declaring
* "no native path exists for this case; run Spark's code in-pipeline." Spark fallback is reserved
* for the case where the dispatcher itself cannot handle the expression (e.g. the global codegen
* flag is off, or the kernel rejects the bound tree).
*
* Contract for `Unsupported` reasons on a `CodegenDispatchFallback` serde: the case must be

Contributor

The expanded contract reads well. Worth noting for future authors that it is essentially always satisfiable: any case that reaches a serde has already passed Spark analysis, so Spark supports it and `doGenCode` can compile it. The only way dispatch declines is a kernel limitation in `canHandle`, which falls back cleanly. No change needed, just confirming the reasoning holds.

* something `Expression.doGenCode` can compile. If you mark something `Unsupported` because Spark
* also rejects it, that is fine -- the dispatcher will surface the same error Spark would have.
*
* Enrollment is opt-in: only serdes that explicitly mix this in are routed through the
* dispatcher. Every other `Incompatible` expression falls back to Spark.
* dispatcher. Every other `Incompatible` expression falls back to Spark, and every other
* `Unsupported` expression falls back to Spark.
*/
trait CodegenDispatchFallback extends NativeOptInAvailable { self: CometExpressionSerde[_] => }
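
The routing rules spelled out in this Scaladoc can be summarised as a small decision function. The sketch below is a simplified model, not Comet's code: every name in `DispatchRouting` is hypothetical, `dispatcherCanHandle` collapses both "the global codegen flag is on" and "the kernel accepts the bound tree" into one flag, and `Compatible` is assumed to always run natively.

```scala
// Simplified, self-contained model of the routing described above; all names are hypothetical.
object DispatchRouting {
  sealed trait SupportLevel
  case object Compatible extends SupportLevel
  case object Incompatible extends SupportLevel
  case object Unsupported extends SupportLevel

  sealed trait Route
  case object Native extends Route
  case object CodegenDispatch extends Route
  case object SparkFallback extends Route

  def route(
      level: SupportLevel,
      mixesInCodegenDispatchFallback: Boolean,
      allowIncompatible: Boolean,
      dispatcherCanHandle: Boolean): Route = {
    // Only enrolled serdes reach the dispatcher, and only if it can handle the expression.
    val dispatchOrFallback: Route =
      if (mixesInCodegenDispatchFallback && dispatcherCanHandle) CodegenDispatch
      else SparkFallback

    level match {
      case Compatible => Native
      // The user accepted the documented differences for this expression.
      case Incompatible if allowIncompatible => Native
      case Incompatible => dispatchOrFallback
      // No native path exists, so `allowIncompatible` has no effect here.
      case Unsupported => dispatchOrFallback
    }
  }
}
```

In this model an `Unsupported` case on an enrolled serde stays in the pipeline whenever the dispatcher can handle it, and the opt-in flag changes the outcome only for `Incompatible`.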

42 changes: 31 additions & 11 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -767,8 +767,14 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
}
handler.getSupportLevel(expr) match {
case Unsupported(notes) =>
withFallbackReason(expr, notes.getOrElse(""))
None
// `CodegenDispatchFallback` serdes have no native path for these cases either, but the
// dispatcher can still run Spark's own `doGenCode` inside the Comet pipeline. Try that
// before falling the projection back to Spark. No `[COMET-INFO]` hint here: unlike

Contributor

The comment explaining why there is no `[COMET-INFO]` hint in the `Unsupported` arm is helpful. Did you consider a debug-level log when an `Unsupported` case routes through dispatch? Without the opt-in flag there is no plan hint, so anyone debugging why a projection stayed native has nothing to go on. Not essential, just a thought.

// `Incompatible`, there is no native opt-in for the user to flip.
dispatchIfFallback(handler, expr, inputs, binding).map(_._2).orElse {
withFallbackReason(expr, notes.getOrElse(""))
None
}
case Incompatible(notes) =>
val exprAllowIncompat = CometConf.isExprAllowIncompat(exprConfName)
if (exprAllowIncompat) {
@@ -785,15 +791,12 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
// pipeline) so the projection stays native while still matching Spark. Everything else
// falls back to Spark. Falling back is also the result when the dispatcher cannot
// handle the expression.
val dispatched = handler match {
case h: CodegenDispatchFallback =>
CometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding).map { proto =>
val key = h.nativeOptInConfigKeyOverride
.getOrElse(CometConf.getExprAllowIncompatConfigKey(exprConfName))
withInfo(expr, NativeOptIn.message(exprConfName, key))
proto
}
case _ => None
val dispatched = dispatchIfFallback(handler, expr, inputs, binding).map {
case (h, proto) =>
val key = h.nativeOptInConfigKeyOverride
.getOrElse(CometConf.getExprAllowIncompatConfigKey(exprConfName))
withInfo(expr, NativeOptIn.message(exprConfName, key))
proto
}
dispatched.orElse {
val optionalNotes = notes.map(str => s" ($str)").getOrElse("")
@@ -1036,6 +1039,23 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {

}

/**
* If `handler` is a `CodegenDispatchFallback`, run `expr` through the JVM codegen dispatcher
* and return `Some((handler, proto))` on success; otherwise return `None`. Shared by the
* `Unsupported` and (non-opt-in) `Incompatible` arms of `exprToProtoInternal` so they don't
* each inline the same pattern match. Returning the matched handler lets the `Incompatible` arm
* reach `nativeOptInConfigKeyOverride` without re-pattern-matching the same value.
*/
private def dispatchIfFallback(
handler: CometExpressionSerde[_],
expr: Expression,
inputs: Seq[Attribute],
binding: Boolean): Option[(CodegenDispatchFallback, Expr)] = handler match {
case h: CodegenDispatchFallback =>
CometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding).map(h -> _)
case _ => None
}
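
To see how the two call sites consume this helper, here is a stripped-down, runnable sketch of the same `Option` pattern. The types are stand-ins, not Comet's: `Serde`, `DispatchFallback`, `emitDispatch`, `unsupportedArm`, and `incompatibleArm` are hypothetical, and the "proto" is just a `String`.

```scala
// Stripped-down sketch of the shared-helper pattern; every name here is a stand-in.
object DispatchIfFallbackSketch {
  trait Serde { def name: String }
  trait DispatchFallback extends Serde { def optInKey: String }

  /** Pretend dispatcher: yields a result only when `dispatcherEnabled` is true. */
  def emitDispatch(expr: String, dispatcherEnabled: Boolean): Option[String] =
    if (dispatcherEnabled) Some(s"dispatch($expr)") else None

  /** Returns the narrowed handler together with the result, so callers need not re-match. */
  def dispatchIfFallback(
      handler: Serde,
      expr: String,
      dispatcherEnabled: Boolean): Option[(DispatchFallback, String)] = handler match {
    case h: DispatchFallback => emitDispatch(expr, dispatcherEnabled).map(h -> _)
    case _ => None
  }

  /** `Unsupported` arm: only the result is needed, so the handler is dropped. */
  def unsupportedArm(handler: Serde, expr: String, enabled: Boolean): Option[String] =
    dispatchIfFallback(handler, expr, enabled).map(_._2)

  /** `Incompatible` arm: the narrowed handler supplies the key for the opt-in hint. */
  def incompatibleArm(
      handler: Serde,
      expr: String,
      enabled: Boolean): Option[(String, String)] =
    dispatchIfFallback(handler, expr, enabled).map { case (h, proto) => (h.optInKey, proto) }
}
```

In both arms a `None` result is where the real code records the fallback reason and lets the projection fall back to Spark.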

// scalastyle:off
/**
* Align w/ Arrow's
3 changes: 1 addition & 2 deletions spark/src/main/scala/org/apache/comet/serde/arrays.scala
@@ -121,8 +121,7 @@ object CometSortArray extends CometExpressionSerde[SortArray] with CodegenDispat
" floating-point types is not 100% compatible with Spark")

override def getUnsupportedReasons(): Seq[String] = Seq(
"Nested arrays with `Struct` or `Null` child values are not supported natively and will" +
" fall back to Spark.")
"Nested arrays with `Struct` or `Null` child values are not supported natively")

Contributor

`SortArray` actually has a second `Unsupported` branch, the non-boolean-literal `ascendingOrder` case at line 156, and this PR routes that through dispatch too. The description only calls out the nested `Struct`/`Null` case. The dispatch path looks safe since Spark requires `ascendingOrder` to be foldable so `doGenCode` can always compile it, but I do not see a test that confirms this case stays native rather than falling back. Could we add one, or confirm it is covered elsewhere?


private def supportedSortArrayElementType(
dt: DataType,
@@ -24,17 +24,20 @@ CREATE TABLE test_array_concat(c1 array<int>, c2 array<int>, c3 array<int>, c4 a
statement
INSERT INTO test_array_concat VALUES (array(0, 1), array(2, 3), array(), array(null), null), (array(1, 2), array(3, 4), array(), array(null), null), (array(2, 3), array(4, 5), array(), array(null), null)

query expect_fallback(CONCAT supports only string input parameters)
-- Concat mixes in CodegenDispatchFallback, so non-string (array) children have no native path
-- but route through the JVM codegen dispatcher (Spark's own Concat.doGenCode inside the Comet
-- pipeline) and stay native while matching Spark exactly.
query
SELECT concat(c1, c2) AS x FROM test_array_concat

query expect_fallback(CONCAT supports only string input parameters)

Contributor

Switching these from `expect_fallback` to plain `query` is the right call now that they stay native, and the same applies to the other four files. One side effect is that the removed `expect_fallback` assertions were the only tests pinning the disabled-dispatcher fallback for these cases. Would it be worth one test with `spark.comet.exec.scalaUDF.codegen.enabled=false` confirming these `Unsupported` cases still fall back to Spark cleanly? That locks in the safety net the description mentions.

query
SELECT concat(c1, c1) AS x FROM test_array_concat

query expect_fallback(CONCAT supports only string input parameters)
query
SELECT concat(c1, c2, c3) AS x FROM test_array_concat

query expect_fallback(CONCAT supports only string input parameters)
query
SELECT concat(c1, c2, c3, c5) AS x FROM test_array_concat

query expect_fallback(CONCAT supports only string input parameters)
query
SELECT concat(concat(c1, c2, c3), concat(c1, c3)) AS x FROM test_array_concat
@@ -295,10 +295,13 @@ INSERT INTO test_sort_array_nested_struct VALUES
(array()),
(NULL)

query expect_fallback(Sort on array element type ArrayType(StructType(StructField(a,IntegerType)
-- SortArray mixes in CodegenDispatchFallback, so nested arrays with Struct children have no
-- native path but route through the JVM codegen dispatcher (Spark's own SortArray.doGenCode
-- inside the Comet pipeline) and stay native while matching Spark exactly.
query
SELECT sort_array(arr) FROM test_sort_array_nested_struct

query expect_fallback(Sort on array element type ArrayType(StructType(StructField(a,IntegerType)
query
SELECT sort_array(arr, false) FROM test_sort_array_nested_struct

-- literal arguments
@@ -391,7 +394,10 @@ SELECT
sort_array(array(NULL, NULL)),
sort_array(cast(NULL as array<int>))

query expect_fallback(Sort on array element type ArrayType(StructType(StructField(a,IntegerType)
-- nested arrays with Struct children have no native path but route through the JVM codegen
-- dispatcher (Spark's own SortArray.doGenCode inside the Comet pipeline) and stay native while
-- matching Spark exactly.
query
SELECT sort_array(
array(
array(named_struct('a', 2)),
13 changes: 8 additions & 5 deletions spark/src/test/resources/sql-tests/expressions/string/concat.sql
@@ -57,17 +57,20 @@ SELECT concat('hello', ' ', 'world'), concat('', '', ''), concat(NULL, 'b', 'c')
statement
CREATE TABLE test_concat_binary USING parquet AS SELECT cast(uuid() as binary) c1, cast(uuid() as binary) c2, cast(uuid() as binary) c3, cast(uuid() as binary) c4, cast(null as binary) c5 FROM range(10)

query expect_fallback(CONCAT supports only string input parameters)
-- Concat mixes in CodegenDispatchFallback, so non-string (binary) children have no native path
-- but route through the JVM codegen dispatcher (Spark's own Concat.doGenCode inside the Comet
-- pipeline) and stay native while matching Spark exactly.
query
SELECT concat(c1, c2) AS x FROM test_concat_binary

query expect_fallback(CONCAT supports only string input parameters)
query
SELECT concat(c1, c1) AS x FROM test_concat_binary

query expect_fallback(CONCAT supports only string input parameters)
query
SELECT concat(c1, c2, c3) AS x FROM test_concat_binary

query expect_fallback(CONCAT supports only string input parameters)
query
SELECT concat(c1, c2, c3, c5) AS x FROM test_concat_binary

query expect_fallback(CONCAT supports only string input parameters)
query
SELECT concat(concat(c1, c2, c3), concat(c1, c3)) AS x FROM test_concat_binary