Hi FengYu,

This is because of count_distinct() on user_id. We rewrite this into a
multi-step aggregation: first we aggregate on the grouping keys (date)
plus the distinct column (user_id), and then we aggregate on the
grouping keys only. The first step is needed to deduplicate the
distinct values. The percentile stays correct because the first step
only produces a partial percentile buffer per group, and the second
step merges those buffers (the merge_percentile in your plan), so each
duration row contributes exactly once.

I hope this helps.

Cheers,
Herman

On Thu, Oct 16, 2025 at 6:08 AM FengYu Cao <[email protected]> wrote:

> Hi Spark community,
>
> I encountered an unexpected behavior when using `percentile()` and
> `count_distinct()` in a simple groupBy aggregation, and I’d like to confirm
> whether this is expected behavior or a potential correctness issue.
>
> Environment:
> - Apache Spark 3.5.7
> - Data source: Parquet
> - Schema:
>   root
>    |-- user_id: long (nullable = true)
>    |-- duration: long (nullable = true)
>    |-- date: date (nullable = true)
>
> Reproduction code:
> ------------------------------------------------------------
> df.groupBy("date").agg(
>     F.count_distinct('user_id').alias('n_users'),
>     F.percentile('duration', 0.95).alias('p95')
> ).explain()
> ------------------------------------------------------------
>
> Physical plan:
> ------------------------------------------------------------
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- ObjectHashAggregate(keys=[date#561], functions=[percentile(duration#559L, 0.95, 1, 0, 0, false), count(distinct user_id#558L)])
>    +- Exchange hashpartitioning(date#561, 200), ENSURE_REQUIREMENTS, [plan_id=717]
>       +- ObjectHashAggregate(keys=[date#561], functions=[merge_percentile(duration#559L, 0.95, 1, 0, 0, false), partial_count(distinct user_id#558L)])
>          +- ObjectHashAggregate(keys=[date#561, user_id#558L], functions=[merge_percentile(duration#559L, 0.95, 1, 0, 0, false)])
>             +- Exchange hashpartitioning(date#561, user_id#558L, 200), ENSURE_REQUIREMENTS, [plan_id=713]
>                +- ObjectHashAggregate(keys=[date#561, user_id#558L], functions=[partial_percentile(duration#559L, 0.95, 1, 0, 0, false)])
>                   +- FileScan parquet [user_id#558L,duration#559L,date#561] ...
> ------------------------------------------------------------
>
> Question:
> Why is there an additional `ObjectHashAggregate` on `(date, user_id)` when
> the logical aggregation only groups by `date`?
>
> For groupBy("date") + count_distinct + sum, the plan looks fine. But for
> percentile, could this lead to a correctness issue when (date, user_id)
> is not unique?
>
> --
> *camper42 (曹丰宇)*
> Douban, Inc.
>
