diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java index 5343683bb801..fc235b1f0de5 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java @@ -143,13 +143,43 @@ private static ByteBuffer readBinary(ColumnIterator column) { } private static class VariantMetadataReader extends PrimitiveReader { + // Caches the last parsed metadata for adjacent rows with identical bytes. + private byte[] lastMetadataBytes; + private VariantMetadata cachedMetadata; + private VariantMetadataReader(ColumnDescriptor desc) { super(desc); } @Override public VariantMetadata read(VariantMetadata reuse) { - return Variants.metadata(readBinary(column)); + ByteBuffer data = column.nextBinary().toByteBuffer(); + int length = data.remaining(); + + if (cachedMetadata != null + && lastMetadataBytes != null + && lastMetadataBytes.length == length + && bufferEquals(data, lastMetadataBytes)) { + return cachedMetadata; + } + + byte[] bytes = new byte[length]; + data.get(bytes, 0, length); + VariantMetadata parsed = + Variants.metadata(ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)); + this.lastMetadataBytes = bytes; + this.cachedMetadata = parsed; + return parsed; + } + + private static boolean bufferEquals(ByteBuffer buffer, byte[] expected) { + int pos = buffer.position(); + for (int i = 0; i < expected.length; i++) { + if (buffer.get(pos + i) != expected[i]) { + return false; + } + } + return true; } } diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java index eb917732aa3a..3185a9852644 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java @@ -1349,6 +1349,115 @@ public void testArrayWithElementValueTypedValueConflict() { .hasMessageContaining("Invalid variant, conflicting value and typed_value"); } + @Test + public void testMetadataCacheHits() throws IOException { + GroupType variantType = variant("var", 2); + MessageType parquetSchema = parquetSchema(variantType); + + List records = Lists.newArrayList(); + for (int i = 0; i < 50; i++) { + GenericRecord variant = + record( + variantType, + Map.of( + "metadata", TEST_METADATA_BUFFER.duplicate(), + "value", serialize(Variants.of(i)))); + records.add(record(parquetSchema, Map.of("id", i, "var", variant))); + } + + List results = writeAndRead(parquetSchema, records); + assertThat(results).hasSize(50); + VariantMetadata cached = ((Variant) results.get(0).getField("var")).metadata(); + for (int i = 0; i < 50; i++) { + Record actual = results.get(i); + assertThat(actual.getField("id")).isEqualTo(i); + Variant actualVariant = (Variant) actual.getField("var"); + assertThat(actualVariant.metadata()) + .as("row %d must reuse the cached metadata instance", i) + .isSameAs(cached); + VariantTestUtil.assertEqual(Variants.of(i), actualVariant.value()); + } + } + + @Test + public void testMetadataCacheInvalidatesByLength() throws IOException { + ByteBuffer metaA = VariantTestUtil.createMetadata(ImmutableList.of("x", "y"), true); + ByteBuffer metaB = VariantTestUtil.createMetadata(ImmutableList.of("p", "q", "r"), true); + + GroupType variantType = variant("var", 2); + MessageType parquetSchema = parquetSchema(variantType); + + List records = Lists.newArrayList(); + for (int i = 0; i < 15; i++) { + ByteBuffer meta = (i / 5) % 2 == 0 ? metaA.duplicate() : metaB.duplicate(); + GenericRecord variant = + record(variantType, Map.of("metadata", meta, "value", serialize(Variants.of(i)))); + records.add(record(parquetSchema, Map.of("id", i, "var", variant))); + } + + List results = writeAndRead(parquetSchema, records); + assertThat(results).hasSize(15); + VariantMetadata m0 = ((Variant) results.get(0).getField("var")).metadata(); + VariantMetadata m5 = ((Variant) results.get(5).getField("var")).metadata(); + VariantMetadata m10 = ((Variant) results.get(10).getField("var")).metadata(); + assertThat(m5) + .as("metadata at row 5 must be a fresh instance after length change") + .isNotSameAs(m0); + assertThat(m10) + .as("metadata at row 10 must be a fresh instance after length change") + .isNotSameAs(m5); + for (int i = 0; i < 15; i++) { + Record actual = results.get(i); + assertThat(actual.getField("id")).isEqualTo(i); + Variant actualVariant = (Variant) actual.getField("var"); + VariantMetadata cached = i < 5 ? m0 : i < 10 ? m5 : m10; + assertThat(actualVariant.metadata()) + .as("row %d must reuse its group's cached metadata instance", i) + .isSameAs(cached); + VariantTestUtil.assertEqual(Variants.of(i), actualVariant.value()); + } + } + + @Test + public void testMetadataCacheInvalidatesByBytes() throws IOException { + ByteBuffer metaA = VariantTestUtil.createMetadata(ImmutableList.of("x", "y"), true); + ByteBuffer metaC = VariantTestUtil.createMetadata(ImmutableList.of("a", "b"), true); + assertThat(metaA.remaining()).isEqualTo(metaC.remaining()); + + GroupType variantType = variant("var", 2); + MessageType parquetSchema = parquetSchema(variantType); + + List records = Lists.newArrayList(); + for (int i = 0; i < 10; i++) { + ByteBuffer meta = (i / 2) % 2 == 0 ? metaA.duplicate() : metaC.duplicate(); + GenericRecord variant = + record(variantType, Map.of("metadata", meta, "value", serialize(Variants.of(i)))); + records.add(record(parquetSchema, Map.of("id", i, "var", variant))); + } + + List results = writeAndRead(parquetSchema, records); + assertThat(results).hasSize(10); + for (int i = 0; i < 10; i++) { + Record actual = results.get(i); + assertThat(actual.getField("id")).isEqualTo(i); + Variant actualVariant = (Variant) actual.getField("var"); + VariantMetadata pairLeader = ((Variant) results.get((i / 2) * 2).getField("var")).metadata(); + assertThat(actualVariant.metadata()) + .as("row %d must reuse its pair leader's cached metadata instance", i) + .isSameAs(pairLeader); + VariantTestUtil.assertEqual(Variants.of(i), actualVariant.value()); + } + VariantMetadata p0 = ((Variant) results.get(0).getField("var")).metadata(); + VariantMetadata p2 = ((Variant) results.get(2).getField("var")).metadata(); + VariantMetadata p4 = ((Variant) results.get(4).getField("var")).metadata(); + assertThat(p2) + .as("pair leader at row 2 must be a fresh instance after byte change") + .isNotSameAs(p0); + assertThat(p4) + .as("pair leader at row 4 must be a fresh instance after byte change") + .isNotSameAs(p2); + } + private static ByteBuffer serialize(VariantValue value) { ByteBuffer buffer = ByteBuffer.allocate(value.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN); value.writeTo(buffer, 0);