alamb commented on code in PR #11943:
URL: https://github.com/apache/datafusion/pull/11943#discussion_r1723125203
##########
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs:
##########
@@ -458,6 +717,91 @@ fn initialize_builder(
builder
}
+/// Similar to [initialize_builder], but designed for the blocked-version
+/// accumulator.
+///
+/// Grows the null-tracking builders in `builder_blocks` so that together they
+/// hold at least `total_num_groups` slots, filling every newly added slot with
+/// `default_value`. Slots are only ever appended (via `append_n`); existing
+/// slots are left untouched.
+///
+/// - `GroupStatesMode::Flat`: a single builder is used (created on first call)
+///   and extended up to `total_num_groups`.
+/// - `GroupStatesMode::Blocked(blk_size)`: enough blocks are pushed to cover
+///   `total_num_groups`, then the current block is filled up to `blk_size`,
+///   following blocks are filled completely, and the last block receives the
+///   remainder. `blk_size` is used as a divisor, so it must be non-zero.
+///
+/// Returns early when `total_num_groups` is 0 or enough slots already exist.
+fn ensure_enough_room_for_nulls(
+ builder_blocks: &mut Blocks<BooleanBufferBuilder>,
+ mode: GroupStatesMode,
+ total_num_groups: usize,
+ default_value: bool,
+) {
+ if total_num_groups == 0 {
+ return;
+ }
+
+ match mode {
+ // In flat mode, we use just a single builder and keep growing it.
+ GroupStatesMode::Flat => {
+ if builder_blocks.num_blocks() == 0 {
+ builder_blocks.push_block(BooleanBufferBuilder::new(0));
+ }
+
+ // Pad the single builder with `default_value` up to `total_num_groups`.
+ let builder = builder_blocks.current_mut().unwrap();
+ if builder.len() < total_num_groups {
+ let new_groups = total_num_groups - builder.len();
+ builder.append_n(new_groups, default_value);
+ }
+ }
+ // In blocked mode, first make sure there are enough blocks,
+ // then make sure those blocks hold enough slots.
+ GroupStatesMode::Blocked(blk_size) => {
+ // `cur_blk_idx` is the last existing block (0 when there are no blocks
+ // yet; block 0 is then pushed below). `exist_slots` assumes every block
+ // before the last one is already full.
+ let (mut cur_blk_idx, exist_slots) = if
builder_blocks.num_blocks() > 0 {
+ let cur_blk_idx = builder_blocks.num_blocks() - 1;
+ let exist_slots = (builder_blocks.num_blocks() - 1) * blk_size
+ + builder_blocks.current().unwrap().len();
+
+ (cur_blk_idx, exist_slots)
+ } else {
+ (0, 0)
+ };
+
+ // No new groups, so no need to expand; just return.
+ if exist_slots >= total_num_groups {
+ return;
+ }
+
+ // Push empty blocks until ceil(total_num_groups / blk_size) blocks exist.
+ // Given the early return above, at least `exist_blks` blocks are needed,
+ // so the subtraction does not underflow.
+ let exist_blks = builder_blocks.num_blocks();
+ let new_blks = (total_num_groups + blk_size - 1) / blk_size -
exist_blks;
+ if new_blks > 0 {
+ for _ in 0..new_blks {
+
builder_blocks.push_block(BooleanBufferBuilder::new(blk_size));
+ }
+ }
+
+ // Number of slots still to append across the blocks.
+ let mut new_slots = total_num_groups - exist_slots;
+
+ // Fill the current block first; if the new slots fit in it, we are done.
+ let cur_blk_rest_slots = blk_size -
builder_blocks[cur_blk_idx].len();
+ if cur_blk_rest_slots >= new_slots {
+ builder_blocks[cur_blk_idx].append_n(new_slots, default_value);
+ return;
+ }
+
+ // Otherwise fill the current block completely and move to the next one.
+ builder_blocks[cur_blk_idx].append_n(cur_blk_rest_slots,
default_value);
+ new_slots -= cur_blk_rest_slots;
+ cur_blk_idx += 1;
+
+ // Fill as many whole blocks as the remaining slots allow.
+ let expand_blks = new_slots / blk_size;
+ for _ in 0..expand_blks {
+ builder_blocks[cur_blk_idx].append_n(blk_size, default_value);
+ cur_blk_idx += 1;
+ }
+
+ // Append the remaining (< blk_size) slots to the last block.
+ let last_expand_slots = new_slots % blk_size;
+ if last_expand_slots > 0 {
+ builder_blocks
+ .current_mut()
+ .unwrap()
+ .append_n(last_expand_slots, default_value);
+ }
+ }
+ }
+}
+
Review Comment:
Given the amount of code here, I think this should have unit tests that cover the
basic operation.
##########
datafusion/expr-common/src/groups_accumulator.rs:
##########
@@ -52,8 +71,248 @@ impl EmitTo {
std::mem::swap(v, &mut t);
t
}
+ Self::NextBlock(_) => unreachable!(
+ "can not support blocked emission in take_needed, you should
use take_needed_from_blocks"
+ ),
+ }
+ }
+
+ /// Removes the rows required to emit from `blocks`,
+ /// returning a `Vec` with the taken elements.
+ ///
+ /// Behavior for each emission:
+ /// - `EmitTo::NextBlock`: the first block is popped and returned
+ ///   (expected only in `GroupStatesMode::Blocked`).
+ /// - `EmitTo::All`: the first block is popped and returned
+ ///   (expected only in `GroupStatesMode::Flat`).
+ /// - `EmitTo::First(n)`: the first `min(n, len)` elements of the current
+ ///   block are returned and the rest stay in that block
+ ///   (expected only in `GroupStatesMode::Flat`), similar to `take_needed`.
+ ///
+ /// `mode` is only used for these expectations, which are checked with
+ /// `debug_assert!` and therefore not verified in release builds.
+ ///
+ /// # Panics
+ ///
+ /// Panics if `pop_first_block` / `current_mut` return `None`
+ /// (presumably when `blocks` is empty).
+ pub fn take_needed_from_blocks<T>(
+ &self,
+ blocks: &mut VecBlocks<T>,
+ mode: GroupStatesMode,
+ ) -> Vec<T> {
+ match self {
+ Self::All => {
+ debug_assert!(matches!(mode, GroupStatesMode::Flat));
+ blocks.pop_first_block().unwrap()
+ }
+ Self::First(n) => {
+ debug_assert!(matches!(mode, GroupStatesMode::Flat));
+
+ let block = blocks.current_mut().unwrap();
+ let split_at = min(block.len(), *n);
+
+ // split off values [split_at..] into t, leaving [..split_at] in block
+ let mut t = block.split_off(split_at);
+ // swap so block keeps [split_at..] and t holds the first values to emit
+ std::mem::swap(block, &mut t);
+ t
+ }
+ Self::NextBlock(_) => {
+ debug_assert!(matches!(mode, GroupStatesMode::Blocked(_)));
+ blocks.pop_first_block().unwrap()
+ }
+ }
+ }
+}
+
+/// Mode for `accumulators` and `group values`
Review Comment:
👍
##########
datafusion/expr-common/src/groups_accumulator.rs:
##########
@@ -52,8 +71,248 @@ impl EmitTo {
std::mem::swap(v, &mut t);
t
}
+ Self::NextBlock(_) => unreachable!(
+ "can not support blocked emission in take_needed, you should
use take_needed_from_blocks"
+ ),
+ }
+ }
+
+ /// Removes the rows required to emit from `blocks`,
+ /// returning a `Vec` with the taken elements.
+ ///
+ /// Behavior for each emission:
+ /// - `EmitTo::NextBlock`: the first block is popped and returned
+ ///   (expected only in `GroupStatesMode::Blocked`).
+ /// - `EmitTo::All`: the first block is popped and returned
+ ///   (expected only in `GroupStatesMode::Flat`).
+ /// - `EmitTo::First(n)`: the first `min(n, len)` elements of the current
+ ///   block are returned and the rest stay in that block
+ ///   (expected only in `GroupStatesMode::Flat`), similar to `take_needed`.
+ ///
+ /// `mode` is only used for these expectations, which are checked with
+ /// `debug_assert!` and therefore not verified in release builds.
+ ///
+ /// # Panics
+ ///
+ /// Panics if `pop_first_block` / `current_mut` return `None`
+ /// (presumably when `blocks` is empty).
+ pub fn take_needed_from_blocks<T>(
+ &self,
+ blocks: &mut VecBlocks<T>,
+ mode: GroupStatesMode,
+ ) -> Vec<T> {
+ match self {
+ Self::All => {
+ debug_assert!(matches!(mode, GroupStatesMode::Flat));
+ blocks.pop_first_block().unwrap()
+ }
+ Self::First(n) => {
+ debug_assert!(matches!(mode, GroupStatesMode::Flat));
+
+ let block = blocks.current_mut().unwrap();
+ let split_at = min(block.len(), *n);
+
+ // split off values [split_at..] into t, leaving [..split_at] in block
+ let mut t = block.split_off(split_at);
+ // swap so block keeps [split_at..] and t holds the first values to emit
+ std::mem::swap(block, &mut t);
+ t
+ }
+ Self::NextBlock(_) => {
+ debug_assert!(matches!(mode, GroupStatesMode::Blocked(_)));
+ blocks.pop_first_block().unwrap()
+ }
+ }
+ }
+}
+
+/// Storage mode for the states managed by accumulators and group values.
+///
+/// - `Flat`: values are managed in a single `Vec`. It keeps growing as more
+///   values are inserted, which can lead to a considerable amount of copying
+///   and, ultimately, poor performance.
+///
+/// - `Blocked(block_size)`: values are managed in multiple `Vec`s (blocks).
+///   When the current block is full (reaches `block_size`), a new block is
+///   allocated and used for further inserts. This avoids copying existing
+///   values while growing, which can improve performance.
+#[derive(Debug, Clone, Copy)]
+pub enum GroupStatesMode {
+ /// All values live in a single, growing `Vec`.
+ Flat,
+ /// Values are split across blocks; the payload is the block size.
+ Blocked(usize),
+}
+
+/// Blocked style group index used in blocked mode group values and
accumulators
Review Comment:
A minor comment here: it would be great to try and keep the logic
(`BlockedGroupIndex`, `Blocks`, etc.) in another crate, as `datafusion-expr`
currently contains mostly logical `Expr`s and interfaces.
Perhaps we could move them to
`datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/`
(obviously we can't move some things, like `GroupStatesMode`, as that is
needed for `GroupsAccumulator::switch_to_mode`).
##########
datafusion/physical-plan/src/aggregates/group_values/bytes.rs:
##########
@@ -115,6 +116,11 @@ impl<O: OffsetSizeTrait> GroupValues for
GroupValuesByes<O> {
emit_group_values
}
+ EmitTo::NextBlock(_) => {
Review Comment:
I think supporting chunked emission in groups would also likely help ClickBench
performance, but that is a natural follow-on project.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]