From 2a6b38a9963f68349ee77354b904f1619380106b Mon Sep 17 00:00:00 2001 From: Neelesh Salian Date: Tue, 16 Jun 2026 17:34:49 -0700 Subject: [PATCH 1/2] Parquet: Cache adjacent identical metadata in variant reader --- .../parquet/ParquetVariantReaders.java | 32 ++++++- .../iceberg/parquet/TestVariantReaders.java | 89 +++++++++++++++++++ 2 files changed, 120 insertions(+), 1 deletion(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java index 5343683bb801..fc235b1f0de5 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java @@ -143,13 +143,43 @@ private static ByteBuffer readBinary(ColumnIterator column) { } private static class VariantMetadataReader extends PrimitiveReader { + // Caches the last parsed metadata for adjacent rows with identical bytes. + private byte[] lastMetadataBytes; + private VariantMetadata cachedMetadata; + private VariantMetadataReader(ColumnDescriptor desc) { super(desc); } @Override public VariantMetadata read(VariantMetadata reuse) { - return Variants.metadata(readBinary(column)); + ByteBuffer data = column.nextBinary().toByteBuffer(); + int length = data.remaining(); + + if (cachedMetadata != null + && lastMetadataBytes != null + && lastMetadataBytes.length == length + && bufferEquals(data, lastMetadataBytes)) { + return cachedMetadata; + } + + byte[] bytes = new byte[length]; + data.get(bytes, 0, length); + VariantMetadata parsed = + Variants.metadata(ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)); + this.lastMetadataBytes = bytes; + this.cachedMetadata = parsed; + return parsed; + } + + private static boolean bufferEquals(ByteBuffer buffer, byte[] expected) { + int pos = buffer.position(); + for (int i = 0; i < expected.length; i++) { + if (buffer.get(pos + i) != expected[i]) { + return false; + } + } + return true; } } diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java index eb917732aa3a..2e4b07e7c422 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java @@ -1349,6 +1349,95 @@ public void testArrayWithElementValueTypedValueConflict() { .hasMessageContaining("Invalid variant, conflicting value and typed_value"); } + @Test + public void testMetadataCachingWithIdenticalRows() throws IOException { + GroupType variantType = variant("var", 2); + MessageType parquetSchema = parquetSchema(variantType); + + List records = Lists.newArrayList(); + for (int i = 0; i < 50; i++) { + GenericRecord variant = + record( + variantType, + Map.of( + "metadata", TEST_METADATA_BUFFER.duplicate(), + "value", serialize(Variants.of(i)))); + records.add(record(parquetSchema, Map.of("id", i, "var", variant))); + } + + List results = writeAndRead(parquetSchema, records); + assertThat(results).hasSize(50); + for (int i = 0; i < 50; i++) { + Record actual = results.get(i); + assertThat(actual.getField("id")).isEqualTo(i); + Variant actualVariant = (Variant) actual.getField("var"); + VariantTestUtil.assertEqual(TEST_METADATA, actualVariant.metadata()); + VariantTestUtil.assertEqual(Variants.of(i), actualVariant.value()); + } + } + + @Test + public void testMetadataCachingWithMixedMetadata() throws IOException { + ByteBuffer metaA = VariantTestUtil.createMetadata(ImmutableList.of("x", "y"), true); + ByteBuffer metaB = VariantTestUtil.createMetadata(ImmutableList.of("p", "q", "r"), true); + VariantMetadata expectedA = Variants.metadata(metaA); + VariantMetadata expectedB = Variants.metadata(metaB); + + GroupType variantType = variant("var", 2); + MessageType parquetSchema = parquetSchema(variantType); + + List records = Lists.newArrayList(); + for (int i = 0; i < 15; i++) { + ByteBuffer meta = (i / 5) % 2 == 0 ? metaA.duplicate() : metaB.duplicate(); + GenericRecord variant = + record(variantType, Map.of("metadata", meta, "value", serialize(Variants.of(i)))); + records.add(record(parquetSchema, Map.of("id", i, "var", variant))); + } + + List results = writeAndRead(parquetSchema, records); + assertThat(results).hasSize(15); + for (int i = 0; i < 15; i++) { + Record actual = results.get(i); + assertThat(actual.getField("id")).isEqualTo(i); + Variant actualVariant = (Variant) actual.getField("var"); + VariantMetadata expectedMeta = (i / 5) % 2 == 0 ? expectedA : expectedB; + VariantTestUtil.assertEqual(expectedMeta, actualVariant.metadata()); + VariantTestUtil.assertEqual(Variants.of(i), actualVariant.value()); + } + } + + @Test + public void testMetadataCachingByteCompare() throws IOException { + ByteBuffer metaA = VariantTestUtil.createMetadata(ImmutableList.of("x", "y"), true); + ByteBuffer metaC = VariantTestUtil.createMetadata(ImmutableList.of("a", "b"), true); + assertThat(metaA.remaining()).isEqualTo(metaC.remaining()); + + VariantMetadata expectedA = Variants.metadata(metaA); + VariantMetadata expectedC = Variants.metadata(metaC); + + GroupType variantType = variant("var", 2); + MessageType parquetSchema = parquetSchema(variantType); + + List records = Lists.newArrayList(); + for (int i = 0; i < 10; i++) { + ByteBuffer meta = (i % 2 == 0) ? metaA.duplicate() : metaC.duplicate(); + GenericRecord variant = + record(variantType, Map.of("metadata", meta, "value", serialize(Variants.of(i)))); + records.add(record(parquetSchema, Map.of("id", i, "var", variant))); + } + + List results = writeAndRead(parquetSchema, records); + assertThat(results).hasSize(10); + for (int i = 0; i < 10; i++) { + Record actual = results.get(i); + assertThat(actual.getField("id")).isEqualTo(i); + Variant actualVariant = (Variant) actual.getField("var"); + VariantMetadata expected = (i % 2 == 0) ? expectedA : expectedC; + VariantTestUtil.assertEqual(expected, actualVariant.metadata()); + VariantTestUtil.assertEqual(Variants.of(i), actualVariant.value()); + } + } + private static ByteBuffer serialize(VariantValue value) { ByteBuffer buffer = ByteBuffer.allocate(value.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN); value.writeTo(buffer, 0); From 5b81d5ae194cff31f8f001460676eebed21fcf97 Mon Sep 17 00:00:00 2001 From: backport Date: Wed, 17 Jun 2026 20:53:40 -0700 Subject: [PATCH 2/2] Update tests to check for hits and misses --- .../iceberg/parquet/TestVariantReaders.java | 48 +++++++++++++------ 1 file changed, 34 insertions(+), 14 deletions(-) diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java index 2e4b07e7c422..3185a9852644 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java @@ -1350,7 +1350,7 @@ public void testArrayWithElementValueTypedValueConflict() { } @Test - public void testMetadataCachingWithIdenticalRows() throws IOException { + public void testMetadataCacheHits() throws IOException { GroupType variantType = variant("var", 2); MessageType parquetSchema = parquetSchema(variantType); @@ -1367,21 +1367,22 @@ public void testMetadataCachingWithIdenticalRows() throws IOException { List results = writeAndRead(parquetSchema, records); assertThat(results).hasSize(50); + VariantMetadata cached = ((Variant) results.get(0).getField("var")).metadata(); for (int i = 0; i < 50; i++) { Record actual = results.get(i); assertThat(actual.getField("id")).isEqualTo(i); Variant actualVariant = (Variant) actual.getField("var"); - VariantTestUtil.assertEqual(TEST_METADATA, actualVariant.metadata()); + assertThat(actualVariant.metadata()) + .as("row %d must reuse the cached metadata instance", i) + .isSameAs(cached); VariantTestUtil.assertEqual(Variants.of(i), actualVariant.value()); } } @Test - public void testMetadataCachingWithMixedMetadata() throws IOException { + public void testMetadataCacheInvalidatesByLength() throws IOException { ByteBuffer metaA = VariantTestUtil.createMetadata(ImmutableList.of("x", "y"), true); ByteBuffer metaB = VariantTestUtil.createMetadata(ImmutableList.of("p", "q", "r"), true); - VariantMetadata expectedA = Variants.metadata(metaA); - VariantMetadata expectedB = Variants.metadata(metaB); GroupType variantType = variant("var", 2); MessageType parquetSchema = parquetSchema(variantType); @@ -1396,31 +1397,39 @@ public void testMetadataCachingWithMixedMetadata() throws IOException { List results = writeAndRead(parquetSchema, records); assertThat(results).hasSize(15); + VariantMetadata m0 = ((Variant) results.get(0).getField("var")).metadata(); + VariantMetadata m5 = ((Variant) results.get(5).getField("var")).metadata(); + VariantMetadata m10 = ((Variant) results.get(10).getField("var")).metadata(); + assertThat(m5) + .as("metadata at row 5 must be a fresh instance after length change") + .isNotSameAs(m0); + assertThat(m10) + .as("metadata at row 10 must be a fresh instance after length change") + .isNotSameAs(m5); for (int i = 0; i < 15; i++) { Record actual = results.get(i); assertThat(actual.getField("id")).isEqualTo(i); Variant actualVariant = (Variant) actual.getField("var"); - VariantMetadata expectedMeta = (i / 5) % 2 == 0 ? expectedA : expectedB; - VariantTestUtil.assertEqual(expectedMeta, actualVariant.metadata()); + VariantMetadata cached = i < 5 ? m0 : i < 10 ? m5 : m10; + assertThat(actualVariant.metadata()) + .as("row %d must reuse its group's cached metadata instance", i) + .isSameAs(cached); VariantTestUtil.assertEqual(Variants.of(i), actualVariant.value()); } } @Test - public void testMetadataCachingByteCompare() throws IOException { + public void testMetadataCacheInvalidatesByBytes() throws IOException { ByteBuffer metaA = VariantTestUtil.createMetadata(ImmutableList.of("x", "y"), true); ByteBuffer metaC = VariantTestUtil.createMetadata(ImmutableList.of("a", "b"), true); assertThat(metaA.remaining()).isEqualTo(metaC.remaining()); - VariantMetadata expectedA = Variants.metadata(metaA); - VariantMetadata expectedC = Variants.metadata(metaC); - GroupType variantType = variant("var", 2); MessageType parquetSchema = parquetSchema(variantType); List records = Lists.newArrayList(); for (int i = 0; i < 10; i++) { - ByteBuffer meta = (i % 2 == 0) ? metaA.duplicate() : metaC.duplicate(); + ByteBuffer meta = (i / 2) % 2 == 0 ? metaA.duplicate() : metaC.duplicate(); GenericRecord variant = record(variantType, Map.of("metadata", meta, "value", serialize(Variants.of(i)))); records.add(record(parquetSchema, Map.of("id", i, "var", variant))); @@ -1432,10 +1441,21 @@ public void testMetadataCachingByteCompare() throws IOException { Record actual = results.get(i); assertThat(actual.getField("id")).isEqualTo(i); Variant actualVariant = (Variant) actual.getField("var"); - VariantMetadata expected = (i % 2 == 0) ? expectedA : expectedC; - VariantTestUtil.assertEqual(expected, actualVariant.metadata()); + VariantMetadata pairLeader = ((Variant) results.get((i / 2) * 2).getField("var")).metadata(); + assertThat(actualVariant.metadata()) + .as("row %d must reuse its pair leader's cached metadata instance", i) + .isSameAs(pairLeader); VariantTestUtil.assertEqual(Variants.of(i), actualVariant.value()); } + VariantMetadata p0 = ((Variant) results.get(0).getField("var")).metadata(); + VariantMetadata p2 = ((Variant) results.get(2).getField("var")).metadata(); + VariantMetadata p4 = ((Variant) results.get(4).getField("var")).metadata(); + assertThat(p2) + .as("pair leader at row 2 must be a fresh instance after byte change") + .isNotSameAs(p0); + assertThat(p4) + .as("pair leader at row 4 must be a fresh instance after byte change") + .isNotSameAs(p2); } private static ByteBuffer serialize(VariantValue value) {