Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -454,19 +454,34 @@ trait RuntimeReplaceable extends Expression {
override val nodePatterns: Seq[TreePattern] = Seq(RUNTIME_REPLACEABLE)
override def nullable: Boolean = replacement.nullable
override def dataType: DataType = replacement.dataType
// The actual evaluation is delegated to `replacement`, so determinism must reflect `replacement`,
// not this expression's `children` (which are the original arguments). For example, the children
// of `Uniform` are literal bounds and a seed (all deterministic), while its `replacement` is a
// non-deterministic `Rand`. This matters now that a deterministic `RuntimeReplaceable` may survive
// into the physical plan: the survival decision relies on an accurate determinism signal.
override lazy val deterministic: Boolean = replacement.deterministic
// Foldability is also derived from `replacement` rather than this expression's `children`. Note
// that this can yield a foldable expression that still has references (e.g. `collation(c1)`, whose
// value depends only on the child's type): such an expression is materialized into a literal by
// `ConstantFolding`, and `FoldablePropagation` only propagates literals, never bare foldables.
override def foldable: Boolean = replacement.foldable
// As this expression gets replaced at optimization with its "child" expression,
// two `RuntimeReplaceable` are considered to be semantically equal if their "child" expressions
// are semantically equal.
override lazy val canonicalized: Expression = replacement.canonicalized

final override def eval(input: InternalRow = null): Any = {
// For convenience, we allow to evaluate `RuntimeReplaceable` expressions, in case we need to
// get a constant from foldable expression before the query execution starts.
assert(input == null)
replacement.eval()
// `RuntimeReplaceable` expressions are normally rewritten into their `replacement` by the
// `ReplaceExpressions` rule before execution. However, a `RuntimeReplaceable` may also be
// produced *after* `ReplaceExpressions` has run (e.g. by an optimizer rule). To keep such an
// expression evaluable without depending on the rewrite, both `eval` and `doGenCode` delegate
// to `replacement`. As `replacement` is derived from this expression's children, it is bound
// and code-generated together with them, so the delegation observes the same input row.
final override def eval(input: InternalRow = null): Any = replacement.eval(input)

final override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val childGen = replacement.genCode(ctx)
ev.copy(code = childGen.code, isNull = childGen.isNull, value = childGen.value)
}
final override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
throw QueryExecutionErrors.cannotGenerateCodeForExpressionError(this)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,10 @@ case class AggregateExpression(
*/
abstract class AggregateFunction extends Expression {

/** An aggregate function is not foldable. */
final override def foldable: Boolean = false
// An aggregate function is not foldable. This is not `final` so that `RuntimeReplaceableAggregate`
// can inherit `RuntimeReplaceable.foldable` (which delegates to `replacement`); since such a
// replacement is itself an aggregate, the effective foldability stays `false`.
override def foldable: Boolean = false

/** The schema of the aggregation buffer. */
def aggBufferSchema: StructType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ case class Uniform(
def this(min: Expression, max: Expression, seedExpression: Expression) =
this(min, max, seedExpression, hideSeed = false)

final override lazy val deterministic: Boolean = false
// `deterministic` is inherited from `RuntimeReplaceable`, which delegates to `replacement` (a
// non-deterministic `Rand`-based expression unless an argument is null).
override def nodePatternsInternal(): Seq[TreePattern] =
Seq(RUNTIME_REPLACEABLE, EXPRESSION_WITH_RANDOM_SEED)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1118,7 +1118,12 @@ object FoldablePropagation extends Rule[LogicalPlan] {

private def collectFoldables(expressions: Seq[NamedExpression]) = {
AttributeMap(expressions.collect {
case a: Alias if a.child.foldable => (a.toAttribute, a)
// Only propagate literals. A foldable expression is not necessarily a self-contained constant:
// a `RuntimeReplaceable` such as `collation(c1)` is foldable (its value depends only on the
// child's type) yet still references its children, so substituting it for its alias elsewhere
// would leave those references dangling. Such expressions are turned into literals by
// `ConstantFolding` (in the same fixed-point batch), after which they propagate safely.
case a: Alias if a.child.isInstanceOf[Literal] => (a.toAttribute, a)
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,23 @@ object ReplaceExpressions extends Rule[LogicalPlan] {
}

private def replace(e: Expression): Expression = e match {
case r: RuntimeReplaceable => replace(r.replacement)
// Aggregates can never self-evaluate (no real aggregation buffer), so always rewrite early.
case r: RuntimeReplaceableAggregate => replace(r.replacement)

case r: RuntimeReplaceable =>
val replaced = replace(r.replacement)
// Only a deterministic, fully-evaluable replacement may survive into the physical plan, to be
// matched by a native engine or materialized just before codegen. The other cases must be
// rewritten early instead:
// - A non-deterministic replacement (e.g. the `Rand` inside `uniform`) carries mutable
// per-partition state that must be initialized before eval. That state cannot be reliably
// initialized through the `lazy val replacement`, which tree transforms may re-create.
// - A replacement that contains an `Unevaluable` expression (e.g. `With`) depends on a later
// logical rule (such as `RewriteWithExpression`) that can only run in the logical phase.
// A foldable replacement (e.g. `collation(c1)`) is allowed to survive here; `ConstantFolding`
// later materializes it into a literal.
if (replaced.deterministic && !replaced.exists(_.isInstanceOf[Unevaluable])) r else replaced

case _ => e.mapChildren(replace)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
Project [SYSTEM.BUILTIN.UTF8_BINARY AS collation(g)#0]
Project [collation(g#0) AS collation(g)#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
Project [invoke(SchemaOfCsvEvaluator(Map(sep -> |)).evaluate(1|abc)) AS schema_of_csv(1|abc)#0]
Project [schema_of_csv(1|abc, (sep,|)) AS schema_of_csv(1|abc)#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
Project [invoke(SchemaOfJsonEvaluator(Map()).evaluate([{"col":01}])) AS schema_of_json([{"col":01}])#0]
Project [schema_of_json([{"col":01}]) AS schema_of_json([{"col":01}])#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
Project [invoke(SchemaOfJsonEvaluator(Map(allowNumericLeadingZeros -> true)).evaluate([{"col":01}])) AS schema_of_json([{"col":01}])#0]
Project [schema_of_json([{"col":01}], (allowNumericLeadingZeros,true)) AS schema_of_json([{"col":01}])#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
Project [tryeval(null) AS try_to_binary(g, format)#0]
Project [tryeval(to_binary(g#0, Some(format), true)) AS try_to_binary(g, format)#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.expressions.{Expression, RuntimeReplaceable}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.RUNTIME_REPLACEABLE

/**
 * Prototype physical-plan rule that swaps every [[RuntimeReplaceable]] still present in a plan's
 * expressions for its `replacement`.
 *
 * Motivation: in this prototype the logical optimizer may intentionally leave a
 * `RuntimeReplaceable` in the plan so that a native engine (e.g. Photon) can match the semantic
 * expression directly. For the regular Spark execution path the expression still has to be
 * materialized, so that codegen and interpreted evaluation only ever see the replacement.
 *
 * The rule is meant to be scheduled after the columnar/native conversion and before
 * `CollapseCodegenStages`: the native engine observes the original expression, Spark observes the
 * replacement.
 *
 * Only plan nodes owning at least one expression tagged with the `RUNTIME_REPLACEABLE` tree
 * pattern are rewritten; the traversal uses `transformUpWithSubqueries`.
 */
object MaterializeRuntimeReplaceable extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = {
    plan.transformUpWithSubqueries {
      case node if node.expressions.exists(_.containsPattern(RUNTIME_REPLACEABLE)) =>
        node.mapExpressions(materialize)
    }
  }

  /**
   * Recursively substitutes `RuntimeReplaceable` expressions with their replacements. The
   * replacement is itself rewritten, so a replaceable nested inside another replacement is
   * materialized as well; any other expression is rebuilt from its rewritten children.
   */
  private def materialize(expr: Expression): Expression = expr match {
    case replaceable: RuntimeReplaceable => materialize(replaceable.replacement)
    case other => other.mapChildren(materialize)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,10 @@ object QueryExecution {
DisableUnnecessaryBucketedScan,
ApplyColumnarRulesAndInsertTransitions(
sparkSession.sessionState.columnarRules, outputsColumnar = false),
// Prototype: materialize any RuntimeReplaceable that survived the optimizer into its
// replacement for the Spark execution path. After columnar/native conversion (so a native
// engine sees the origin), before codegen (so Spark codegen never sees a RuntimeReplaceable).
MaterializeRuntimeReplaceable,
CollapseCodegenStages()) ++
(if (subquery) {
Nil
Expand Down