-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Spark: Enable remote scan planning with REST catalog #14822
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.iceberg; | ||
|
|
||
| /** Marker interface to indicate whether a Table requires remote scan planning */ | ||
| public interface RequiresRemoteScanPlanning {} | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,64 @@ | ||||||||||||||||
| /* | ||||||||||||||||
| * Licensed to the Apache Software Foundation (ASF) under one | ||||||||||||||||
| * or more contributor license agreements. See the NOTICE file | ||||||||||||||||
| * distributed with this work for additional information | ||||||||||||||||
| * regarding copyright ownership. The ASF licenses this file | ||||||||||||||||
| * to you under the Apache License, Version 2.0 (the | ||||||||||||||||
| * "License"); you may not use this file except in compliance | ||||||||||||||||
| * with the License. You may obtain a copy of the License at | ||||||||||||||||
| * | ||||||||||||||||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||||||
| * | ||||||||||||||||
| * Unless required by applicable law or agreed to in writing, | ||||||||||||||||
| * software distributed under the License is distributed on an | ||||||||||||||||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||||||||||||||||
| * KIND, either express or implied. See the License for the | ||||||||||||||||
| * specific language governing permissions and limitations | ||||||||||||||||
| * under the License. | ||||||||||||||||
| */ | ||||||||||||||||
| package org.apache.iceberg.spark.extensions; | ||||||||||||||||
|
|
||||||||||||||||
| import org.apache.iceberg.CatalogProperties; | ||||||||||||||||
| import org.apache.iceberg.ParameterizedTestExtension; | ||||||||||||||||
| import org.apache.iceberg.Parameters; | ||||||||||||||||
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; | ||||||||||||||||
| import org.apache.iceberg.rest.RESTCatalogProperties; | ||||||||||||||||
| import org.apache.iceberg.spark.SparkCatalogConfig; | ||||||||||||||||
| import org.apache.iceberg.spark.sql.TestSelect; | ||||||||||||||||
| import org.junit.jupiter.api.Disabled; | ||||||||||||||||
| import org.junit.jupiter.api.TestTemplate; | ||||||||||||||||
| import org.junit.jupiter.api.extension.ExtendWith; | ||||||||||||||||
|
|
||||||||||||||||
| @ExtendWith(ParameterizedTestExtension.class) | ||||||||||||||||
| public class TestRemoteScanPlanning extends TestSelect { | ||||||||||||||||
| @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, binaryTableName = {3}") | ||||||||||||||||
| protected static Object[][] parameters() { | ||||||||||||||||
| return new Object[][] { | ||||||||||||||||
| { | ||||||||||||||||
| SparkCatalogConfig.REST.catalogName(), | ||||||||||||||||
| SparkCatalogConfig.REST.implementation(), | ||||||||||||||||
| ImmutableMap.builder() | ||||||||||||||||
| .putAll(SparkCatalogConfig.REST.properties()) | ||||||||||||||||
| .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) | ||||||||||||||||
| // this flag is typically only set by the server, but we set it from the client for | ||||||||||||||||
| // testing | ||||||||||||||||
| .put(RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true") | ||||||||||||||||
| .build(), | ||||||||||||||||
| SparkCatalogConfig.REST.catalogName() + ".default.binary_table" | ||||||||||||||||
| } | ||||||||||||||||
| }; | ||||||||||||||||
|
Comment on lines
+36
to
+49
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if we should add an entry here : iceberg/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/CatalogTestBase.java Lines 49 to 55 in cc02655
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we should run all of the tests now with remote scan planning enabled across all Spark tests that use the REST catalog
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not ? i am mostly coming from POV for non protected tables this should work as expected ? is it the execution time we are concerned for
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there are tests that rely on local/distributed planning and setting this flag would bypass those. Also there are still some cases that don't work with remote scan planning, such as querying metadata tables or certain filter conditions, therefore I don't think we should make remote scan planning the default as we would lose test coverage for local/distributed testing
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I agree with @nastra here, I'd probably start out with a separate set of tests for RemoteScanPlanning. Ideally in the long run we should just be able to plug in remote planning basically everywhere but I'm not sure it's a good idea to start there at least in main because it may be more noisy than it's worth at this stage. And as @nastra said I believe there are tests which are predicated on client side planning. We can always remove this down the line but for now I think we just want some reasonable confidence in the scan planning integration, rather than plugging in tests everywhere. I also do think test times are an issue but I think the solution space for that is a bit different. I do think it's a good idea though @singhpk234 to do that separately, and we can use that to identify any gaps!
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My recommendation was mostly from POV that we need tables reads via remote scan planning can be written to as for normal tables flow and leveraging the existing test case specially in future addition to lets say geo type expression in the rest spec which can be used in filtering, one just add the test for that and remote scan planning will automatically be tested or will flag hey this is broken with remote planning. Regarding the metadata table not being read, we do want to atleast test that it works when remote planning planning is enabled (even though the client skips remote planning for it) But we can always build on it incrementally, need to be this PR :), totally make sense to move forward for this one ! |
||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| @TestTemplate | ||||||||||||||||
| @Disabled( | ||||||||||||||||
| "binary filter that is used by Spark is not working because ExpressionParser.fromJSON doesn't have the Schema to properly parse the filter expression") | ||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we can pass context via ParserContext to the parser, like we did we parsing planning response. are we tracking the a followup post this ?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I haven't open a separate issue yet but will do and follow up on this
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've created #14879 that shows the detailed steps that are happening. The error basically happens on the server before we actually use whatever is passed in |
||||||||||||||||
| public void testBinaryInFilter() { | ||||||||||||||||
| super.testBinaryInFilter(); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| @TestTemplate | ||||||||||||||||
| @Disabled("Metadata tables are currently not supported") | ||||||||||||||||
|
singhpk234 marked this conversation as resolved.
|
||||||||||||||||
| public void testMetadataTables() { | ||||||||||||||||
| super.testMetadataTables(); | ||||||||||||||||
| } | ||||||||||||||||
| } | ||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ | |
| import static org.apache.iceberg.TableProperties.SPLIT_SIZE; | ||
| import static org.assertj.core.api.Assertions.assertThat; | ||
| import static org.assertj.core.api.Assertions.assertThatThrownBy; | ||
| import static org.assertj.core.api.Assumptions.assumeThat; | ||
|
|
||
| import java.sql.Timestamp; | ||
| import java.text.SimpleDateFormat; | ||
|
|
@@ -36,6 +37,7 @@ | |
| import org.apache.iceberg.events.ScanEvent; | ||
| import org.apache.iceberg.exceptions.ValidationException; | ||
| import org.apache.iceberg.expressions.Expressions; | ||
| import org.apache.iceberg.hive.HiveCatalog; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; | ||
| import org.apache.iceberg.spark.CatalogTestBase; | ||
| import org.apache.iceberg.spark.Spark3Util; | ||
|
|
@@ -679,4 +681,35 @@ public void simpleTypesInFilter() { | |
|
|
||
| sql("DROP TABLE IF EXISTS %s", tableName); | ||
| } | ||
|
|
||
| @TestTemplate | ||
| public void variantTypeInFilter() { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Late review. Is this from another PR? |
||
| assumeThat(validationCatalog) | ||
| .as("Variant is not supported in Hive catalog") | ||
| .isNotInstanceOf(HiveCatalog.class); | ||
|
|
||
| String tableName = tableName("variant_table"); | ||
| sql( | ||
| "CREATE TABLE %s (id BIGINT, v1 VARIANT, v2 VARIANT) USING iceberg TBLPROPERTIES ('format-version'='3')", | ||
| tableName); | ||
|
|
||
| String v1r1 = "{\"a\":5}"; | ||
| String v1r2 = "{\"a\":10}"; | ||
| String v2r1 = "{\"x\":15}"; | ||
| String v2r2 = "{\"x\":20}"; | ||
|
|
||
| sql("INSERT INTO %s SELECT 1, parse_json('%s'), parse_json('%s')", tableName, v1r1, v2r1); | ||
| sql("INSERT INTO %s SELECT 2, parse_json('%s'), parse_json('%s')", tableName, v1r2, v2r2); | ||
|
|
||
| assertThat( | ||
| sql( | ||
| "SELECT id, try_variant_get(v1, '$.a', 'int') FROM %s WHERE try_variant_get(v1, '$.a', 'int') > 5", | ||
| tableName)) | ||
| .containsExactly(row(2L, 10)); | ||
| assertThat( | ||
| sql( | ||
| "SELECT id, try_variant_get(v2, '$.x', 'int') FROM %s WHERE try_variant_get(v2, '$.x', 'int') < 100", | ||
| tableName)) | ||
| .containsExactlyInAnyOrder(row(1L, 15), row(2L, 20)); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isn't RESTTable itself means its a table is RemoteScanPlanned ? can we do an instance of check with RESTTable ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RESTTable is package-private and we don't want to leak that into Spark. I'm also not sure whether
RequiresRemoteScanPlanningshould actually be in therestpackage or notThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do think that it's a good idea to decouple the concrete RESTTable type from any required planning behavior. Any table implementation can just choose to implement the interface rather than having to be tied to RESTTable, so it's an easy way to achieve flexibility without too much additional indirection or complexity.
More importantly though after more thought I think there's a conceptual difference between the type of the table and the planning mode, enough of a separation that justifies the additional interface at least in my head. Sure the current implementation could only return a RESTTable if remote planning is required but it does not have to be that way, e.g. if I wanted to plugin my own client side of scan planning in my own table implementation for example (For tests or if I know I only care about a slimmed down version of the protocol, or some concurrency requirements that better fit my environment etc).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've moved
RequiresRemoteScanPlanningout of therestpackage in order to decouple it completely from REST behavior