alamb commented on code in PR #12996: URL: https://github.com/apache/datafusion/pull/12996#discussion_r1829848355
########## datafusion/physical-plan/src/aggregates/group_values/column.rs: ########## @@ -75,55 +148,653 @@ pub struct GroupValuesColumn { random_state: RandomState, } -impl GroupValuesColumn { +/// Buffers to store intermediate results in `vectorized_append` +/// and `vectorized_equal_to`, for reducing memory allocation +#[derive(Default)] +struct VectorizedOperationBuffers { + /// The `vectorized append` row indices buffer + append_row_indices: Vec<usize>, + + /// The `vectorized_equal_to` row indices buffer + equal_to_row_indices: Vec<usize>, + + /// The `vectorized_equal_to` group indices buffer + equal_to_group_indices: Vec<usize>, + + /// The `vectorized_equal_to` result buffer + equal_to_results: Vec<bool>, + + /// The buffer for storing row indices found not equal to + /// exist groups in `group_values` in `vectorized_equal_to`. + /// We will perform `scalarized_intern` for such rows. + remaining_row_indices: Vec<usize>, +} + +impl VectorizedOperationBuffers { + fn clear(&mut self) { + self.append_row_indices.clear(); + self.equal_to_row_indices.clear(); + self.equal_to_group_indices.clear(); + self.equal_to_results.clear(); + self.remaining_row_indices.clear(); + } +} + +impl<const STREAMING: bool> GroupValuesColumn<STREAMING> { + // ======================================================================== + // Initialization functions + // ======================================================================== + /// Create a new instance of GroupValuesColumn if supported for the specified schema pub fn try_new(schema: SchemaRef) -> Result<Self> { let map = RawTable::with_capacity(0); Review Comment: This `with_capacity` can probably be improved (as a follow on PR) to avoid some smaller allocations ########## datafusion/physical-plan/src/aggregates/group_values/group_column.rs: ########## @@ -56,14 +59,40 @@ pub trait GroupColumn: Send + Sync { /// /// Note that this comparison returns true if both elements are NULL fn equal_to(&self, lhs_row: usize, 
array: &ArrayRef, rhs_row: usize) -> bool; + /// Appends the row at `row` in `array` to this builder fn append_val(&mut self, array: &ArrayRef, row: usize); + + /// The vectorized version equal to + /// + /// When found nth row stored in this builder at `lhs_row` + /// is equal to the row in `array` at `rhs_row`, + /// it will record the `true` result at the corresponding + /// position in `equal_to_results`. + /// + /// And if found nth result in `equal_to_results` is already Review Comment: this is quite clever to pass in the existing "is equal to results" ########## datafusion/physical-plan/src/aggregates/group_values/column.rs: ########## @@ -75,55 +148,653 @@ pub struct GroupValuesColumn { random_state: RandomState, } -impl GroupValuesColumn { +/// Buffers to store intermediate results in `vectorized_append` +/// and `vectorized_equal_to`, for reducing memory allocation +#[derive(Default)] +struct VectorizedOperationBuffers { + /// The `vectorized append` row indices buffer + append_row_indices: Vec<usize>, + + /// The `vectorized_equal_to` row indices buffer + equal_to_row_indices: Vec<usize>, + + /// The `vectorized_equal_to` group indices buffer + equal_to_group_indices: Vec<usize>, + + /// The `vectorized_equal_to` result buffer + equal_to_results: Vec<bool>, + + /// The buffer for storing row indices found not equal to + /// exist groups in `group_values` in `vectorized_equal_to`. + /// We will perform `scalarized_intern` for such rows. 
+ remaining_row_indices: Vec<usize>, +} + +impl VectorizedOperationBuffers { + fn clear(&mut self) { + self.append_row_indices.clear(); + self.equal_to_row_indices.clear(); + self.equal_to_group_indices.clear(); + self.equal_to_results.clear(); + self.remaining_row_indices.clear(); + } +} + +impl<const STREAMING: bool> GroupValuesColumn<STREAMING> { + // ======================================================================== + // Initialization functions + // ======================================================================== + /// Create a new instance of GroupValuesColumn if supported for the specified schema pub fn try_new(schema: SchemaRef) -> Result<Self> { let map = RawTable::with_capacity(0); Ok(Self { schema, map, + group_index_lists: Vec::new(), + emit_group_index_list_buffer: Vec::new(), + vectorized_operation_buffers: VectorizedOperationBuffers::default(), map_size: 0, group_values: vec![], hashes_buffer: Default::default(), random_state: Default::default(), }) } - /// Returns true if [`GroupValuesColumn`] supported for the specified schema - pub fn supported_schema(schema: &Schema) -> bool { - schema - .fields() + // ======================================================================== + // Scalarized intern + // ======================================================================== + + /// Scalarized intern + /// + /// This is used only for `streaming aggregation`, because `streaming aggregation` + /// depends on the order between `input rows` and their corresponding `group indices`. 
+ /// + /// For example, assuming `input rows` in `cols` with 4 new rows + /// (not equal to `exist rows` in `group_values`, and need to create + /// new groups for them): + /// + /// ```text + /// row1 (hash collision with the exist rows) + /// row2 + /// row3 (hash collision with the exist rows) + /// row4 + /// ``` + /// + /// # In `scalarized_intern`, their `group indices` will be + /// + /// ```text + /// row1 --> 0 + /// row2 --> 1 + /// row3 --> 2 + /// row4 --> 3 + /// ``` + /// + /// `Group indices` order agrees with their input order, and the `streaming aggregation` + /// depends on this. + /// + /// # However In `vectorized_intern`, their `group indices` will be + /// + /// ```text + /// row1 --> 2 + /// row2 --> 0 + /// row3 --> 3 + /// row4 --> 1 + /// ``` + /// + /// `Group indices` order are against with their input order, and this will lead to error + /// in `streaming aggregation`. + /// + fn scalarized_intern( Review Comment: this is basically the same as `GroupValuesColumn::intern` was previously, which makes sense to me ########## datafusion/physical-plan/src/aggregates/group_values/column.rs: ########## @@ -75,55 +148,653 @@ pub struct GroupValuesColumn { random_state: RandomState, } -impl GroupValuesColumn { +/// Buffers to store intermediate results in `vectorized_append` Review Comment: 👍 ########## datafusion/physical-plan/src/aggregates/group_values/group_column.rs: ########## @@ -56,14 +59,40 @@ pub trait GroupColumn: Send + Sync { /// /// Note that this comparison returns true if both elements are NULL fn equal_to(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool; + /// Appends the row at `row` in `array` to this builder fn append_val(&mut self, array: &ArrayRef, row: usize); Review Comment: Maybe as a follow on we can consider removing `append_val` and `equal_to` and simply change all codepaths to use the vectorized version ########## datafusion/physical-plan/src/aggregates/group_values/group_column.rs: ########## @@ 
-287,6 +469,63 @@ where }; } + fn vectorized_equal_to( Review Comment: What I have been dreaming about with @XiangpengHao is maybe something like adding `take` / `filter` to arrow array builders. I took this opportunity to write up the idea (finally) for your amusement: - https://github.com/apache/arrow-rs/issues/6692 ########## datafusion/physical-plan/src/aggregates/group_values/group_column.rs: ########## @@ -128,6 +157,89 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool> GroupColumn } } + fn vectorized_equal_to( + &self, + lhs_rows: &[usize], + array: &ArrayRef, + rhs_rows: &[usize], + equal_to_results: &mut [bool], + ) { + let array = array.as_primitive::<T>(); + + let iter = izip!( + lhs_rows.iter(), + rhs_rows.iter(), + equal_to_results.iter_mut(), + ); + + for (&lhs_row, &rhs_row, equal_to_result) in iter { + // Has found not equal to in previous column, don't need to check + if !*equal_to_result { + continue; + } + + // Perf: skip null check (by short circuit) if input is not nullable + if NULLABLE { + let exist_null = self.nulls.is_null(lhs_row); + let input_null = array.is_null(rhs_row); + if let Some(result) = nulls_equal_to(exist_null, input_null) { + *equal_to_result = result; + continue; + } + // Otherwise, we need to check their values + } + + *equal_to_result = self.group_values[lhs_row] == array.value(rhs_row); + } + } + + fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize]) { + let arr = array.as_primitive::<T>(); + + let null_count = array.null_count(); + let num_rows = array.len(); + let all_null_or_non_null = if null_count == 0 { + Some(true) + } else if null_count == num_rows { + Some(false) + } else { + None + }; + + match (NULLABLE, all_null_or_non_null) { + (true, None) => { + for &row in rows { + if array.is_null(row) { + self.nulls.append(true); + self.group_values.push(T::default_value()); + } else { + self.nulls.append(false); + self.group_values.push(arr.value(row)); + } + } + } + + (true, Some(true)) => { + 
self.nulls.append_n(rows.len(), false); + for &row in rows { + self.group_values.push(arr.value(row)); + } + } + + (true, Some(false)) => { + self.nulls.append_n(rows.len(), true); + self.group_values + .extend(iter::repeat(T::default_value()).take(rows.len())); + } + + (false, _) => { + for &row in rows { + self.group_values.push(arr.value(row)); Review Comment:  that if possible the inner loop just looks like this (memcopy!) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org