alamb commented on code in PR #11943:
URL: https://github.com/apache/datafusion/pull/11943#discussion_r1727162759
##########
datafusion/expr-common/src/groups_accumulator.rs:
##########
@@ -143,6 +145,25 @@ pub trait GroupsAccumulator: Send {
/// [`Accumulator::state`]: crate::accumulator::Accumulator::state
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;
+ /// Returns `true` if this accumulator supports blocked mode.
+ fn supports_blocked_mode(&self) -> bool {
+ false
+ }
+
+ /// Switch the accumulator between flat and blocked mode.
+ /// See [GroupStatesMode] for details about the modes.
+ ///
+ /// After switching modes, all data stored in the previous mode is cleared.
+ fn switch_to_mode(&mut self, mode: GroupStatesMode) -> Result<()> {
Review Comment:
I understand why you introduced this API, but I think it makes the
accumulators harder to reason about: each accumulator now has two potential
modes, so there are two similar, but not identical, parallel implementations
that we have to ensure are tested. I had an idea for avoiding this
`switch_to_mode` API, described below.
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -529,10 +552,53 @@ impl GroupedHashAggregateStream {
spill_state,
group_values_soft_limit: agg.limit,
skip_aggregation_probe,
+ enable_blocked_group_states,
})
}
}
+/// Check if we can enable the blocked optimization for `GroupValues` and `GroupsAccumulator`s.
+/// The blocked optimization will be enabled when:
+/// - It is not a streaming aggregation (blocked mode can't support `EmitTo::First(exact n)`)
+/// - Spilling is disabled (supporting it efficiently still needs more thought)
+/// - The accumulators are not empty (I am still not sure about the logic in this case)
+/// - `GroupValues` and all `GroupsAccumulator`s support blocked mode
+// TODO: support the blocked optimization for streaming, spilling, and maybe the empty-accumulators case?
+fn maybe_enable_blocked_group_states(
+ context: &TaskContext,
+ group_values: &mut dyn GroupValues,
+ accumulators: &mut [Box<dyn GroupsAccumulator>],
+ block_size: usize,
+ group_ordering: &GroupOrdering,
+) -> Result<bool> {
+ if !matches!(group_ordering, GroupOrdering::None)
+ || accumulators.is_empty()
+ || enable_spilling(context.memory_pool().as_ref())
+ {
+ return Ok(false);
+ }
+
+ let group_supports_blocked = group_values.supports_blocked_mode();
+ let accumulators_support_blocked =
+ accumulators.iter().all(|acc| acc.supports_blocked_mode());
+
+ match (group_supports_blocked, accumulators_support_blocked) {
+ (true, true) => {
+ group_values.switch_to_mode(GroupStatesMode::Blocked(block_size))?;
+ accumulators.iter_mut().try_for_each(|acc| {
+ acc.switch_to_mode(GroupStatesMode::Blocked(block_size))
+ })?;
+ Ok(true)
+ }
+ _ => Ok(false),
+ }
+}
+
+// TODO: we should add a function (like `name`) to distinguish different memory pools.
+fn enable_spilling(memory_pool: &dyn MemoryPool) -> bool {
+ !format!("{memory_pool:?}").contains("UnboundedMemoryPool")
Review Comment:
I think using this check
https://docs.rs/datafusion/latest/datafusion/execution/struct.DiskManager.html#method.tmp_files_enabled
is likely the more correct way (a sketch of that check is below).
Also, since many systems won't use an unbounded pool during execution, this
check would make the optimization available only in very specialized cases.
However, I see the underlying issue: when chunked emission is enabled, we
haven't figured out spilling yet.
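For illustration, a minimal sketch of the suggested check, assuming the
`TaskContext` already available in `GroupedHashAggregateStream::new`
(`spilling_possible` is a hypothetical name, not part of the PR):

```rust
use datafusion_execution::TaskContext;

/// Hypothetical replacement for `enable_spilling`: spilling can only happen
/// when the DiskManager is allowed to create temporary (spill) files.
fn spilling_possible(context: &TaskContext) -> bool {
    // `tmp_files_enabled` is the DiskManager method linked above.
    context.runtime_env().disk_manager.tmp_files_enabled()
}
```

`maybe_enable_blocked_group_states` could then use `!spilling_possible(context)`
in place of the Debug-string check on the memory pool.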
##########
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs:
##########
@@ -92,32 +101,69 @@ where
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
) -> Result<()> {
+ if total_num_groups == 0 {
+ return Ok(());
+ }
+
assert_eq!(values.len(), 1, "single argument to update_batch");
let values = values[0].as_primitive::<T>();
- // update values
- self.values.resize(total_num_groups, self.starting_value);
-
// NullState dispatches / handles tracking nulls and groups that saw no values
- self.null_state.accumulate(
- group_indices,
- values,
- opt_filter,
- total_num_groups,
- |group_index, new_value| {
- let value = &mut self.values[group_index];
- (self.prim_fn)(value, new_value);
- },
- );
+ match self.mode {
+ GroupStatesMode::Flat => {
+ // Ensure enough room in values
+ ensure_enough_room_for_flat_values(
+ &mut self.values_blocks,
+ total_num_groups,
+ self.starting_value,
+ );
+
+ let block = self.values_blocks.current_mut().unwrap();
+ self.null_state.accumulate_for_flat(
+ group_indices,
+ values,
+ opt_filter,
+ total_num_groups,
+ |group_index, new_value| {
+ let value = &mut block[group_index];
+ (self.prim_fn)(value, new_value);
+ },
+ );
+ }
+ GroupStatesMode::Blocked(blk_size) => {
Review Comment:
Would it be possible to change `prim_op` so that it *always* uses blocked
state? I am concerned (as I mentioned above) that we now have two parallel
implementations in *all* the accumulators that support this chunked state.
Not only is this more code, we now also have a second path that must be
tested in all of them, which I think is a substantial undertaking
(@2010YOUY01 referred to this as well).
What if we changed the group-by hash operator so that it always gets blocked
output (`Vec<RecordBatch>`) from the accumulators that support it? It could
then slice the output from accumulators that can only produce a single record
batch, as it does today.
This would mean that if an accumulator supported blocked output, it could
always create blocked output; a rough sketch of the slicing idea is below.
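To make that concrete, here is a minimal sketch (not the PR's code;
`slice_into_blocks` is a hypothetical helper) of how the operator could turn a
single emitted batch into blocked output using arrow's zero-copy
`RecordBatch::slice`:

```rust
use arrow::record_batch::RecordBatch;

/// Hypothetical helper: split one emitted batch into `block_size`-row chunks
/// so that flat accumulators present the same `Vec<RecordBatch>` shape as
/// blocked-capable ones.
fn slice_into_blocks(batch: RecordBatch, block_size: usize) -> Vec<RecordBatch> {
    assert!(block_size > 0, "block_size must be non-zero");
    let num_rows = batch.num_rows();
    let mut blocks = Vec::with_capacity(num_rows.div_ceil(block_size));
    let mut offset = 0;
    while offset < num_rows {
        let len = block_size.min(num_rows - offset);
        // `RecordBatch::slice` is zero-copy; it only adjusts offsets/lengths.
        blocks.push(batch.slice(offset, len));
        offset += len;
    }
    blocks
}
```

Because `slice` only adjusts offsets, this would leave the flat accumulators
unchanged while giving downstream code a single blocked interface.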