Skip to content

feat: Add native collect_list aggregate support#4727

Draft
peterxcli wants to merge 5 commits into
apache:mainfrom
peterxcli:multi-stage-distinct-combined-collect-list-collect-set
Draft

feat: Add native collect_list aggregate support#4727
peterxcli wants to merge 5 commits into
apache:mainfrom
peterxcli:multi-stage-distinct-combined-collect-list-collect-set

Conversation

@peterxcli

Copy link
Copy Markdown
Member

Which issue does this PR close?

Closes #4724.

Rationale for this change

collect_list / array_agg and collect_set use Spark TypedImperativeAggregate buffers that Spark declares as serialized BinaryType, while Comet’s native implementations keep the real aggregate state as an Arrow/Spark ArrayType.

The existing schema adjustment only handled simple two-stage Partial -> Final collect aggregates. Spark’s distinct-aggregate rewrite can introduce multi-stage plans with PartialMerge stages, for example:

SELECT x, count(DISTINCT y), collect_list(z) FROM t GROUP BY x

Without correcting the intermediate buffer schema for these stages, a fully-native pipeline can fail when native list state is treated as Spark binary state. This change makes the native array state round-trip through Partial, PartialMerge, and mixed {Partial, PartialMerge} stages so collect_list / collect_set can run fully native in distinct-combined aggregate plans.

What changes are included in this PR?

  • Adds native collect_list / array_agg aggregate support:

    • Adds CollectList to the aggregate expression proto.
    • Adds JVM serde for Spark CollectList.
    • Registers CollectList -> CometCollectList.
    • Wires native planning to datafusion_spark::function::aggregate::collect::SparkCollectList.
  • Extends collect aggregate native-state schema adjustment:

    • Updates CometObjectHashAggregateExec.adjustOutputForNativeState to handle both CollectList and CollectSet.
    • Applies the rewrite to intermediate Partial, PartialMerge, and mixed {Partial, PartialMerge} stages, not only pure Partial.
    • Rewrites Spark’s serialized BinaryType buffer attributes to Comet’s native ArrayType(elementType, containsNull = true) state type.
  • Adds regression coverage for fully-native distinct-combined collect aggregates.

  • Updates expression support docs and aggregate audit notes for collect_list / array_agg.

How are these changes tested?

Ran:

cargo fmt --manifest-path native/Cargo.toml --all
cargo check --manifest-path native/Cargo.toml -p datafusion-comet
make core
./mvnw test -Dtest=none -Dsuites="org.apache.comet.exec.CometAggregateSuite collect_list/collect_set combined with distinct aggregate runs fully native" -Dscalastyle.skip=true -Pspark-3.5
./mvnw spotless:check -Dscalastyle.skip=true -Pspark-3.5

* binary). However, the native Comet aggregate produces the actual state type (e.g.,
* ArrayType(elementType) for CollectSet). This method corrects the output schema to match the
* native state types so the shuffle exchange schema is consistent with the actual data.
* For intermediate aggregates containing TypedImperativeAggregate functions (like CollectSet or

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@peterxcli peterxcli marked this pull request as draft June 25, 2026 17:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support fully-native multi-stage (distinct-combined) collect_list / collect_set

2 participants