Skip to content

[SPARK-57352][SDP] Detect resolved table references in pipeline flow lineage#56578

Open
shrirangmhalgi wants to merge 1 commit into
apache:masterfrom
shrirangmhalgi:SPARK-57352-pipeline-lineage-spark-table
Open

[SPARK-57352][SDP] Detect resolved table references in pipeline flow lineage#56578
shrirangmhalgi wants to merge 1 commit into
apache:masterfrom
shrirangmhalgi:SPARK-57352-pipeline-lineage-spark-table

Conversation

@shrirangmhalgi

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

After FlowAnalysis.analyze resolves the plan via transformWithSubqueries, scan for LogicalRelation / DataSourceV2Relation / HiveTableRelation nodes whose table name matches a known pipeline input. Record these as externalInputs so the DAG scheduler correctly orders execution.

Why are the changes needed?

FlowAnalysis.analyze only intercepts UnresolvedRelation nodes when building the pipeline dependency graph. If a flow's plan contains a pre-resolved table reference (e.g., from spark.table() in classic mode or a plan that was analyzed before reaching FlowAnalysis), the dependency is invisible to the scheduler. This causes downstream flows to execute before their upstream dependencies, resulting in either TABLE_OR_VIEW_NOT_FOUND errors or stale data reads.

Does this PR introduce any user-facing change?

Yes. Pipeline flows that reference upstream datasets via spark.table() now correctly infer the dependency, ensuring correct execution order.

How was this patch tested?

New SparkTableLineageSuite with two tests:

  • Resolved table reference (LogicalRelation) detected as pipeline input (was invisible before fix)
  • Control test confirming readFlowFunc (the UnresolvedRelation path) continues to work
  • Full pipeline test suite passes (378 tests, 0 failures).

Was this patch authored or co-authored using generative AI tooling?

Yes. Authored using Claude Opus 4.6.

…lineage

FlowAnalysis.analyze only intercepts UnresolvedRelation nodes when building the
pipeline DAG. If a flow references a pipeline-internal dataset via a pre-resolved
plan (e.g., spark.table() in classic mode, or a plan that was analyzed before
reaching FlowAnalysis), the dependency is invisible to the scheduler.

After the existing transformWithSubqueries pass, scan the resolved plan for
LogicalRelation / DataSourceV2Relation / HiveTableRelation nodes whose table
name matches a known pipeline input. Record these as externalInputs so the DAG
scheduler correctly orders execution.

Added SparkTableLineageSuite with:
- Test proving resolved table references are now captured
- Control test confirming readFlowFunc continues to work

@yadavay-amzn yadavay-amzn left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assessment: The gap this targets looks real — FlowAnalysis.analyze only intercepts UnresolvedRelation when building the dependency graph, so a pre-resolved table reference (e.g. a spark.table() plan that's already analyzed) carries no UnresolvedRelation and the dependency is invisible to the scheduler. Good thing to fix.

The main thing I'd want to resolve before this lands: I think the detected dependency is being recorded in a field the scheduler doesn't actually use for ordering, so the fix may not change execution order yet. The new code adds the match to context.externalInputsFlowFunctionResult.usedExternalInputs, but the DAG dependency set is ResolvedFlow.inputs = (batchInputs ++ streamingInputs), and every graph consumer I traced (GraphOperations.upstreamFlows, GraphValidations, DatasetManager, CoreDataflowNodeProcessor) reads .inputs, not usedExternalInputs. externalInputs is also semantically "tables outside the pipeline." So for a matched pipeline dataset, routing it through the internal-input path (so it lands in batchInputs/streamingInputs) seems closer to what the scheduler consumes. Details inline — and I may be missing how you saw it pick up the ordering, so happy to pair.

Two more I'd flag, both inline:

  • The match is on unqualified table name only, which can pull in a same-named external table from a different catalog/db.
  • The SparkTableLineageSuite assertion checks externalInputs.nonEmpty || silverInputs.nonEmpty; since the fix populates externalInputs, it passes without confirming bronze_real becomes an actual scheduling edge (silverFlow.inputs / upstreamFlows). Asserting on the DAG edge would tie the test to the user-visible behavior.

Plus a few small items inline (subquery-aware traversal, imports, namespace handling, the dead UnresolvedRelation arm).

What I verified

  1. Dependency set: Flow.scalaResolvedFlow.inputs = funcResult.inputs = (batchInputs ++ streamingInputs).map(_.input.identifier); usedExternalInputs is a separate field.
  2. Consumers: traced GraphOperations.upstreamFlows, GraphValidations (topo order), DatasetManager (materialization deps), CoreDataflowNodeProcessor — all order/validate via .inputs.
  3. externalInputs writers today: only readExternalBatchInput / readExternalStreamInput (the non-pipeline paths), consistent with "outside the pipeline" semantics.
  4. Did not build/run locally — conclusions are from code-tracing the head revision.

}
}

result

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this records the dependency in a field the scheduler doesn't read for ordering, so the matched table may not end up affecting execution order. Tracing it through: context.externalInputs becomes FlowFunctionResult.usedExternalInputs, but the DAG dependency set is ResolvedFlow.inputs, which (in Flow.scala) is (batchInputs ++ streamingInputs).map(_.input.identifier)usedExternalInputs isn't included. The consumers that order/validate the graph all read .inputs: GraphOperations.upstreamFlows, GraphValidations (topo order), DatasetManager (materialization deps), and CoreDataflowNodeProcessor.

externalInputs also semantically means "tables outside the pipeline" — it's only written today by readExternalBatchInput/readExternalStreamInput, which are precisely the non-pipeline paths. So when the matched relation is a pipeline dataset, I think we'd want it to flow through the internal-input path (landing in batchInputs/streamingInputs, which is what .inputs and the DAG consume) rather than externalInputs. Otherwise the ordering the PR is targeting may not change. Could you double-check this against how you saw the scheduler pick it up? Happy to pair on it.

Comment on lines +157 to +160
!context.streamingInputs.exists(_.input.identifier.table == tableId.table)) {
context.externalInputs += tableId
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The match is on tableId.table only (unqualified name), so a genuinely-external table that happens to share an unqualified name with a pipeline dataset in a different catalog/database would also match here and get recorded. The comment just above says "allowing for different catalog/database qualifications" — in a multi-catalog pipeline, name-only matching could introduce a dependency on the wrong table. Would it be worth qualifying the comparison by catalog + database (the TableIdentifier carries them) so only the intended dataset matches?

Comment on lines +137 to +139
case r: org.apache.spark.sql.catalyst.analysis.UnresolvedRelation =>
// Already handled above
case node =>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few smaller things on this detection block:

  • resolvedPlan.foreach doesn't descend into subquery expressions, whereas the resolver above uses transformWithSubqueries. A resolved table reference inside a subquery (e.g. WHERE x IN (SELECT ... FROM pipeline_tbl)) would be missed. If the goal is parity with the UnresolvedRelation path, a subquery-aware traversal would match it.
  • The relation types are referenced by fully-qualified name inline (org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation, etc.) — could add imports to match the file's style.
  • The case r: UnresolvedRelation => // Already handled arm: by this point the plan is resolved, so the pipeline's UnresolvedRelations are already gone — this arm looks like it never fires.

Comment on lines +146 to +147
case r: org.apache.spark.sql.execution.datasources.LogicalRelation =>
r.catalogTable.map(_.identifier)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Option(id.namespace()).filter(_.nonEmpty).map(_.last) takes only the last namespace element as the database and drops the catalog (and any earlier namespace levels). For a multi-level / 3-part identifier this loses the catalog, which feeds into the name-only matching concern below. If we end up qualifying the match by catalog+db, we'd want to preserve the full namespace here too.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants