Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,43 @@ private static ByteBuffer readBinary(ColumnIterator<?> column) {
}

private static class VariantMetadataReader extends PrimitiveReader<VariantMetadata> {
// Caches the last parsed metadata for adjacent rows with identical bytes.
private byte[] lastMetadataBytes;
private VariantMetadata cachedMetadata;

private VariantMetadataReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public VariantMetadata read(VariantMetadata reuse) {
return Variants.metadata(readBinary(column));
ByteBuffer data = column.nextBinary().toByteBuffer();
int length = data.remaining();

if (cachedMetadata != null
&& lastMetadataBytes != null
&& lastMetadataBytes.length == length
&& bufferEquals(data, lastMetadataBytes)) {
return cachedMetadata;
}

byte[] bytes = new byte[length];
data.get(bytes, 0, length);
VariantMetadata parsed =
Variants.metadata(ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN));
this.lastMetadataBytes = bytes;
this.cachedMetadata = parsed;
return parsed;
}

private static boolean bufferEquals(ByteBuffer buffer, byte[] expected) {
int pos = buffer.position();
for (int i = 0; i < expected.length; i++) {
if (buffer.get(pos + i) != expected[i]) {
return false;
}
}
return true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1349,6 +1349,115 @@ public void testArrayWithElementValueTypedValueConflict() {
.hasMessageContaining("Invalid variant, conflicting value and typed_value");
}

@Test
public void testMetadataCacheHits() throws IOException {
GroupType variantType = variant("var", 2);
MessageType parquetSchema = parquetSchema(variantType);

List<GenericRecord> records = Lists.newArrayList();
for (int i = 0; i < 50; i++) {
GenericRecord variant =
record(
variantType,
Map.of(
"metadata", TEST_METADATA_BUFFER.duplicate(),
"value", serialize(Variants.of(i))));
records.add(record(parquetSchema, Map.of("id", i, "var", variant)));
}

List<Record> results = writeAndRead(parquetSchema, records);
assertThat(results).hasSize(50);
VariantMetadata cached = ((Variant) results.get(0).getField("var")).metadata();
for (int i = 0; i < 50; i++) {
Record actual = results.get(i);
assertThat(actual.getField("id")).isEqualTo(i);
Variant actualVariant = (Variant) actual.getField("var");
assertThat(actualVariant.metadata())
.as("row %d must reuse the cached metadata instance", i)
.isSameAs(cached);
VariantTestUtil.assertEqual(Variants.of(i), actualVariant.value());
}
}

@Test
public void testMetadataCacheInvalidatesByLength() throws IOException {
ByteBuffer metaA = VariantTestUtil.createMetadata(ImmutableList.of("x", "y"), true);
ByteBuffer metaB = VariantTestUtil.createMetadata(ImmutableList.of("p", "q", "r"), true);

GroupType variantType = variant("var", 2);
MessageType parquetSchema = parquetSchema(variantType);

List<GenericRecord> records = Lists.newArrayList();
for (int i = 0; i < 15; i++) {
ByteBuffer meta = (i / 5) % 2 == 0 ? metaA.duplicate() : metaB.duplicate();
GenericRecord variant =
record(variantType, Map.of("metadata", meta, "value", serialize(Variants.of(i))));
records.add(record(parquetSchema, Map.of("id", i, "var", variant)));
}

List<Record> results = writeAndRead(parquetSchema, records);
assertThat(results).hasSize(15);
VariantMetadata m0 = ((Variant) results.get(0).getField("var")).metadata();
VariantMetadata m5 = ((Variant) results.get(5).getField("var")).metadata();
VariantMetadata m10 = ((Variant) results.get(10).getField("var")).metadata();
assertThat(m5)
.as("metadata at row 5 must be a fresh instance after length change")
.isNotSameAs(m0);
assertThat(m10)
.as("metadata at row 10 must be a fresh instance after length change")
.isNotSameAs(m5);
for (int i = 0; i < 15; i++) {
Record actual = results.get(i);
assertThat(actual.getField("id")).isEqualTo(i);
Variant actualVariant = (Variant) actual.getField("var");
VariantMetadata cached = i < 5 ? m0 : i < 10 ? m5 : m10;
assertThat(actualVariant.metadata())
.as("row %d must reuse its group's cached metadata instance", i)
.isSameAs(cached);
VariantTestUtil.assertEqual(Variants.of(i), actualVariant.value());
}
}

@Test
public void testMetadataCacheInvalidatesByBytes() throws IOException {
ByteBuffer metaA = VariantTestUtil.createMetadata(ImmutableList.of("x", "y"), true);
ByteBuffer metaC = VariantTestUtil.createMetadata(ImmutableList.of("a", "b"), true);
assertThat(metaA.remaining()).isEqualTo(metaC.remaining());

GroupType variantType = variant("var", 2);
MessageType parquetSchema = parquetSchema(variantType);

List<GenericRecord> records = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
ByteBuffer meta = (i / 2) % 2 == 0 ? metaA.duplicate() : metaC.duplicate();
GenericRecord variant =
record(variantType, Map.of("metadata", meta, "value", serialize(Variants.of(i))));
records.add(record(parquetSchema, Map.of("id", i, "var", variant)));
}

List<Record> results = writeAndRead(parquetSchema, records);
assertThat(results).hasSize(10);
for (int i = 0; i < 10; i++) {
Record actual = results.get(i);
assertThat(actual.getField("id")).isEqualTo(i);
Variant actualVariant = (Variant) actual.getField("var");
VariantMetadata pairLeader = ((Variant) results.get((i / 2) * 2).getField("var")).metadata();
assertThat(actualVariant.metadata())
.as("row %d must reuse its pair leader's cached metadata instance", i)
.isSameAs(pairLeader);
VariantTestUtil.assertEqual(Variants.of(i), actualVariant.value());
}
VariantMetadata p0 = ((Variant) results.get(0).getField("var")).metadata();
VariantMetadata p2 = ((Variant) results.get(2).getField("var")).metadata();
VariantMetadata p4 = ((Variant) results.get(4).getField("var")).metadata();
assertThat(p2)
.as("pair leader at row 2 must be a fresh instance after byte change")
.isNotSameAs(p0);
assertThat(p4)
.as("pair leader at row 4 must be a fresh instance after byte change")
.isNotSameAs(p2);
}

private static ByteBuffer serialize(VariantValue value) {
ByteBuffer buffer = ByteBuffer.allocate(value.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN);
value.writeTo(buffer, 0);
Expand Down