diff --git a/core/src/main/java/org/apache/iceberg/RequiresRemoteScanPlanning.java b/core/src/main/java/org/apache/iceberg/RequiresRemoteScanPlanning.java new file mode 100644 index 000000000000..440ea70007ee --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/RequiresRemoteScanPlanning.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +/** Marker interface to indicate whether a Table requires remote scan planning */ +public interface RequiresRemoteScanPlanning {} diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTable.java b/core/src/main/java/org/apache/iceberg/rest/RESTTable.java index 052e1432703f..0abe41e25f50 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTTable.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTTable.java @@ -25,12 +25,13 @@ import org.apache.iceberg.BatchScan; import org.apache.iceberg.BatchScanAdapter; import org.apache.iceberg.ImmutableTableScanContext; +import org.apache.iceberg.RequiresRemoteScanPlanning; import org.apache.iceberg.TableOperations; import org.apache.iceberg.TableScan; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.metrics.MetricsReporter; -class RESTTable extends BaseTable { +class RESTTable extends BaseTable implements RequiresRemoteScanPlanning { private final RESTClient client; private final Supplier> headers; private final MetricsReporter reporter; diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java new file mode 100644 index 000000000000..6e389730f7a7 --- /dev/null +++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.extensions; + +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.rest.RESTCatalogProperties; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.sql.TestSelect; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestRemoteScanPlanning extends TestSelect { + @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, binaryTableName = {3}") + protected static Object[][] parameters() { + return new Object[][] { + { + SparkCatalogConfig.REST.catalogName(), + SparkCatalogConfig.REST.implementation(), + ImmutableMap.builder() + .putAll(SparkCatalogConfig.REST.properties()) + .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) + // this flag is typically only set by the server, but we set it from the client for + // testing + .put(RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true") + .build(), + SparkCatalogConfig.REST.catalogName() + ".default.binary_table" + } + }; + } + + @TestTemplate + @Disabled( + "binary filter that is used by Spark is not working because ExpressionParser.fromJSON doesn't have the Schema to properly parse the filter expression") + public void testBinaryInFilter() { + super.testBinaryInFilter(); + } + + @TestTemplate + @Disabled("Metadata tables are currently not supported") + public void testMetadataTables() { + super.testMetadataTables(); + } +} diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java index 11b0ba58af51..dd914f1617bd 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java @@ -34,6 +34,7 @@ import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.MetricsModes; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RequiresRemoteScanPlanning; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; import org.apache.iceberg.SparkDistributedDataScan; @@ -760,7 +761,9 @@ public StructType readSchema() { } private BatchScan newBatchScan() { - if (table instanceof BaseTable && readConf.distributedPlanningEnabled()) { + if (table instanceof RequiresRemoteScanPlanning) { + return table.newBatchScan(); + } else if (table instanceof BaseTable && readConf.distributedPlanningEnabled()) { return new SparkDistributedDataScan(spark, table, readConf); } else { return table.newBatchScan(); diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java index 2d4d63c1a127..cf4ccd62dbc3 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java @@ -21,6 +21,7 @@ import static org.apache.iceberg.TableProperties.SPLIT_SIZE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; import java.sql.Timestamp; import java.text.SimpleDateFormat; @@ -36,6 +37,7 @@ import org.apache.iceberg.events.ScanEvent; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.hive.HiveCatalog; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.spark.CatalogTestBase; import org.apache.iceberg.spark.Spark3Util; @@ -679,4 +681,35 @@ public void simpleTypesInFilter() { sql("DROP TABLE IF EXISTS %s", tableName); } + + @TestTemplate + public void variantTypeInFilter() { + assumeThat(validationCatalog) + .as("Variant is not supported in Hive catalog") + .isNotInstanceOf(HiveCatalog.class); + + String tableName = tableName("variant_table"); + sql( + "CREATE TABLE %s (id BIGINT, v1 VARIANT, v2 VARIANT) USING iceberg TBLPROPERTIES ('format-version'='3')", + tableName); + + String v1r1 = "{\"a\":5}"; + String v1r2 = "{\"a\":10}"; + String v2r1 = "{\"x\":15}"; + String v2r2 = "{\"x\":20}"; + + sql("INSERT INTO %s SELECT 1, parse_json('%s'), parse_json('%s')", tableName, v1r1, v2r1); + sql("INSERT INTO %s SELECT 2, parse_json('%s'), parse_json('%s')", tableName, v1r2, v2r2); + + assertThat( + sql( + "SELECT id, try_variant_get(v1, '$.a', 'int') FROM %s WHERE try_variant_get(v1, '$.a', 'int') > 5", + tableName)) + .containsExactly(row(2L, 10)); + assertThat( + sql( + "SELECT id, try_variant_get(v2, '$.x', 'int') FROM %s WHERE try_variant_get(v2, '$.x', 'int') < 100", + tableName)) + .containsExactlyInAnyOrder(row(1L, 15), row(2L, 20)); + } }