Hi FengYu,

This is because of count_distinct() on user_id. We rewrite this into a multi-step aggregation: first we aggregate on the grouping keys (date) plus the distinct values (user_id), and then we aggregate on the grouping keys only. The first step is needed to compute the distinct values. This is safe for percentile because its aggregation buffer retains every input value, so merging the per-(date, user_id) buffers in the second step produces the same result as aggregating directly on date, even when (date, user_id) is not unique.
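To make the rewrite concrete, here is a sketch in plain Python of the same two-phase shape (this is only an illustration of the idea, not Spark's actual implementation; the toy rows and the interpolating percentile helper are my own, chosen to mirror percentile()'s linear-interpolation behavior):

```python
from collections import defaultdict

# Toy rows: (date, user_id, duration). Note (date, user_id) is NOT unique.
rows = [
    ("2025-10-01", 1, 10),
    ("2025-10-01", 1, 20),
    ("2025-10-01", 2, 30),
    ("2025-10-02", 3, 40),
]

# Phase 1: aggregate on (date, user_id). This deduplicates user_id per date,
# while the percentile buffer keeps every duration value, so nothing is lost.
phase1 = defaultdict(list)
for date, user_id, duration in rows:
    phase1[(date, user_id)].append(duration)

# Phase 2: aggregate on date only. count(distinct user_id) is now a plain
# count of the phase-1 keys; the percentile merges the buffered durations.
phase2 = defaultdict(lambda: {"n_users": 0, "durations": []})
for (date, user_id), durations in phase1.items():
    phase2[date]["n_users"] += 1
    phase2[date]["durations"].extend(durations)

def percentile(values, p):
    # Exact percentile with linear interpolation between closest ranks.
    xs = sorted(values)
    k = (len(xs) - 1) * p
    lo, hi = int(k), min(int(k) + 1, len(xs) - 1)
    return xs[lo] + (xs[hi] - xs[lo]) * (k - lo)

result = {
    date: (agg["n_users"], percentile(agg["durations"], 0.95))
    for date, agg in phase2.items()
}
print(result)
```

Running this gives n_users = 2 and p95 ≈ 29.0 for 2025-10-01, computed over all three duration values despite user 1 appearing twice, which is why the duplicate (date, user_id) rows do not cause a correctness issue.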
I hope this helps.

Cheers, Herman

On Thu, Oct 16, 2025 at 6:08 AM FengYu Cao <[email protected]> wrote:

> Hi Spark community,
>
> I encountered an unexpected behavior when using `percentile()` and
> `count_distinct()` in a simple groupBy aggregation, and I’d like to confirm
> whether this is expected behavior or a potential correctness issue.
>
> Environment:
> - Apache Spark 3.5.7
> - Data source: Parquet
> - Schema:
> root
>  |-- user_id: long (nullable = true)
>  |-- duration: long (nullable = true)
>  |-- date: date (nullable = true)
>
> Reproduction code:
> ------------------------------------------------------------
> df.groupBy("date").agg(
>     F.count_distinct('user_id').alias('n_users'),
>     F.percentile('duration', 0.95).alias('p95')
> ).explain()
> ------------------------------------------------------------
>
> Physical plan:
> ------------------------------------------------------------
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- ObjectHashAggregate(keys=[date#561], functions=[percentile(duration#559L, 0.95, 1, 0, 0, false), count(distinct user_id#558L)])
>    +- Exchange hashpartitioning(date#561, 200), ENSURE_REQUIREMENTS, [plan_id=717]
>       +- ObjectHashAggregate(keys=[date#561], functions=[merge_percentile(duration#559L, 0.95, 1, 0, 0, false), partial_count(distinct user_id#558L)])
>          +- ObjectHashAggregate(keys=[date#561, user_id#558L], functions=[merge_percentile(duration#559L, 0.95, 1, 0, 0, false)])
>             +- Exchange hashpartitioning(date#561, user_id#558L, 200), ENSURE_REQUIREMENTS, [plan_id=713]
>                +- ObjectHashAggregate(keys=[date#561, user_id#558L], functions=[partial_percentile(duration#559L, 0.95, 1, 0, 0, false)])
>                   +- FileScan parquet [user_id#558L,duration#559L,date#561] ...
> ------------------------------------------------------------
>
> Question:
> Why is there an additional `ObjectHashAggregate` on `(date, user_id)` when
> the logical aggregation only groups by `date`?
>
> For the case of groupBy("date") + count_distinct + sum, it seems fine.
> But for percentile, could this lead to a potential correctness issue?
> (when [date, user_id] is not unique)
>
> --
> *camper42 (曹丰宇)*
> Douban, Inc.
