Rachelint commented on code in PR #13681:
URL: https://github.com/apache/datafusion/pull/13681#discussion_r1931213097


##########
datafusion/functions-aggregate/src/median.rs:
##########
@@ -230,6 +276,201 @@ impl<T: ArrowNumericType> Accumulator for 
MedianAccumulator<T> {
     }
 }
 
+/// Groups accumulator for `median` that buffers every raw input value.
+///
+/// An accurate median needs all values of a group, so nothing is reduced
+/// until final evaluation. Each group owns one `Vec<T::Native>`, and the
+/// groups together are kept as a `Vec<Vec<T::Native>>`.
+#[derive(Debug)]
+struct MedianGroupsAccumulator<T>
+where
+    T: ArrowNumericType + Send,
+{
+    /// Data type applied to the arrays this accumulator builds.
+    data_type: DataType,
+    /// Buffered values, one inner `Vec` per group.
+    group_values: Vec<Vec<T::Native>>,
+}
+
+impl<T> MedianGroupsAccumulator<T>
+where
+    T: ArrowNumericType + Send,
+{
+    /// Creates an accumulator that holds no groups yet and remembers
+    /// `data_type` for the arrays it will build later.
+    pub fn new(data_type: DataType) -> Self {
+        let group_values = Vec::new();
+        Self {
+            data_type,
+            group_values,
+        }
+    }
+}
+
+impl<T: ArrowNumericType + Send> GroupsAccumulator for 
MedianGroupsAccumulator<T> {
+    /// Routes the rows of a raw input batch into their groups' buffers.
+    ///
+    /// `values` must hold exactly one array, which is read as a primitive
+    /// array of `T`. Row `i` belongs to group `group_indices[i]`.
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indices: &[usize],
+        opt_filter: Option<&BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()> {
+        assert_eq!(values.len(), 1, "single argument to update_batch");
+        let input = values[0].as_primitive::<T>();
+
+        // Every known group gets a (possibly empty) buffer before rows are
+        // dispatched, so indexing by group index below cannot go out of range.
+        self.group_values.resize(total_num_groups, Vec::new());
+
+        // `accumulate` invokes the callback for the rows that are neither
+        // null nor filtered out; each such value is appended to its group.
+        let push_into_group = |group_index: usize, new_value: T::Native| {
+            self.group_values[group_index].push(new_value);
+        };
+        accumulate(group_indices, input, opt_filter, push_into_group);
+
+        Ok(())
+    }
+
+    fn merge_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indices: &[usize],
+        // Since aggregate filter should be applied in partial stage, in final 
stage there should be no filter
+        _opt_filter: Option<&BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()> {
+        assert_eq!(values.len(), 1, "one argument to merge_batch");
+
+        // The merged values should be organized like as a `ListArray` which 
is nullable
+        // (input with nulls usually generated from `convert_to_state`), but 
`inner array` of
+        // `ListArray`  is `non-nullable`.
+        //
+        // Following is the possible and impossible input `values`:
+        //
+        // # Possible values
+        // ```text
+        //   group 0: [1, 2, 3]
+        //   group 1: null (list array is nullable)
+        //   group 2: [6, 7, 8]
+        //   ...
+        //   group n: [...]
+        // ```
+        //
+        // # Impossible values
+        // ```text
+        //   group x: [1, 2, null] (values in list array is non-nullable)
+        // ```
+        //
+        let input_group_values = values[0].as_list::<i32>();
+
+        // Ensure group values big enough
+        self.group_values.resize(total_num_groups, Vec::new());
+
+        // Extend values to related groups
+        // TODO: avoid using iterator of the `ListArray`, this will lead to
+        // many calls of `slice` of its ``inner array`, and `slice` is not
+        // so efficient(due to the calculation of `null_count` for each 
`slice`).
+        group_indices
+            .iter()
+            .zip(input_group_values.iter())
+            .for_each(|(&group_index, values_opt)| {
+                if let Some(values) = values_opt {
+                    let values = values.as_primitive::<T>();
+                    
self.group_values[group_index].extend(values.values().iter());
+                }
+            });
+
+        Ok(())
+    }
+
+    /// Emits the intermediate state of the groups selected by `emit_to`.
+    ///
+    /// The state is a single `ListArray` with `i32` offsets: row `i` holds all
+    /// values buffered for the `i`-th emitted group. The inner primitive array
+    /// is built without a null buffer and carries `self.data_type`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the total number of emitted values does not fit the `i32`
+    /// offsets of a `ListArray`.
+    fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
+        // Take the groups to emit out of the accumulator.
+        let emit_group_values = emit_to.take_needed(&mut self.group_values);
+
+        // Build offsets from the lengths of the *emitted* groups. After
+        // `take_needed`, `self.group_values` only holds what was left behind,
+        // so it must not be used for sizing. `from_lengths` sizes its buffer
+        // from the iterator and does an overflow-checked accumulation instead
+        // of a wrapping `as i32` cast.
+        let offsets = OffsetBuffer::<i32>::from_lengths(
+            emit_group_values.iter().map(|group_value| group_value.len()),
+        );
+
+        // Build inner array: all emitted groups laid out back to back.
+        let flatten_group_values =
+            emit_group_values.into_iter().flatten().collect::<Vec<_>>();
+        let group_values_array =
+            PrimitiveArray::<T>::new(ScalarBuffer::from(flatten_group_values), None)
+                .with_data_type(self.data_type.clone());
+
+        // Build the result list array (no list-level null buffer).
+        let result_list_array = ListArray::new(
+            Arc::new(Field::new_list_field(self.data_type.clone(), true)),
+            offsets,
+            Arc::new(group_values_array),
+            None,
+        );
+
+        Ok(vec![Arc::new(result_list_array)])
+    }
+
+    /// Produces the final result for the groups selected by `emit_to`: one
+    /// median per emitted group, in group order.
+    ///
+    /// A group for which `calculate_median` returns `None` yields a null entry.
+    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
+        // Take the groups to emit out of the accumulator.
+        let emitted_groups = emit_to.take_needed(&mut self.group_values);
+
+        // The builder carries the accumulator's data type on its output array.
+        let mut medians =
+            PrimitiveBuilder::<T>::new().with_data_type(self.data_type.clone());
+        emitted_groups
+            .into_iter()
+            .map(|group| calculate_median::<T>(group))
+            .for_each(|median| medians.append_option(median));
+
+        let result = medians.finish();
+        Ok(Arc::new(result))
+    }
+
+    fn convert_to_state(
+        &self,
+        values: &[ArrayRef],
+        opt_filter: Option<&BooleanArray>,
+    ) -> Result<Vec<ArrayRef>> {
+        assert_eq!(values.len(), 1, "one argument to merge_batch");
+
+        let input_array = values[0].as_primitive::<T>();
+
+        // Directly convert the input array to states, each row will be
+        // seen as a respective group.
+        // For detail, the `input_array` will be converted to a `ListArray`.
+        // And if row is `not null + not filtered`, it will be converted to a 
list
+        // with only one element; otherwise, this row in `ListArray` will be 
set
+        // to null.
+
+        // Reuse values buffer in `input_array` to build `values` in 
`ListArray`
+        let values = PrimitiveArray::<T>::new(input_array.values().clone(), 
None)
+            .with_data_type(self.data_type.clone());
+
+        // `offsets` in `ListArray`, each row as a list element
+        let offsets = (0..=input_array.len() as i32).collect::<Vec<_>>();
+        let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));

Review Comment:
   Done.
   It is easy to ensure that every check in `OffsetBuffer::new` passes by adding:
   ```rust
           assert!(input_array.len() <= i32::MAX as usize);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to