jayzhan211 commented on code in PR #12996:
URL: https://github.com/apache/datafusion/pull/12996#discussion_r1823736082
##########
datafusion/physical-plan/src/aggregates/group_values/column.rs:
##########
@@ -196,6 +570,324 @@ impl GroupValues for GroupValuesColumn {
let b =
ByteViewGroupValueBuilder::<BinaryViewType>::new();
v.push(Box::new(b) as _)
}
+ dt => {
+ return not_impl_err!(
+ "{dt} not supported in VectorizedGroupValuesColumn"
+ )
+ }
+ }
+ }
+ self.group_values = v;
+ }
+
+ // tracks to which group each of the input rows belongs
+ groups.clear();
+ groups.resize(n_rows, usize::MAX);
+
+ let mut batch_hashes = mem::take(&mut self.hashes_buffer);
+ batch_hashes.clear();
+ batch_hashes.resize(n_rows, 0);
+ create_hashes(cols, &self.random_state, &mut batch_hashes)?;
+
+ // General steps for one round `vectorized equal_to & append`:
+ // 1. Collect vectorized context by checking hash values of `cols`
in `map`,
+ // mainly fill `vectorized_append_row_indices`,
`vectorized_equal_to_row_indices`
+ // and `vectorized_equal_to_group_indices`
+ //
+ // 2. Perform `vectorized_append` for
`vectorized_append_row_indices`.
+ // `vectorized_append` must be performed before
`vectorized_equal_to`,
+ // because some `group indices` in
`vectorized_equal_to_group_indices`
+ // may be actually placeholders, and still point to no actual
values in
+ // `group_values` before performing append.
+ //
+ // 3. Perform `vectorized_equal_to` for
`vectorized_equal_to_row_indices`
+ // and `vectorized_equal_to_group_indices`. If found some rows in
input `cols`
+ // not equal to `exist rows` in `group_values`, place them in
`scalarized_indices`
+ // and perform `scalarized_intern` for them similar as what in
[`GroupValuesColumn`]
+ // after.
+ //
+ // 4. Perform `scalarized_intern` for rows mentioned above, when we
process like this
+ // can see the comments of `scalarized_intern`.
+ //
+
+ // 1. Collect vectorized context by checking hash values of `cols` in
`map`
+ self.collect_vectorized_process_context(&batch_hashes, groups);
+
+ // 2. Perform `vectorized_append`
+ self.vectorized_append(cols);
+
+ // 3. Perform `vectorized_equal_to`
+ self.vectorized_equal_to(cols, groups);
+
+ // 4. Perform `scalarized_intern`
+ self.scalarized_intern(cols, &batch_hashes, groups);
+
+ self.hashes_buffer = batch_hashes;
+
+ Ok(())
+ }
+
+ fn size(&self) -> usize {
+ let group_values_size: usize = self.group_values.iter().map(|v|
v.size()).sum();
+ group_values_size + self.map_size + self.hashes_buffer.allocated_size()
+ }
+
+ fn is_empty(&self) -> bool {
+ self.len() == 0
+ }
+
+ fn len(&self) -> usize {
+ if self.group_values.is_empty() {
+ return 0;
+ }
+
+ self.group_values[0].len()
+ }
+
+ fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
+ let mut output = match emit_to {
+ EmitTo::All => {
+ let group_values = mem::take(&mut self.group_values);
+ debug_assert!(self.group_values.is_empty());
+
+ group_values
+ .into_iter()
+ .map(|v| v.build())
+ .collect::<Vec<_>>()
+ }
+ EmitTo::First(n) => {
+ let output = self
Review Comment:
~I think first n is only called in `emit_early_if_necessary` in
`GroupOrdering::None` case, but it seems there is no test coverage of this. If
I add panic here, the existing test passes too.~
I saw the test below
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]