feat: native collect_list / array_agg aggregate#4720
Open
andygrove wants to merge 2 commits into
Open
Conversation
Wires Spark's CollectList aggregate to datafusion-spark's SparkCollectList. array_agg, registered as a SQL alias of CollectList in FunctionRegistry, is also covered. Closes apache#2524.
bf6b0c1 to
c9c19e3
Compare
…gates
A distinct aggregate combined with collect_list/collect_set produces a
multi-stage plan (Partial -> PartialMerge -> Final). CollectList/CollectSet
declare a BinaryType buffer in Spark but produce a native ArrayType state, so
Comet cannot read a Spark-produced Binary buffer, nor round-trip its own
ArrayType buffer across the intermediate PartialMerge stages. Both led to
native crashes ("could not cast Binary to List" / "cast List to Binary").
Force these multi-stage aggregates to fall back to Spark consistently:
- tag the feeding Partial when a PartialMerge stage of CollectList/CollectSet
is present (CometExecRule.tagUnsafePartialAggregates), and
- fall back a PartialMerge stage whose buffer was produced by a Spark partial
(CometBaseAggregate.doConvert).
Two-stage collect_list/collect_set continue to run natively.
Patch the upstream SPARK-22223 plan-shape test to disable Comet, since native
collect_list removes the ObjectHashAggregateExec it asserts on.
Enabling fully-native multi-stage execution is tracked in apache#4724.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Closes #2524.
Rationale for this change
collect_listand its aliasarray_aggare common aggregate functions that previously fell back to Spark, breaking native execution for many real workloads (notably plans that group rows into arrays before further processing). Adding native support keeps these queries on the Comet path.What changes are included in this PR?
This change was scaffolded with the
implement-comet-expressionskill.CollectListmessage toexpr.protoand a newcollectList = 18arm in theAggExproneof.CometCollectListserde inaggregates.scalaand registerclassOf[CollectList] -> CometCollectListinQueryPlanSerde.datafusion_spark::function::aggregate::collect::SparkCollectList, the upstream Spark-compatible accumulator (Spark 3.4 through 4.1 useignore_nulls = truesemantics that match it). No Comet-local Rust function is added.adjustOutputForNativeStatefix inoperators.scalato coverCollectList(same pattern already used forCollectSet: native producesArrayType(elementType)while Spark declares the buffer asBinaryType).collect_listandarray_aggas supported in the user-guide expression page.docs/source/contributor-guide/expression-audits/agg_funcs.mdcovering Spark 3.4.3, 3.5.8, 4.0.1, 4.1.1.How are these changes tested?
spark/src/test/resources/sql-tests/expressions/aggregate/collect_list.sqlexercises ~30 queries across types (boolean, byte/short/int/bigint, float/double including NaN/Inf/-0, string, binary, decimal up to (38,0), date, timestamp, struct, nested array), GROUP BY, NULLs, all-NULL groups, empty tables, single-row, mixed aggregates, multiplecollect_listcolumns, DISTINCT, HAVING, thearray_aggalias, INT/BIGINT boundary values, and an SPARK-17641 null-filter regression. The fixture runs underConfigMatrix: parquet.enable.dictionary=false,true, so each query executes twice.CometSqlFileTestSuitetests pass (./mvnw test -Dsuites="org.apache.comet.CometSqlFileTestSuite expressions/aggregate" -Dtest=none -Pspark-3.5), confirming no regression tocollect_setor other aggregates.cargo clippy --all-targets --workspace -- -D warningsis clean.