alamb commented on code in PR #11943:
URL: https://github.com/apache/datafusion/pull/11943#discussion_r1715835534
##########
datafusion/expr-common/src/groups_accumulator.rs:
##########
@@ -123,7 +151,7 @@ pub trait GroupsAccumulator: Send {
/// future use. The group_indices on subsequent calls to
/// `update_batch` or `merge_batch` will be shifted down by
/// `n`. See [`EmitTo::First`] for more details.
- fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef>;
+ fn evaluate(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;
Review Comment:
It would help to document what expectations are placed on the `Vec` of array refs.
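As an illustration only, here is one possible set of expectations, written as a checker. It assumes each element of the returned `Vec` is one block of groups in group-index order. `Column`, `check_evaluate_output`, and the four rules are hypothetical and not taken from the PR. `Column` (a plain `Vec<i64>`) stands in for arrow's `ArrayRef` so that the sketch has no dependencies.

```rust
/// Stand-in for arrow's `ArrayRef`: one value per group (hypothetical simplification).
pub type Column = Vec<i64>;

/// Hypothetical contract for a blocked `evaluate`, expressed as a checker.
///
/// Assumed expectations on the returned `Vec`:
/// 1. Element `i` holds groups `i * block_size ..` in group-index order.
/// 2. Every element except the last has exactly `block_size` rows.
/// 3. The last element is non-empty and has at most `block_size` rows.
/// 4. The row counts sum to the number of groups being emitted.
pub fn check_evaluate_output(
    blocks: &[Column],
    num_groups: usize,
    block_size: usize,
) -> Result<(), String> {
    let total: usize = blocks.iter().map(|b| b.len()).sum();
    if total != num_groups {
        return Err(format!("expected {num_groups} rows, got {total}"));
    }
    for (i, block) in blocks.iter().enumerate() {
        let is_last = i + 1 == blocks.len();
        let ok = if is_last {
            !block.is_empty() && block.len() <= block_size
        } else {
            block.len() == block_size
        };
        if !ok {
            return Err(format!("block {i} has {} rows", block.len()));
        }
    }
    Ok(())
}
```

Writing the rules down this way also makes clear what callers (e.g. the code that builds output batches) can rely on without inspecting each array.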
##########
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs:
##########
@@ -68,11 +70,21 @@ where
fn update_batch(
Review Comment:
To realize the benefit of this blocked implementation, I think you will need
to change the state of the accumulators so that, instead of a single large
buffer
```rust
/// values per group
values: BooleanBufferBuilder,
```
the state is held in chunks, like
```rust
/// blocks of values per group
values: Vec<BooleanBufferBuilder>
```
(or possibly this, to support taking blocks out individually)
```rust
/// blocks of values per group, None when taken
values: Vec<Option<BooleanBufferBuilder>>
```
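A minimal sketch of the chunked layout, assuming the `Vec<Option<...>>` variant. `BlockedBoolState` and its methods are hypothetical names, and `Vec<bool>` stands in for `BooleanBufferBuilder` so the example has no dependencies. The idea it illustrates: growing the state appends whole blocks instead of reallocating and copying one large buffer, and taking a block leaves every other group's index unchanged.

```rust
/// Hypothetical blocked state for a boolean accumulator.
/// `Vec<bool>` stands in for arrow's `BooleanBufferBuilder`.
pub struct BlockedBoolState {
    block_size: usize,
    /// blocks of values per group, `None` once a block has been taken
    values: Vec<Option<Vec<bool>>>,
    /// initial value for new groups (e.g. `true` for bool_and)
    identity: bool,
    /// number of real groups (the last block may be partially used)
    num_groups: usize,
}

impl BlockedBoolState {
    pub fn new(block_size: usize, identity: bool) -> Self {
        assert!(block_size > 0, "block_size must be positive");
        Self { block_size, values: Vec::new(), identity, num_groups: 0 }
    }

    /// Make room for `total_num_groups` groups by appending whole blocks;
    /// existing blocks are never reallocated or copied.
    pub fn resize(&mut self, total_num_groups: usize) {
        let needed = (total_num_groups + self.block_size - 1) / self.block_size;
        while self.values.len() < needed {
            self.values.push(Some(vec![self.identity; self.block_size]));
        }
        self.num_groups = self.num_groups.max(total_num_groups);
    }

    /// Combine `v` into the value of `group_index` using `op` (e.g. `|a, b| a && b`).
    pub fn update(&mut self, group_index: usize, v: bool, op: impl Fn(bool, bool) -> bool) {
        let (block, offset) = (group_index / self.block_size, group_index % self.block_size);
        let slot = &mut self.values[block].as_mut().expect("block already taken")[offset];
        *slot = op(*slot, v);
    }

    /// Remove one block without shifting any other group's index.
    pub fn take_block(&mut self, block: usize) -> Option<Vec<bool>> {
        let mut values = self.values.get_mut(block)?.take()?;
        // the final block may only be partially filled with real groups
        let start = block * self.block_size;
        values.truncate(self.num_groups.saturating_sub(start).min(self.block_size));
        Some(values)
    }
}
```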
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -353,7 +355,7 @@ pub(crate) struct GroupedHashAggregateStream {
/// scratch space for the current input [`RecordBatch`] being
/// processed. Reused across batches here to avoid reallocations
- current_group_indices: Vec<usize>,
+ current_group_indices: Vec<u64>,
Review Comment:
Another alternative might be to ensure that the block size is aligned across
the aggregators and group values. That way there would be no need to stitch
arrays together into batches during emission.
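To illustrate why alignment removes the stitching step, here is a sketch under the assumption that group values and every accumulator use the same block size, so block `i` of each holds exactly the same groups. `Batch` and `emit_aligned` are hypothetical; `Batch` is a dependency-free stand-in for a `RecordBatch` with one key column and one column per aggregate.

```rust
/// Stand-in for a `RecordBatch`: one group-key column plus one column per aggregate.
#[derive(Debug, PartialEq)]
pub struct Batch {
    pub keys: Vec<i64>,
    pub aggs: Vec<Vec<i64>>,
}

/// Assumes aligned block sizes, so each output batch is assembled by moving
/// whole blocks: no arrays are sliced or concatenated.
pub fn emit_aligned(
    key_blocks: Vec<Vec<i64>>,
    agg_blocks: Vec<Vec<Vec<i64>>>, // agg_blocks[a][i] = block i of aggregate a
) -> Vec<Batch> {
    let mut agg_iters: Vec<_> = agg_blocks.into_iter().map(|b| b.into_iter()).collect();
    key_blocks
        .into_iter()
        .map(|keys| {
            let aggs: Vec<Vec<i64>> = agg_iters
                .iter_mut()
                .map(|it| it.next().expect("aggregate has fewer blocks than group values"))
                .collect();
            // with aligned blocks, row counts must match exactly
            assert!(aggs.iter().all(|a| a.len() == keys.len()), "misaligned block");
            Batch { keys, aggs }
        })
        .collect()
}
```

If the block sizes differed, each output batch would instead need pieces of several blocks concatenated together, which is exactly the copying this alternative avoids.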
##########
datafusion/expr-common/src/groups_accumulator.rs:
##########
@@ -31,6 +31,13 @@ pub enum EmitTo {
/// For example, if `n=10`, group_index `0, 1, ... 9` are emitted
/// and group indexes '`10, 11, 12, ...` become `0, 1, 2, ...`.
First(usize),
+ /// Emit all groups managed by blocks
+ AllBlocks,
+ /// Emit only the first `n` group blocks,
+ /// similar as `First`, but used in blocked `GroupValues` and `GroupAccumulator`.
+ ///
+ /// For example, `n=3`, `block size=4`, finally 12 groups will be returned.
+ FirstBlocks(usize),
Review Comment:
Rather than having two parallel emission modes for blocked output, I wonder
if we could have some sort of "take" mode whose semantics do not shift the
existing values down.
For example, what if we introduced a notion of a "block" shared across the
group keys and aggregators:
```rust
pub enum EmitTo {
    /// Same as today
    All,
    /// Same as today
    First(usize),
    /// Takes the N'th block of rows from this accumulator.
    /// It is an error to take the same block twice, or to emit `All` or
    /// `First` after any `TakeBlock(usize)`.
    TakeBlock(usize),
}
```
Then we would, for example, make sure the group values and aggregators all
store their data in blocks of 100K rows.
Then, to emit 1M rows, we would call the accumulators with:
```
EmitTo::TakeBlock(0)
EmitTo::TakeBlock(1)
EmitTo::TakeBlock(2)
...
EmitTo::TakeBlock(9)
```
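A minimal sketch of how an accumulator might enforce these `TakeBlock` rules. `BlockedSum` and its methods are hypothetical, and plain `Vec<i64>` blocks stand in for arrow arrays. The `First` arm is included only for contrast: it has to copy and re-block the remaining groups to shift them down, which is the cost that `TakeBlock` avoids.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EmitTo {
    All,
    First(usize),
    TakeBlock(usize),
}

/// Hypothetical SUM accumulator whose per-group state lives in fixed-size blocks.
pub struct BlockedSum {
    block_size: usize,
    /// `None` once a block has been taken
    blocks: Vec<Option<Vec<i64>>>,
    /// set once any block is taken; `All` / `First` are rejected afterwards
    taken_any: bool,
}

impl BlockedSum {
    pub fn new(num_groups: usize, block_size: usize) -> Self {
        assert!(block_size > 0, "block_size must be positive");
        let blocks = (0..num_groups)
            .step_by(block_size)
            .map(|start| Some(vec![0; block_size.min(num_groups - start)]))
            .collect();
        Self { block_size, blocks, taken_any: false }
    }

    pub fn update(&mut self, group_index: usize, v: i64) {
        let block = self.blocks[group_index / self.block_size]
            .as_mut()
            .expect("block already taken");
        block[group_index % self.block_size] += v;
    }

    pub fn evaluate(&mut self, emit_to: EmitTo) -> Result<Vec<i64>, String> {
        match emit_to {
            EmitTo::TakeBlock(i) => {
                let slot = self.blocks.get_mut(i).ok_or_else(|| format!("no block {i}"))?;
                let block = slot.take().ok_or_else(|| format!("block {i} already taken"))?;
                self.taken_any = true;
                Ok(block)
            }
            EmitTo::All | EmitTo::First(_) if self.taken_any => {
                Err("cannot emit All/First after TakeBlock".to_string())
            }
            EmitTo::All => Ok(self.blocks.drain(..).flatten().flatten().collect()),
            EmitTo::First(n) => {
                let mut all: Vec<i64> = self.blocks.drain(..).flatten().flatten().collect();
                let rest = all.split_off(n.min(all.len()));
                // remaining groups shift down to index 0 and are copied into new blocks
                self.blocks = rest.chunks(self.block_size).map(|c| Some(c.to_vec())).collect();
                Ok(all)
            }
        }
    }
}
```

With 1M groups and a block size of 100K, the ten `TakeBlock(0..=9)` calls each hand back one block by moving it out of its `Option`, with no copying.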