Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,40 @@ object FlowAnalysis {
resolved.mergeTagsFrom(u)
resolved
}
Dataset.ofRows(spark, resolvedPlan)
val result = Dataset.ofRows(spark, resolvedPlan)

// SPARK-57352: Detect inputs that were resolved directly by the user (e.g., via
// spark.table()) rather than through the pipeline's UnresolvedRelation path above.
// Scan the resolved plan for table relations that match known pipeline inputs and
// record them as external inputs so the DAG scheduler orders them correctly.
resolvedPlan.foreach {
case r: org.apache.spark.sql.catalyst.analysis.UnresolvedRelation =>
// Already handled above
case node =>
Comment on lines +137 to +139

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few smaller things on this detection block:

  • resolvedPlan.foreach doesn't descend into subquery expressions, whereas the resolver above uses transformWithSubqueries. A resolved table reference inside a subquery (e.g. WHERE x IN (SELECT ... FROM pipeline_tbl)) would be missed. If the goal is parity with the UnresolvedRelation path, a subquery-aware traversal would match it.
  • The relation types are referenced by fully-qualified name inline (org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation, etc.) — could add imports to match the file's style.
  • The case r: UnresolvedRelation => // Already handled arm: by this point the plan is resolved, so the pipeline's UnresolvedRelations are already gone — this arm looks like it never fires.

val tableIdOpt = node match {
case r: org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation =>
r.identifier.map(id => TableIdentifier(id.name(),
Option(id.namespace()).filter(_.nonEmpty).map(_.last)))
case r: org.apache.spark.sql.catalyst.catalog.HiveTableRelation =>
Some(r.tableMeta.identifier)
case r: org.apache.spark.sql.execution.datasources.LogicalRelation =>
r.catalogTable.map(_.identifier)
Comment on lines +146 to +147

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Option(id.namespace()).filter(_.nonEmpty).map(_.last) takes only the last namespace element as the database and drops the catalog (and any earlier namespace levels). For a multi-level / 3-part identifier this loses the catalog, which feeds into the name-only matching concern below. If we end up qualifying the match by catalog+db, we'd want to preserve the full namespace here too.

case _ => None
}
tableIdOpt.foreach { tableId =>
// Match by table name, allowing for different catalog/database qualifications
val matchesInput = context.allInputs.exists(input =>
input.table == tableId.table
)
if (matchesInput &&
!context.batchInputs.exists(_.input.identifier.table == tableId.table) &&
!context.streamingInputs.exists(_.input.identifier.table == tableId.table)) {
context.externalInputs += tableId
}
}
Comment on lines +157 to +160

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The match is on tableId.table only (unqualified name), so a genuinely-external table that happens to share an unqualified name with a pipeline dataset in a different catalog/database would also match here and get recorded. The comment just above says "allowing for different catalog/database qualifications" — in a multi-catalog pipeline, name-only matching could introduce a dependency on the wrong table. Would it be worth qualifying the comparison by catalog + database (the TableIdentifier carries them) so only the intended dataset matches?

}

result

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this records the dependency in a field the scheduler doesn't read for ordering, so the matched table may not end up affecting execution order. Tracing it through: context.externalInputs becomes FlowFunctionResult.usedExternalInputs, but the DAG dependency set is ResolvedFlow.inputs, which (in Flow.scala) is (batchInputs ++ streamingInputs).map(_.input.identifier)usedExternalInputs isn't included. The consumers that order/validate the graph all read .inputs: GraphOperations.upstreamFlows, GraphValidations (topo order), DatasetManager (materialization deps), and CoreDataflowNodeProcessor.

externalInputs also semantically means "tables outside the pipeline" — it's only written today by readExternalBatchInput/readExternalStreamInput, which are precisely the non-pipeline paths. So when the matched relation is a pipeline dataset, I think we'd want it to flow through the internal-input path (landing in batchInputs/streamingInputs, which is what .inputs and the DAG consume) rather than externalInputs. Otherwise the ordering the PR is targeting may not change. Could you double-check this against how you saw the scheduler pick it up? Happy to pair on it.

}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.pipelines.graph

import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext}
import org.apache.spark.sql.test.SharedSparkSession

/**
* Reproduces SPARK-57352: When a flow references a pipeline dataset via a resolved
* table relation (e.g., from spark.table() in classic mode), the pipeline engine
* should still infer the dependency.
*/
class SparkTableLineageSuite extends ExecutionTest with SharedSparkSession {

test("SPARK-57352: resolved table reference in plan captures lineage after fix") {
val session = spark
import session.implicits._

// Create a real table in the catalog so spark.table() doesn't throw
Seq(1, 2, 3).toDF("x").write.mode("overwrite").saveAsTable("bronze_real")

try {
val pipelineDef = new TestGraphRegistrationContext(spark) {
// Bronze: defined in the pipeline
registerMaterializedView("bronze_real", query = dfFlowFunc(Seq(1, 2, 3).toDF("x")))

// Silver: reads bronze_real via a RESOLVED plan (simulating spark.table())
// The plan has no UnresolvedRelation -- it's already resolved against the catalog
val resolvedPlan = session.table("bronze_real").queryExecution.analyzed
registerMaterializedView("silver_resolved", query =
FlowAnalysis.createFlowFunctionFromLogicalPlan(resolvedPlan))
}

val graph = pipelineDef.resolveToDataflowGraph()
val silverFlow = graph.flows.find(
f => f.identifier.table.contains("silver_resolved")).get
.asInstanceOf[ResolutionCompletedFlow]

// After fix: the post-resolution scan should detect bronze_real as an input
val silverInputs = silverFlow.funcResult.inputs
val externalInputs = silverFlow.funcResult.usedExternalInputs

// The fix records it as an external input (since it was resolved outside the pipeline)
assert(externalInputs.nonEmpty || silverInputs.nonEmpty,
s"SPARK-57352 NOT FIXED: resolved table 'bronze_real' not captured. " +
s"inputs=$silverInputs, externalInputs=$externalInputs")
} finally {
spark.sql("DROP TABLE IF EXISTS bronze_real")
}
}

test("readFlowFunc correctly captures lineage (control test)") {
val session = spark
import session.implicits._

val pipelineDef = new TestGraphRegistrationContext(spark) {
registerMaterializedView("bronze2", query = dfFlowFunc(Seq(1, 2).toDF("x")))
registerMaterializedView("silver2", query = readFlowFunc("bronze2"))
}

val graph = pipelineDef.resolveToDataflowGraph()
val silverFlow = graph.flows.find(
f => f.identifier.table.contains("silver2")).get
.asInstanceOf[ResolutionCompletedFlow]

val silverInputs = silverFlow.funcResult.inputs
assert(silverInputs.nonEmpty,
s"Control test: readFlowFunc should capture bronze2 as input but got: $silverInputs")
}
}