
[SPARK-57499][SQL] Fix column pruning and invalid plans in variant extraction pushdown on DSv2 scans#56556

Open
qlong wants to merge 1 commit into apache:master from qlong:SPARK-57499-variant-pushdown-column-pruning

@qlong qlong commented Jun 17, 2026


What changes were proposed in this pull request?

Three fixes in pushVariantExtractions (called by V2ScanRelationPushDown.pushDownVariants):

  1. Guard against double-visit: Add a pushedVariants.isEmpty
    sentinel check so the inner ScanBuilderHolder leaf visit (caused by
    transformDown recursing into the child after returning the plan
    unchanged) returns immediately. This ensures
    builder.pushVariantExtractions is called exactly once per holder.
  2. Eager column pruning: While projectList and filters are in
    scope, call builder.pruneColumns(requiredSchema) for builders
    implementing SupportsPushDownRequiredColumns and trim
    sHolder.output to the required columns. By the time
    buildScanWithPushedVariants calls build(), the builder already
    has the correct pruned schema. This is similar to how
    buildScanWithPushedAggregate works.
  3. Keep whole-variant reads raw: pushdown is for extractions, not
    whole-variant reads. A bare variant reference -- SELECT v, or a
    column lifted to feed a variant_get above a Join/Sort/Aggregate
    barrier the local rewrite cannot see -- is recorded as fullVariant
    (path $), meaning "the entire value." Shredding that to a lone
    full-variant slot saves no I/O and is mishandled: the Parquet reader
    collapses it to a boolean placeholder, and above a barrier the
    re-exposed GetStructField AS v#orig alias is dropped by
    RemoveRedundantAliases, giving wrong results or an invalid plan. So
    when fullVariant is a variant's only requested field, leave the
    column raw; when it coexists with real extractions (e.g. SELECT v,
    variant_get(v, '$.a')) the >=2-slot struct is not collapsed and
    keeps its pushdown. This subsumes the join-key case and shreds
    automatically once barrier-aware pushdown makes the variant_get
    visible as a typed path.
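
The rule in item 3 can be illustrated with a small stand-alone sketch. This is not the code from the patch: the object name, the `Decision` type and the representation of requested paths as plain strings are all assumptions made for illustration. Only the idea (a lone full-variant request, path `$`, leaves the column raw, while a mix of `$` and real extractions keeps its pushdown) comes from the description above.

```scala
object VariantPushdownDecision {
  // Hypothetical marker for "the entire value"; the PR describes it as path `$`.
  val FullVariantPath = "$"

  sealed trait Decision
  case object KeepRaw extends Decision
  final case class Shred(paths: Seq[String]) extends Decision

  // Decide what to do with one VARIANT column given the paths requested from it.
  def decide(requestedPaths: Seq[String]): Decision = {
    val distinctPaths = requestedPaths.distinct
    // Assumption: an empty request is treated like "nothing to push".
    if (distinctPaths.isEmpty || distinctPaths == Seq(FullVariantPath)) KeepRaw
    else Shred(distinctPaths)
  }
}
```

In this sketch, `decide(Seq("$"))` yields `KeepRaw`, while `decide(Seq("$", "$.a"))` yields a two-slot `Shred`, mirroring the `SELECT v, variant_get(v, '$.a')` case.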

Jira: https://issues.apache.org/jira/browse/SPARK-57499

Why are the changes needed?

Three bugs on the accepted variant pushdown path:

Issue 1 — column pruning is skipped. buildScanWithPushedVariants calls
builder.build() and replaces the ScanBuilderHolder with a DataSourceV2ScanRelation.
The subsequent pruneColumns rule matches only ScanBuilderHolder nodes, so it is a
no-op and builder.pruneColumns() is never called. The scan reads the full table schema
including unreferenced columns. For unreferenced VARIANT columns this is especially
costly — each is fully reconstructed from its shredded Parquet tree on every row.
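
A toy model may help show why the ordering matters. The plan classes below are simplified stand-ins, not Spark's real `ScanBuilderHolder` or `DataSourceV2ScanRelation`, and both rule functions are hypothetical. The sketch only demonstrates that a pruning rule which matches the holder node becomes a no-op once an earlier rule has already replaced the holder, and that pruning before the replacement avoids this.

```scala
object PruningOrderDemo {
  sealed trait Plan
  // Stand-in for ScanBuilderHolder: the scan has not been built yet.
  final case class Holder(columns: Seq[String]) extends Plan
  // Stand-in for DataSourceV2ScanRelation: the scan is already built.
  final case class Relation(columns: Seq[String]) extends Plan
  final case class Project(columns: Seq[String], child: Plan) extends Plan

  // Later pruning rule: it only fires while the child is still a Holder.
  def pruneRule(plan: Plan): Plan = plan match {
    case Project(cols, Holder(all)) =>
      Project(cols, Holder(all.filter(c => cols.contains(c))))
    case other => other
  }

  // Variant pushdown that builds the scan, optionally pruning first.
  def pushVariants(plan: Plan, pruneEagerly: Boolean): Plan = plan match {
    case Project(cols, Holder(all)) =>
      val kept = if (pruneEagerly) all.filter(c => cols.contains(c)) else all
      Project(cols, Relation(kept))
    case other => other
  }
}
```

With `pruneEagerly = false` the later `pruneRule` sees a `Relation` and leaves all three columns in place; with `pruneEagerly = true` the built relation already carries only the referenced column.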

Issue 2 — invalid plan / crash on tables with >=2 VARIANT columns.
pushDownVariants uses transformDown, which recurses into the child ScanBuilderHolder after returning the plan unchanged. The bare ScanBuilderHolder matches PhysicalOperation a second time, collecting an unreferenced sibling VARIANT column as a full-variant request and pushing it to the builder again. ParquetScanBuilder overwrites its state on every call, so the second push clobbers the correct extraction from the first. The rewritten scan then emits a fresh ExprId for the variant while the projection still references the original, and binding fails.

This affects any extraction shape — projection, ORDER BY, aggregate, join — on a table with two or more VARIANT columns. Single-VARIANT tables are unaffected.
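
The double visit and the effect of the sentinel can be sketched with a simplified pre-order traversal. Everything here is a toy: `Holder`, `Project`, `visit` and `run` are hypothetical names, the traversal only imitates the "apply the rule, then recurse into the children" behaviour described above, and the bare-holder case is simplified to request every variant column as a full variant (path `$`).

```scala
object DoubleVisitDemo {
  sealed trait Plan { def children: Seq[Plan] }

  // Stand-in for a scan-builder holder whose pushed state is overwritten on every call.
  final class Holder(val variantColumns: Seq[String]) extends Plan {
    var pushed: Map[String, Seq[String]] = Map.empty
    var pushCalls: Int = 0
    def children: Seq[Plan] = Nil
    def push(request: Map[String, Seq[String]]): Unit = {
      pushed = request // overwrite, do not merge
      pushCalls += 1
    }
  }

  final case class Project(extractions: Map[String, Seq[String]], child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
  }

  // With the guard on, a holder that already has pushed state is left alone.
  private def pushOnce(h: Holder, request: Map[String, Seq[String]], guarded: Boolean): Unit =
    if (!guarded || h.pushed.isEmpty) h.push(request)

  // Apply the rule at this node, then recurse into the children (pre-order).
  def visit(plan: Plan, guarded: Boolean): Unit = {
    plan match {
      case Project(extractions, h: Holder) => pushOnce(h, extractions, guarded)
      // A bare holder also matches; it sees no extractions, only whole columns.
      case h: Holder => pushOnce(h, h.variantColumns.map(_ -> Seq("$")).toMap, guarded)
      case _ => ()
    }
    plan.children.foreach(visit(_, guarded))
  }

  def run(guarded: Boolean): Holder = {
    val holder = new Holder(Seq("v1", "v2"))
    visit(Project(Map("v1" -> Seq("$.x")), holder), guarded)
    holder
  }
}
```

Without the guard the holder is pushed twice and ends up holding only full-variant requests; with the guard it is pushed once and keeps the `$.x` extraction for `v1`.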

Reproduce on stock spark-4.1.x (path-based views force DSv2):

SET spark.sql.sources.useV1SourceList = "";

CREATE TABLE t (a INT, v1 VARIANT, v2 VARIANT) USING PARQUET LOCATION '/tmp/vt';
INSERT INTO t VALUES
  (1, parse_json('{"x":1,"price":3,"name":"x"}'), parse_json('{"y":2}')),
  (2, parse_json('{"x":9,"price":1,"name":"z"}'), parse_json('{"y":8}'));
CREATE OR REPLACE TEMPORARY VIEW tv    USING parquet OPTIONS (path '/tmp/vt');
CREATE OR REPLACE TEMPORARY VIEW codes USING parquet OPTIONS (path '/tmp/vt');

-- All four crash with: [INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND] Could not find v1#NN in [...]
SELECT variant_get(v1, '$.x', 'int') FROM tv;
SELECT variant_get(v1, '$.name', 'string') AS nm
  FROM tv ORDER BY variant_get(v1, '$.price', 'int');
SELECT max(variant_get(v1, '$.price', 'int')) FROM tv;
SELECT l.a FROM tv l JOIN codes r
  ON variant_get(l.v1, '$.x', 'int') = variant_get(r.v1, '$.x', 'int');

[INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND] Could not find v1#21 in [a#33,v1#34,v2#35]. SQLSTATE: XX000
  at org.apache.spark.sql.catalyst.expressions.BindReferences$.attributeNotFoundException(BoundAttribute.scala:109)
  ...

Issue 3 -- wrong results / crash when a whole-variant read is shredded. A bare
variant reference (a plain SELECT v, or a column lifted to feed a variant_get
above a Join/Sort/Aggregate barrier the local rewrite cannot see) is recorded as a
full-variant request (path "$"). Shredding it to a lone full-variant slot is both
useless (the whole value is read regardless) and mishandled:

  • The Parquet reader collapses a lone VariantType slot to a boolean placeholder,
    so ORDER BY variant_get(v, '$.price') sorts on the placeholder and silently
    returns the wrong order, and max(variant_get(v, '$.price')) fails to codegen.
  • A join key is re-exposed above the join as GetStructField(v_new, i) AS v#orig;
    RemoveRedundantAliases collapses the alias and the condition references a
    dropped ExprId, failing plan validation.

Does this PR introduce any user-facing change?

No

How was this patch tested?

  • Added new unit tests
  • Manual testing with spark-sql

Was this patch authored or co-authored using generative AI tooling?

Co-authored with Claude Code (Sonnet 4.6)

@qlong qlong changed the title [SPARK-57499][SQL] Variant extraction pushdown bypasses column pruning on DSvs scans [SPARK-57499][SQL] Variant extraction pushdown bypasses column pruning on DSvs2 scans Jun 17, 2026
@qlong qlong force-pushed the SPARK-57499-variant-pushdown-column-pruning branch from 5abbc88 to 34f3787 Compare June 17, 2026 16:16
@cloud-fan


can you fix merge conflicts?

@qlong qlong force-pushed the SPARK-57499-variant-pushdown-column-pruning branch from 34f3787 to 0a76001 Compare June 18, 2026 01:37
@qlong

qlong commented Jun 18, 2026


@cloud-fan rebased. I will keep an eye on the CI build

@qlong qlong force-pushed the SPARK-57499-variant-pushdown-column-pruning branch from 0a76001 to 81c15e5 Compare June 18, 2026 05:40
@qlong qlong changed the title [SPARK-57499][SQL] Variant extraction pushdown bypasses column pruning on DSvs2 scans [SPARK-57499][SQL] Fix column pruning and invalid plans in variant extraction pushdown on DSv2 scans Jun 18, 2026