Rachelint commented on code in PR #13681:
URL: https://github.com/apache/datafusion/pull/13681#discussion_r1931237984


##########
datafusion/functions-aggregate/src/median.rs:
##########
@@ -230,6 +276,201 @@ impl<T: ArrowNumericType> Accumulator for 
MedianAccumulator<T> {
     }
 }
 
+/// The median groups accumulator accumulates the raw input values
+///
+/// For calculating the accurate medians of groups, we need to store all values
+/// of groups before final evaluation.
+/// So values in each group will be stored in a `Vec<T>`, and the total group 
values
+/// will be actually organized as a `Vec<Vec<T>>`.
+///
+#[derive(Debug)]
+struct MedianGroupsAccumulator<T: ArrowNumericType + Send> {
+    data_type: DataType,
+    group_values: Vec<Vec<T::Native>>,
+}
+
+impl<T: ArrowNumericType + Send> MedianGroupsAccumulator<T> {
+    pub fn new(data_type: DataType) -> Self {
+        Self {
+            data_type,
+            group_values: Vec::new(),
+        }
+    }
+}
+
+impl<T: ArrowNumericType + Send> GroupsAccumulator for 
MedianGroupsAccumulator<T> {
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indices: &[usize],
+        opt_filter: Option<&BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()> {
+        assert_eq!(values.len(), 1, "single argument to update_batch");
+        let values = values[0].as_primitive::<T>();
+
+        // Push the `not nulls + not filtered` row into its group
+        self.group_values.resize(total_num_groups, Vec::new());
+        accumulate(
+            group_indices,
+            values,
+            opt_filter,
+            |group_index, new_value| {
+                self.group_values[group_index].push(new_value);
+            },
+        );
+
+        Ok(())
+    }
+
+    fn merge_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indices: &[usize],
+        // Since aggregate filter should be applied in partial stage, in final 
stage there should be no filter
+        _opt_filter: Option<&BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()> {
+        assert_eq!(values.len(), 1, "one argument to merge_batch");
+
+        // The merged values should be organized like as a `ListArray` which 
is nullable
+        // (input with nulls usually generated from `convert_to_state`), but 
`inner array` of
+        // `ListArray`  is `non-nullable`.
+        //
+        // Following is the possible and impossible input `values`:
+        //
+        // # Possible values
+        // ```text
+        //   group 0: [1, 2, 3]
+        //   group 1: null (list array is nullable)
+        //   group 2: [6, 7, 8]
+        //   ...
+        //   group n: [...]
+        // ```
+        //
+        // # Impossible values
+        // ```text
+        //   group x: [1, 2, null] (values in list array is non-nullable)
+        // ```
+        //
+        let input_group_values = values[0].as_list::<i32>();
+
+        // Ensure group values big enough
+        self.group_values.resize(total_num_groups, Vec::new());
+
+        // Extend values to related groups
+        // TODO: avoid using iterator of the `ListArray`, this will lead to
+        // many calls of `slice` of its ``inner array`, and `slice` is not
+        // so efficient(due to the calculation of `null_count` for each 
`slice`).
+        group_indices
+            .iter()
+            .zip(input_group_values.iter())
+            .for_each(|(&group_index, values_opt)| {
+                if let Some(values) = values_opt {
+                    let values = values.as_primitive::<T>();
+                    
self.group_values[group_index].extend(values.values().iter());
+                }
+            });
+
+        Ok(())
+    }
+
+    fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
+        // Emit values
+        let emit_group_values = emit_to.take_needed(&mut self.group_values);
+
+        // Build offsets
+        let mut offsets = Vec::with_capacity(self.group_values.len() + 1);
+        offsets.push(0);
+        let mut cur_len = 0;
+        for group_value in &emit_group_values {
+            cur_len += group_value.len() as i32;

Review Comment:
   I left a todo about this, and just keep using `OffsetBuffer::new` currently.
   
   ```
           // TODO: maybe we can use `OffsetBuffer::new_unchecked` like what in 
`convert_to_state`,
           // but safety should be considered more carefully here(and I am not 
sure if it can get
           // performance improvement when we introduce checks to keep the 
safety...).
           //
           // Can see more details in:
           // 
https://github.com/apache/datafusion/pull/13681#discussion_r1931209791
           //
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to