Dandandan opened a new pull request, #21856:
URL: https://github.com/apache/datafusion/pull/21856
## Which issue does this PR close?
- Closes #.
## Rationale for this change
Apache DataFusion's grouped hash aggregate today handles working sets larger
than a thread's L3 cache by letting the in-memory hash table grow without
bound, then spilling to disk on memory-pool exhaustion. Between "fits in L3"
and "memory pool exhausted" there is a wide regime where the hash table
thrashes the cache (roughly one cache miss per row), with no defense.
This PR adds a cache-efficient fallback inspired by Müller et al.,
*Cache-Efficient Aggregation: Hashing Is Sorting* (SIGMOD 2015): when the
working set outgrows a thread's share of last-level cache, the operator
radix-partitions the in-memory partial-aggregate state into a fixed number of
bucketed runs, and after the input is drained re-aggregates each bucket
independently with a fresh, cache-resident hash table. Each bucket holds ~`K /
32` groups, so it stays small.
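As a rough sketch (illustrative only; the bit range and names here are assumptions, not the PR's actual code), the 32-way bucket assignment amounts to taking five bits of each group's hash:

```rust
/// Illustrative sketch: map a group's 64-bit hash to one of 32 buckets
/// using 5 hash bits. Which 5 bits the PR actually uses is an assumption.
const NUM_RADIX_PARTITIONS: usize = 32; // 2^5 buckets

fn radix_bucket(hash: u64) -> usize {
    (hash & (NUM_RADIX_PARTITIONS as u64 - 1)) as usize
}

fn main() {
    // Every hash lands in [0, 32), so a K-group table splits into
    // roughly K / 32 groups per bucket.
    for h in [0u64, 31, 32, 0xDEAD_BEEF, u64::MAX] {
        let b = radix_bucket(h);
        assert!(b < NUM_RADIX_PARTITIONS);
        println!("hash {h:#x} -> bucket {b}");
    }
}
```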
The trigger is the **working-set size** (sized to L3), not memory-pool
exhaustion — by the time the pool is full the hash table has been thrashing
cache for a while.
## What changes are included in this PR?
- New `OutOfMemoryMode::RadixPartition` for non-Partial modes with
`GroupOrdering::None`.
- New `RadixPartitionState` holding `Vec<Vec<RecordBatch>>` indexed by 5-bit
hash bucket (`NUM_RADIX_PARTITIONS = 32`).
- Proactive cache-size trigger in the `ReadingInput` poll path: after each
ingested batch, if `group_values.size() + accumulators.size() > threshold`,
flush the hash table into bucketed runs and continue.
- Bucket-by-bucket drain phase reuses the existing `is_stream_merging`
machinery (each bucket's runs become a `BucketStream` that flows back through
`merge_batch`).
- Fast-path bypass: if no radix flush ever fired during ingestion, fall
through to the normal emit path so output ordering and tiny-query latency are
unchanged.
- Two new config options:
- `datafusion.execution.aggregate_radix_partitioned` (bool, default
**true**)
- `datafusion.execution.aggregate_radix_partitioned_threshold_bytes`
(default 32 MiB)
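The flow the list above describes can be sketched in plain Rust, with entirely hypothetical names standing in for the operator's internals (the real operator works on `RecordBatch`es and accumulator state, not a `HashMap`):

```rust
use std::collections::HashMap;

const NUM_RADIX_PARTITIONS: usize = 32;

/// Hypothetical stand-in for the partial-aggregate state
/// (group key -> accumulated count).
type GroupTable = HashMap<u64, u64>;

/// Bucketed runs, mirroring the PR's `Vec<Vec<RecordBatch>>` shape:
/// one Vec of flushed partial states per 5-bit hash bucket.
struct RadixPartitionState {
    runs: Vec<Vec<GroupTable>>,
}

impl RadixPartitionState {
    fn new() -> Self {
        Self { runs: (0..NUM_RADIX_PARTITIONS).map(|_| Vec::new()).collect() }
    }

    /// Proactive trigger: flush when the working set outgrows the
    /// cache-sized threshold. The PR checks group_values.size() +
    /// accumulators.size(); here we approximate with entry count * width.
    fn maybe_flush(&mut self, table: &mut GroupTable, threshold_bytes: usize) {
        if table.len() * 16 <= threshold_bytes {
            return; // still cache-resident; keep aggregating in place
        }
        // Split the table into 32 bucketed runs, then continue ingestion
        // with an empty, cache-resident table.
        let mut buckets: Vec<GroupTable> =
            (0..NUM_RADIX_PARTITIONS).map(|_| GroupTable::new()).collect();
        for (key, count) in table.drain() {
            let b = (key & 31) as usize; // key doubles as its own hash here
            *buckets[b].entry(key).or_insert(0) += count;
        }
        for (b, run) in buckets.into_iter().enumerate() {
            if !run.is_empty() {
                self.runs[b].push(run);
            }
        }
    }

    /// Drain phase: re-aggregate each bucket independently with a fresh,
    /// small table, then fold in whatever never got flushed.
    fn drain(self, mut leftover: GroupTable) -> GroupTable {
        let mut out = GroupTable::new();
        for bucket_runs in self.runs {
            let mut fresh = GroupTable::new(); // cache-resident per bucket
            for run in bucket_runs {
                for (k, c) in run {
                    *fresh.entry(k).or_insert(0) += c;
                }
            }
            out.extend(fresh); // buckets hold disjoint keys
        }
        for (k, c) in leftover.drain() {
            *out.entry(k).or_insert(0) += c;
        }
        out
    }
}

fn main() {
    let mut state = RadixPartitionState::new();
    let mut table = GroupTable::new();
    // 5000 rows over 1000 groups, with a tiny threshold so the flush
    // fires repeatedly (as the PR's tests do with 64 bytes).
    for key in (0u64..1000).cycle().take(5000) {
        *table.entry(key).or_insert(0) += 1;
        state.maybe_flush(&mut table, 1024);
    }
    let result = state.drain(table);
    assert_eq!(result.len(), 1000);
    assert!(result.values().all(|&c| c == 5));
    println!("1000 groups, each with count 5");
}
```

The key invariant, as in the paper, is that a group's bucket is a pure function of its hash, so partial states for the same group always land in the same bucket's runs and can be merged there.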
### Known limitations (draft)
- **Disk-spill fallback for an oversized single bucket is not implemented.**
If a single bucket itself exceeds the memory budget during drain, the operator
surfaces an error rather than recursively partitioning or falling back to disk.
Recursive partitioning (the paper's approach, with progressively higher hash
bits) is the right fix; this draft defers it. Tests that explicitly assert
the disk-spill path now disable the flag.
- **The 32 MiB default threshold is too low** for many ClickBench queries —
see benchmark numbers below.
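The deferred recursive-partitioning fix could look roughly like this (a sketch under assumed names, not code from this PR): each recursion level consumes the next five hash bits, so sub-buckets of an oversized bucket never re-collide on the bits already used.

```rust
/// Sketch of recursive radix partitioning (not implemented in this PR):
/// level 0 uses hash bits 0..5, level 1 uses bits 5..10, and so on.
const BITS_PER_LEVEL: u32 = 5;
const FANOUT: u64 = 1 << BITS_PER_LEVEL; // 32

fn bucket_at_level(hash: u64, level: u32) -> usize {
    ((hash >> (level * BITS_PER_LEVEL)) & (FANOUT - 1)) as usize
}

/// Recursively partition hashes until each leaf fits the budget;
/// collects the sizes of the leaf partitions.
fn partition(hashes: &[u64], level: u32, max_leaf: usize, leaves: &mut Vec<usize>) {
    // Past 12 levels we have consumed 60 of the 64 hash bits; a real
    // implementation would fall back to disk spill at that point.
    if hashes.len() <= max_leaf || level >= 12 {
        leaves.push(hashes.len());
        return;
    }
    let mut buckets: Vec<Vec<u64>> =
        (0..FANOUT as usize).map(|_| Vec::new()).collect();
    for &h in hashes {
        buckets[bucket_at_level(h, level)].push(h);
    }
    for b in buckets {
        if !b.is_empty() {
            partition(&b, level + 1, max_leaf, leaves);
        }
    }
}

fn main() {
    // 10_000 pseudo-random hashes; recursion splits every oversized
    // bucket until all leaves fit the 100-entry budget.
    let hashes: Vec<u64> = (0u64..10_000)
        .map(|i| i.wrapping_mul(0x9E37_79B9_7F4A_7C15))
        .collect();
    let mut leaves = Vec::new();
    partition(&hashes, 0, 100, &mut leaves);
    assert_eq!(leaves.iter().sum::<usize>(), 10_000);
    assert!(leaves.iter().all(|&n| n <= 100));
    println!("{} leaves, all within budget", leaves.len());
}
```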
## Are these changes tested?
Three new unit tests in `datafusion-physical-plan`
(`test_radix_partitioned_high_cardinality`, `_low_cardinality`,
`_single_group`) drive a `Single`-mode aggregate with the cache-size threshold
pinned to 64 bytes (so the radix flush fires repeatedly) and assert the output
multiset matches the non-radix path on the same input.
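The equivalence those tests assert can be mirrored in plain Rust (an illustration only; the actual tests drive the real operator through `merge_batch`, not these toy functions):

```rust
use std::collections::HashMap;

/// Reference aggregation: plain hash grouping (the "non-radix path").
fn aggregate(rows: &[(u64, i64)]) -> Vec<(u64, i64)> {
    let mut m: HashMap<u64, i64> = HashMap::new();
    for &(k, v) in rows {
        *m.entry(k).or_insert(0) += v;
    }
    let mut out: Vec<_> = m.into_iter().collect();
    out.sort_unstable();
    out
}

/// Radix-style aggregation: partition rows into 32 buckets by 5 hash
/// bits, aggregate each bucket independently, concatenate.
fn aggregate_radix(rows: &[(u64, i64)]) -> Vec<(u64, i64)> {
    let mut buckets: Vec<Vec<(u64, i64)>> = (0..32).map(|_| Vec::new()).collect();
    for &(k, v) in rows {
        buckets[(k & 31) as usize].push((k, v));
    }
    let mut out = Vec::new();
    for b in &buckets {
        out.extend(aggregate(b));
    }
    out.sort_unstable(); // compare as a multiset, ignoring output order
    out
}

fn main() {
    let rows: Vec<(u64, i64)> =
        (0u64..5000).map(|i| (i % 700, (i % 7) as i64)).collect();
    // Both paths must produce the same multiset of (group, sum) pairs.
    assert_eq!(aggregate(&rows), aggregate_radix(&rows));
    println!("radix and non-radix outputs match");
}
```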
The pre-existing spill-specific tests
(`test_aggregate_with_spill_if_necessary`,
`test_sort_reservation_fails_during_spill`, `aggregate_source_*_with_spill`)
explicitly disable the flag in their `TaskContext` so they continue to exercise
the disk-spill code path.
All 89 `aggregates::` tests pass.
## Are there any user-facing changes?
The two new config keys appear in `information_schema.df_settings` and
`docs/source/user-guide/configs.md`. No public Rust API changes.
## Preliminary ClickBench numbers (single `hits.parquet`, 3 iterations)
| Metric | Radix off | Radix on (default) |
| --- | --- | --- |
| Total time | 37.95 s | 37.48 s (**~1.2% faster**) |
| Faster | — | 6 queries |
| Slower | — | 16 queries |
| No change | — | 21 queries |
Notable wins:
- Q19: **1.50x** faster (65 → 44 ms)
- Q32: **1.40x** faster (4.29 s → 3.06 s)
- Q18: 1.16x faster (3.15 s → 2.71 s)
Notable regressions:
- Q31: 1.33x slower (458 → 610 ms)
- Q13: 1.29x slower (`GROUP BY URL`, ~10M distinct)
- Q8: 1.25x slower
- Q4–Q9, Q14–Q16: all 1.13–1.25x slower
The regression pattern is consistent with a too-low threshold: queries whose
per-thread hash table sits just above 32 MiB are forced into an extra round
trip through the bucket drain even though the working set would still have fit
in cache. Threshold tuning plus a reduction-factor gate (only flush if recent
batches show poor aggregation) are the obvious next steps before this is ready
for merge.
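A reduction-factor gate of the kind proposed could be sketched like this (hypothetical names throughout; the PR does not implement it): track how many input rows recent batches contributed versus how many new groups they created, and skip the flush while aggregation is still collapsing rows well.

```rust
/// Sketch of a reduction-factor gate (not implemented in this PR).
/// Flush only when recent batches create close to one new group per
/// input row, i.e. hashing is no longer collapsing duplicates.
struct ReductionGate {
    recent_rows: usize,
    recent_new_groups: usize,
}

impl ReductionGate {
    fn new() -> Self {
        Self { recent_rows: 0, recent_new_groups: 0 }
    }

    /// Record one ingested batch's statistics.
    fn observe(&mut self, rows: usize, new_groups: usize) {
        self.recent_rows += rows;
        self.recent_new_groups += new_groups;
    }

    /// Allow a radix flush only if more than `ratio` new groups per row
    /// are appearing -- e.g. with ratio = 0.8, a stream producing 0.9
    /// new groups per row is aggregating poorly and may be flushed.
    fn should_flush(&self, ratio: f64) -> bool {
        self.recent_rows > 0
            && (self.recent_new_groups as f64) > ratio * (self.recent_rows as f64)
    }
}

fn main() {
    // Low-cardinality stream: 1024 rows, only 10 new groups -> keep table.
    let mut low = ReductionGate::new();
    low.observe(1024, 10);
    assert!(!low.should_flush(0.8));

    // High-cardinality stream: almost every row is a new group -> flush.
    let mut high = ReductionGate::new();
    high.observe(1024, 1000);
    assert!(high.should_flush(0.8));
    println!("gate: low cardinality keeps the table, high cardinality flushes");
}
```

This mirrors how `SkipAggregationProbe` already uses an observed aggregation ratio to change strategy mid-stream, which is why the PR names it as the analogue.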
## Follow-ups
- Tune the default threshold (try 128 MiB / 256 MiB; likely needs to be
cpu-count-aware).
- Add a reduction-factor gate analogous to `SkipAggregationProbe` so we
don't flush a hash table that is aggregating well.
- Implement recursive partitioning with higher-order hash bits for oversized
buckets, with disk-spill as the final fallback.
- Run with `--iterations 5+` and on the partitioned dataset for less
variance.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]