Spark: Add selective shredded variant extraction Parquet readers #16714
qlong wants to merge 2 commits into
dde0bbe to 8fdff7c
- Add selective Parquet readers (ParquetVariantExtractionReaders, VariantExtractionPathResolver) to read only shredded typed_value columns for requested extraction paths.
- Add Spark row reader adapter (SparkVariantExtractionReaders, SparkParquetReaders) to materialize extraction slots from the engine read schema instead of full variant blobs.
- Wire engine read schema from SparkBatch through SparkInputPartition to RowDataReader only (row Parquet path).
- Update PruneColumnsWithoutReordering so annotated extraction structs map back to Iceberg VARIANT columns in the scan projection.

Issue: apache#16726
8fdff7c to 2185f2a
@rdblue @steveloughran @nssalian PTAL when you get a chance.
  }

  @SuppressWarnings("checkstyle:CyclomaticComplexity")
  private static Object toSparkValue(VariantValue value, DataType targetType) {
I may be missing something, but this looks like it overlaps with Spark's variant_get casting logic. toSparkValue handles the common cases, but it seems separate from Spark's existing behavior around failOnError, timeZoneId, and some cast edge cases.
Would it make sense to reuse Spark's cast path here if we can bridge from Iceberg's VariantValue to Spark's Variant / VariantVal?
Thanks for the review. I assume you are referring to VariantGet.cast in Spark. toSparkValue in the connector is required by the DSV2 extraction pushdown contract: when Spark pushes extraction down, it delegates both extraction and cast to the connector, and the engine no longer calls VariantGet.cast on the values returned from the connector. The bridge from Iceberg's VariantValue to Spark's Variant already exists; it is used when extraction pushdown is rejected and the connector returns the whole variant. That path is expensive for shredded typed values, because they are put back into a VariantValue and then immediately extracted by Spark again.
The cast logic in Spark is more general than needed here. It handles cross-type coercions that do not apply to typed_value, with the exception of type-narrowing overflow. I added a fix for that.
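For readers following the thread, the narrowing-overflow case can be illustrated with a minimal sketch. It is not the PR's toSparkValue: the class and method names below are hypothetical, and it only assumes the behavior stated in the follow-up commit (return null when the target type is narrower than the value type and the value overflows).

```java
/** Hypothetical illustration only; not the connector's actual cast code. */
public final class NarrowingCastSketch {
  private NarrowingCastSketch() {}

  /** Narrows a long to an int, or returns null when the value does not fit. */
  public static Integer narrowToInt(long value) {
    if (value < Integer.MIN_VALUE || value > Integer.MAX_VALUE) {
      return null; // overflow: report null instead of silently wrapping
    }
    return (int) value;
  }

  /** Narrows a long to a byte, or returns null when the value does not fit. */
  public static Byte narrowToByte(long value) {
    if (value < Byte.MIN_VALUE || value > Byte.MAX_VALUE) {
      return null; // overflow: report null instead of silently wrapping
    }
    return (byte) value;
  }
}
```

A plain `(int) value` cast would wrap on overflow, which is why the range check comes first. Whether null is the right result for every Spark cast mode is outside what this sketch covers.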
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;

public class PathUtil {
If I am not mistaken, this utility is only used for variant extraction/shredding paths. Should we rename it to something like VariantPathUtil or VariantPath so it is clear this is not a general Iceberg path utility?
You are correct; I agree that VariantPathUtil is a better name. I am going to defer the change for now, since this file is copied from #15384 to avoid stacked PRs. There are other in-flight PRs that also copy this file. I will put up a follow-up PR to rename it once they are merged.
      if (segment instanceof PathSegment.Name) {
        parts.add(((PathSegment.Name) segment).name());
      } else if (segment instanceof PathSegment.Index) {
        parts.add("[" + ((PathSegment.Index) segment).index() + "]");
I think this loses some path semantics. PathUtil.parse distinguishes object names from array indexes, but parseObjectPath flattens both into string parts. For example, $[0] means array element 0, while $['[0]'] means an object field whose name is literally [0]; after flattening, both are represented as the same string segment "[0]".
Good callout. I removed parseObjectPath and now pass List<PathSegment> through the Parquet extraction readers. Added tests.
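The collision described in this thread can be reproduced with a small sketch. The `Segment`, `Name`, and `Index` types below are hypothetical stand-ins for PathSegment (the real classes may differ), and `flatten` only mimics the shape of the removed string flattening.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-ins for PathSegment; illustration only. */
public final class PathSegmentSketch {
  private PathSegmentSketch() {}

  public interface Segment {}

  /** Object field access, e.g. $['[0]'] or $.a */
  public static final class Name implements Segment {
    final String name;

    public Name(String name) {
      this.name = name;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof Name && ((Name) o).name.equals(name);
    }

    @Override
    public int hashCode() {
      return name.hashCode();
    }
  }

  /** Array element access, e.g. $[0] */
  public static final class Index implements Segment {
    final int index;

    public Index(int index) {
      this.index = index;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof Index && ((Index) o).index == index;
    }

    @Override
    public int hashCode() {
      return Integer.hashCode(index);
    }
  }

  /** Lossy flattening: both kinds of segment become plain strings. */
  public static List<String> flatten(Segment... segments) {
    List<String> parts = new ArrayList<>();
    for (Segment segment : segments) {
      if (segment instanceof Name) {
        parts.add(((Name) segment).name);
      } else if (segment instanceof Index) {
        parts.add("[" + ((Index) segment).index + "]");
      }
    }
    return parts;
  }
}
```

With these types, `flatten(new Index(0))` and `flatten(new Name("[0]"))` produce equal lists, while the typed segments themselves stay distinguishable. Keeping the typed list through the readers avoids that ambiguity.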
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;

public class PathUtil {
Slightly broader question: would it make sense to use an existing JSONPath parser here and then validate that the parsed path is within Iceberg's supported subset? This is probably the fourth project where I have seen a new VariantPath-style implementation over the past year, so I am a bit worried about adding another one unless we keep the scope very explicit.
For example, we could allow simple/singular paths like field access and array indexes, while rejecting wildcards, recursive descent, slices, and filter expressions for now.
My understanding is that Iceberg has strict dependency hygiene: adding a new library would require review and could add transitive dependencies. The parser from #15384 is intentionally minimal. We should probably look into a dedicated library if we need to support wildcards or filter expressions.
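To make the "explicit scope" idea concrete, here is a rough sketch of a check that accepts only simple paths (field access and array indexes) and rejects wildcards, recursive descent, slices, and filter expressions. It is not the parser from #15384; the class name and the exact accepted grammar are assumptions made for illustration.

```java
/** Hypothetical subset check; not the PathUtil parser from the PR. */
public final class SimplePathCheckSketch {
  private SimplePathCheckSketch() {}

  /** Returns true only for "$" followed by .name, ['name'], or [digits] segments. */
  public static boolean isSupported(String path) {
    if (path == null || !path.startsWith("$")) {
      return false;
    }
    int i = 1;
    int n = path.length();
    while (i < n) {
      char c = path.charAt(i);
      if (c == '.') {
        int start = ++i;
        while (i < n && isNameChar(path.charAt(i))) {
          i++;
        }
        if (i == start) {
          return false; // empty name: covers "..", ".*", and a trailing "."
        }
      } else if (c == '[') {
        i++;
        if (i < n && path.charAt(i) == '\'') {
          // Quoted field name; escaped quotes are not handled in this sketch.
          int endQuote = path.indexOf('\'', i + 1);
          if (endQuote < 0) {
            return false;
          }
          i = endQuote + 1;
        } else {
          int start = i;
          while (i < n && path.charAt(i) >= '0' && path.charAt(i) <= '9') {
            i++;
          }
          if (i == start) {
            return false; // not a plain index: wildcard, filter, or negative index
          }
        }
        if (i >= n || path.charAt(i) != ']') {
          return false; // slices such as [1:3] end up here
        }
        i++;
      } else {
        return false;
      }
    }
    return true;
  }

  private static boolean isNameChar(char c) {
    return Character.isLetterOrDigit(c) || c == '_';
  }
}
```

A check like this could sit in front of either a hand-written parser or a third-party one; it only decides whether a path is inside the supported subset.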
a9e7ef1 to 4de911e
Address review feedback:
- Remove parseObjectPath and related PathUtil string helpers so PathUtil stays aligned with the companion pushdown branch.
- Pass List<PathSegment> through the Parquet extraction readers, path resolver, and Spark wiring to preserve array index vs object field semantics during navigation.
- Return null if the target type is narrower than the value type and overflows.
4de911e to 4227e75
Changes
This PR is part of the work to support variant extraction pushdown. The core change is to introduce new Parquet readers that read selected variant paths instead of the whole variant.
Issue: #16726
Note for reviewers
End to end testing
Requires #16715 for end-to-end testing. To try the full pushdown + selective read path without merging locally, use this branch:
https://github.com/qlong/iceberg/tree/variant-extraction-integration-test
Test results
Uses one day of GitHub activity data, ingested as JSON and as shredded variants with 299 shredded columns.
Baseline: gha-payload-iceberg-20260605 · variant + extraction pushdown ON + selective shredded variant extraction Parquet readers
Compare A: same run with payload stored as string_json
Compare B: gha-payload-iceberg-nopushed-20260605 · variant + pushdown OFF, read whole variant
Median of 3 timed runs per query (Spark Time taken:).
Co-authored with Claude Sonnet 4.6