From d8cdbd08a531fbad376ef04cc31e818c9b726497 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Tue, 7 Oct 2025 11:31:42 -0700 Subject: [PATCH 1/6] Spark 4.0: Add variant round trip test for Spark --- .../spark/sql/TestSparkVariantRead.java | 154 ++++++++++++++++++ 1 file changed, 154 insertions(+) create mode 100644 spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVariantRead.java diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVariantRead.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVariantRead.java new file mode 100644 index 000000000000..d22f476136d5 --- /dev/null +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVariantRead.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.sql; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assumptions.assumeFalse; + +import org.apache.iceberg.spark.SparkCatalog; +import org.apache.iceberg.spark.TestBase; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.types.variant.Variant; +import org.apache.spark.unsafe.types.VariantVal; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public class TestSparkVariantRead extends TestBase { + + private static final String CATALOG = "local"; + private static final String TABLE = CATALOG + ".default.var"; + + @BeforeEach + public void setupCatalog() { + // Use a Hadoop catalog to avoid Hive schema conversion (Hive doesn't support VARIANT yet) + spark.conf().set("spark.sql.catalog." + CATALOG, SparkCatalog.class.getName()); + spark.conf().set("spark.sql.catalog." + CATALOG + ".type", "hadoop"); + spark.conf().set("spark.sql.catalog." + CATALOG + ".default-namespace", "default"); + spark.conf().set("spark.sql.catalog." + CATALOG + ".cache-enabled", "false"); + // point warehouse to a temp directory + String temp = System.getProperty("java.io.tmpdir") + "/iceberg_spark_variant_warehouse"; + spark.conf().set("spark.sql.catalog." + CATALOG + ".warehouse", temp); + + sql("DROP TABLE IF EXISTS %s", TABLE); + sql( + "CREATE TABLE %s (id BIGINT, v1 VARIANT, v2 VARIANT) USING iceberg " + + "TBLPROPERTIES ('format-version'='3')", + TABLE); + + String v1r1 = "{\"a\":1}"; + String v2r1 = "{\"x\":10}"; + String v1r2 = "{\"b\":2}"; + String v2r2 = "{\"y\":20}"; + + sql("INSERT INTO %s SELECT 1, parse_json('%s'), parse_json('%s')", TABLE, v1r1, v2r1); + sql("INSERT INTO %s SELECT 2, parse_json('%s'), parse_json('%s')", TABLE, v1r2, v2r2); + } + + @AfterEach + public void cleanup() { + sql("DROP TABLE IF EXISTS %s", TABLE); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testVariantColumnProjection_singleVariant(boolean vectorized) { + assumeFalse(vectorized, "Variant vectorized Parquet read is not implemented yet"); + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('read.parquet.vectorization.enabled'='%s')", + TABLE, String.valueOf(vectorized)); + Dataset df = spark.table(TABLE).select("id", "v1").orderBy("id"); + assertThat(df.schema().fieldNames()).containsExactly("id", "v1"); + assertThat(df.count()).isEqualTo(2); + + java.util.List directRows = df.collectAsList(); + Object v1row1 = directRows.get(0).get(1); + Object v1row2 = directRows.get(1).get(1); + Variant vv1; + Variant vv2; + if (v1row1 instanceof Variant) { + vv1 = (Variant) v1row1; + vv2 = (Variant) v1row2; + } else if (v1row1 instanceof VariantVal) { + vv1 = new Variant(((VariantVal) v1row1).getValue(), ((VariantVal) v1row1).getMetadata()); + vv2 = new Variant(((VariantVal) v1row2).getValue(), ((VariantVal) v1row2).getMetadata()); + } else { + org.assertj.core.api.Assertions.fail( + "Expected Variant/VariantVal but got: " + (v1row1 == null ? "null" : v1row1.getClass())); + return; + } + + // row 1 has {"a":1} + Variant fieldA = vv1.getFieldByKey("a"); + assertThat(fieldA).isNotNull(); + assertThat(fieldA.getLong()).isEqualTo(1L); + + // row 2 has {"b":2} + Variant fieldB = vv2.getFieldByKey("b"); + assertThat(fieldB).isNotNull(); + assertThat(fieldB.getLong()).isEqualTo(2L); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testVariantColumnProjection_noVariant(boolean vectorized) { + assumeFalse(vectorized, "Variant vectorized Parquet read is not implemented yet"); + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('read.parquet.vectorization.enabled'='%s')", + TABLE, String.valueOf(vectorized)); + Dataset df = spark.table(TABLE).select("id"); + assertThat(df.schema().fieldNames()).containsExactly("id"); + assertThat(df.count()).isEqualTo(2); + assertThat(df.collectAsList()).extracting(r -> r.getLong(0)).containsExactlyInAnyOrder(1L, 2L); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testFilterOnVariantColumnOnWholeValue(boolean vectorized) { + assumeFalse(vectorized, "Variant vectorized Parquet read is not implemented yet"); + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('read.parquet.vectorization.enabled'='%s')", + TABLE, String.valueOf(vectorized)); + sql("INSERT INTO %s SELECT 3, NULL, NULL", TABLE); + + Dataset nullDf = spark.table(TABLE).where("v1 IS NULL").select("id"); + assertThat(nullDf.collectAsList()).extracting(r -> r.getLong(0)).containsExactly(3L); + + Dataset notNullDf = spark.table(TABLE).where("v1 IS NOT NULL").select("id"); + assertThat(notNullDf.collectAsList()) + .extracting(r -> r.getLong(0)) + .containsExactlyInAnyOrder(1L, 2L); + + // verify variant contents for non-null rows + Dataset notNullVals = + spark + .table(TABLE) + .where("v1 IS NOT NULL") + .selectExpr("id", "to_json(v1) as v1_json") + .orderBy("id"); + java.util.List nn = notNullVals.collectAsList(); + assertThat(nn).hasSize(2); + assertThat(nn.get(0).getLong(0)).isEqualTo(1L); + assertThat(nn.get(0).getString(1)).isEqualTo("{\"a\":1}"); + assertThat(nn.get(1).getLong(0)).isEqualTo(2L); + assertThat(nn.get(1).getString(1)).isEqualTo("{\"b\":2}"); + } +} From 6e4873a524367d85225abf75468e493bec0b932c Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Tue, 7 Oct 2025 11:40:32 -0700 Subject: [PATCH 2/6] add a test for variant null value projection --- .../spark/sql/TestSparkVariantRead.java | 22 ++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVariantRead.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVariantRead.java index d22f476136d5..938acf34cfab 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVariantRead.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVariantRead.java @@ -109,7 +109,7 @@ public void testVariantColumnProjection_singleVariant(boolean vectorized) { @ParameterizedTest @ValueSource(booleans = {false, true}) - public void testVariantColumnProjection_noVariant(boolean vectorized) { + public void testVariantColumnProjectionNoVariant(boolean vectorized) { assumeFalse(vectorized, "Variant vectorized Parquet read is not implemented yet"); sql( "ALTER TABLE %s SET TBLPROPERTIES ('read.parquet.vectorization.enabled'='%s')", @@ -151,4 +151,24 @@ public void testFilterOnVariantColumnOnWholeValue(boolean vectorized) { assertThat(nn.get(1).getLong(0)).isEqualTo(2L); assertThat(nn.get(1).getString(1)).isEqualTo("{\"b\":2}"); } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testVariantNullValueProjection(boolean vectorized) { + assumeFalse(vectorized, "Variant vectorized Parquet read is not implemented yet"); + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('read.parquet.vectorization.enabled'='%s')", + TABLE, String.valueOf(vectorized)); + + // insert a row with NULL variant values + sql("INSERT INTO %s SELECT 10, NULL, NULL", TABLE); + + // select id and variant; ensure the variant value is null + Dataset df = spark.table(TABLE).where("id = 10").select("id", "v1"); + java.util.List rows = df.collectAsList(); + assertThat(rows).hasSize(1); + Row row = rows.get(0); + assertThat(row.getLong(0)).isEqualTo(10L); + assertThat(row.isNullAt(1)).isTrue(); + } } From d6d1fb178512e33203edb40e2a70d1025415a7cd Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Thu, 9 Oct 2025 10:40:00 -0700 Subject: [PATCH 3/6] address comment --- .../spark/sql/TestSparkVariantRead.java | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVariantRead.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVariantRead.java index 938acf34cfab..7a31c8148e3e 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVariantRead.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVariantRead.java @@ -19,8 +19,10 @@ package org.apache.iceberg.spark.sql; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assumptions.assumeFalse; +import static org.assertj.core.api.Assertions.fail; +import static org.assertj.core.api.Assumptions.assumeThat; +import java.util.List; import org.apache.iceberg.spark.SparkCatalog; import org.apache.iceberg.spark.TestBase; import org.apache.spark.sql.Dataset; @@ -28,6 +30,7 @@ import org.apache.spark.types.variant.Variant; import org.apache.spark.unsafe.types.VariantVal; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -37,8 +40,8 @@ public class TestSparkVariantRead extends TestBase { private static final String CATALOG = "local"; private static final String TABLE = CATALOG + ".default.var"; - @BeforeEach - public void setupCatalog() { + @BeforeAll + public static void setupCatalog() { // Use a Hadoop catalog to avoid Hive schema conversion (Hive doesn't support VARIANT yet) spark.conf().set("spark.sql.catalog." + CATALOG, SparkCatalog.class.getName()); spark.conf().set("spark.sql.catalog." + CATALOG + ".type", "hadoop"); @@ -47,7 +50,10 @@ public void setupCatalog() { // point warehouse to a temp directory String temp = System.getProperty("java.io.tmpdir") + "/iceberg_spark_variant_warehouse"; spark.conf().set("spark.sql.catalog." + CATALOG + ".warehouse", temp); + } + @BeforeEach + public void setupTable() { sql("DROP TABLE IF EXISTS %s", TABLE); sql( "CREATE TABLE %s (id BIGINT, v1 VARIANT, v2 VARIANT) USING iceberg " @@ -71,7 +77,7 @@ public void cleanup() { @ParameterizedTest @ValueSource(booleans = {false, true}) public void testVariantColumnProjection_singleVariant(boolean vectorized) { - assumeFalse(vectorized, "Variant vectorized Parquet read is not implemented yet"); + assumeThat(vectorized).as("Variant vectorized Parquet read is not implemented yet").isFalse(); sql( "ALTER TABLE %s SET TBLPROPERTIES ('read.parquet.vectorization.enabled'='%s')", TABLE, String.valueOf(vectorized)); @@ -79,7 +85,7 @@ public void testVariantColumnProjection_singleVariant(boolean vectorized) { assertThat(df.schema().fieldNames()).containsExactly("id", "v1"); assertThat(df.count()).isEqualTo(2); - java.util.List directRows = df.collectAsList(); + List directRows = df.collectAsList(); Object v1row1 = directRows.get(0).get(1); Object v1row2 = directRows.get(1).get(1); Variant vv1; @@ -91,8 +97,7 @@ public void testVariantColumnProjection_singleVariant(boolean vectorized) { vv1 = new Variant(((VariantVal) v1row1).getValue(), ((VariantVal) v1row1).getMetadata()); vv2 = new Variant(((VariantVal) v1row2).getValue(), ((VariantVal) v1row2).getMetadata()); } else { - org.assertj.core.api.Assertions.fail( - "Expected Variant/VariantVal but got: " + (v1row1 == null ? "null" : v1row1.getClass())); + fail("Expected Variant/VariantVal but got: " + (v1row1 == null ? "null" : v1row1.getClass())); return; } @@ -110,7 +115,7 @@ public void testVariantColumnProjection_singleVariant(boolean vectorized) { @ParameterizedTest @ValueSource(booleans = {false, true}) public void testVariantColumnProjectionNoVariant(boolean vectorized) { - assumeFalse(vectorized, "Variant vectorized Parquet read is not implemented yet"); + assumeThat(vectorized).as("Variant vectorized Parquet read is not implemented yet").isFalse(); sql( "ALTER TABLE %s SET TBLPROPERTIES ('read.parquet.vectorization.enabled'='%s')", TABLE, String.valueOf(vectorized)); @@ -123,7 +128,7 @@ public void testVariantColumnProjectionNoVariant(boolean vectorized) { @ParameterizedTest @ValueSource(booleans = {false, true}) public void testFilterOnVariantColumnOnWholeValue(boolean vectorized) { - assumeFalse(vectorized, "Variant vectorized Parquet read is not implemented yet"); + assumeThat(vectorized).as("Variant vectorized Parquet read is not implemented yet").isFalse(); sql( "ALTER TABLE %s SET TBLPROPERTIES ('read.parquet.vectorization.enabled'='%s')", TABLE, String.valueOf(vectorized)); @@ -144,7 +149,7 @@ public void testFilterOnVariantColumnOnWholeValue(boolean vectorized) { .where("v1 IS NOT NULL") .selectExpr("id", "to_json(v1) as v1_json") .orderBy("id"); - java.util.List nn = notNullVals.collectAsList(); + List nn = notNullVals.collectAsList(); assertThat(nn).hasSize(2); assertThat(nn.get(0).getLong(0)).isEqualTo(1L); assertThat(nn.get(0).getString(1)).isEqualTo("{\"a\":1}"); @@ -155,7 +160,7 @@ public void testFilterOnVariantColumnOnWholeValue(boolean vectorized) { @ParameterizedTest @ValueSource(booleans = {false, true}) public void testVariantNullValueProjection(boolean vectorized) { - assumeFalse(vectorized, "Variant vectorized Parquet read is not implemented yet"); + assumeThat(vectorized).as("Variant vectorized Parquet read is not implemented yet").isFalse(); sql( "ALTER TABLE %s SET TBLPROPERTIES ('read.parquet.vectorization.enabled'='%s')", TABLE, String.valueOf(vectorized)); @@ -165,7 +170,7 @@ public void testVariantNullValueProjection(boolean vectorized) { // select id and variant; ensure the variant value is null Dataset df = spark.table(TABLE).where("id = 10").select("id", "v1"); - java.util.List rows = df.collectAsList(); + List rows = df.collectAsList(); assertThat(rows).hasSize(1); Row row = rows.get(0); assertThat(row.getLong(0)).isEqualTo(10L); From fd3f6567483998ebe6b09d1d526a4dc35a482f9d Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Mon, 13 Oct 2025 14:20:15 -0700 Subject: [PATCH 4/6] address comments --- .../apache/iceberg/spark/sql/TestSparkVariantRead.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVariantRead.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVariantRead.java index 7a31c8148e3e..a53af333a11d 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVariantRead.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVariantRead.java @@ -90,14 +90,14 @@ public void testVariantColumnProjection_singleVariant(boolean vectorized) { Object v1row2 = directRows.get(1).get(1); Variant vv1; Variant vv2; - if (v1row1 instanceof Variant) { - vv1 = (Variant) v1row1; - vv2 = (Variant) v1row2; - } else if (v1row1 instanceof VariantVal) { + if (v1row1 instanceof VariantVal) { vv1 = new Variant(((VariantVal) v1row1).getValue(), ((VariantVal) v1row1).getMetadata()); vv2 = new Variant(((VariantVal) v1row2).getValue(), ((VariantVal) v1row2).getMetadata()); } else { - fail("Expected Variant/VariantVal but got: " + (v1row1 == null ? "null" : v1row1.getClass())); + fail( + "Expected VariantVal but got: row1=%s, row2=%s", + (v1row1 == null ? "null" : v1row1.getClass()), + (v1row2 == null ? "null" : v1row2.getClass())); return; } From 15063e419cb892063513458e9468b0b454923afa Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Wed, 15 Oct 2025 14:39:16 -0700 Subject: [PATCH 5/6] address comments --- .../spark/sql/TestSparkVariantRead.java | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVariantRead.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVariantRead.java index a53af333a11d..7b81a77fb581 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVariantRead.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVariantRead.java @@ -88,18 +88,12 @@ public void testVariantColumnProjection_singleVariant(boolean vectorized) { List directRows = df.collectAsList(); Object v1row1 = directRows.get(0).get(1); Object v1row2 = directRows.get(1).get(1); - Variant vv1; - Variant vv2; - if (v1row1 instanceof VariantVal) { - vv1 = new Variant(((VariantVal) v1row1).getValue(), ((VariantVal) v1row1).getMetadata()); - vv2 = new Variant(((VariantVal) v1row2).getValue(), ((VariantVal) v1row2).getMetadata()); - } else { - fail( - "Expected VariantVal but got: row1=%s, row2=%s", - (v1row1 == null ? "null" : v1row1.getClass()), - (v1row2 == null ? "null" : v1row2.getClass())); - return; - } + assertThat(v1row1).isInstanceOf(VariantVal.class); + assertThat(v1row2).isInstanceOf(VariantVal.class); + VariantVal r1 = (VariantVal) v1row1; + VariantVal r2 = (VariantVal) v1row2; + Variant vv1 = new Variant(r1.getValue(), r1.getMetadata()); + Variant vv2 = new Variant(r2.getValue(), r2.getMetadata()); // row 1 has {"a":1} Variant fieldA = vv1.getFieldByKey("a"); From ece81cba4f0eb53d39ae2ac4370f67d9d48c5b02 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Wed, 15 Oct 2025 14:49:07 -0700 Subject: [PATCH 6/6] remove un-used import --- .../java/org/apache/iceberg/spark/sql/TestSparkVariantRead.java | 1 - 1 file changed, 1 deletion(-) diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVariantRead.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVariantRead.java index 7b81a77fb581..5ffb3523914f 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVariantRead.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkVariantRead.java @@ -19,7 +19,6 @@ package org.apache.iceberg.spark.sql; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.fail; import static org.assertj.core.api.Assumptions.assumeThat; import java.util.List;