neilconway commented on code in PR #20504:
URL: https://github.com/apache/datafusion/pull/20504#discussion_r2848950640


##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -414,6 +435,293 @@ impl Accumulator for ArrayAggAccumulator {
     }
 }
 
+#[derive(Debug)]
+struct ArrayAggGroupsAccumulator {
+    datatype: DataType,
+    ignore_nulls: bool,
+    /// Input batches received via `update_batch`.
+    batches: Vec<ArrayRef>,
+    /// Per-group list of `(batch_index, row_index)` pairs into `batches`.
+    indices: Vec<Vec<(u32, u32)>>,
+    /// Number of index entries referencing each batch.
+    batch_refcounts: Vec<u32>,
+    /// Per-group array chunks from `merge_batch`.
+    merged: Vec<Vec<ArrayRef>>,
+}
+
+impl ArrayAggGroupsAccumulator {
+    fn new(datatype: DataType, ignore_nulls: bool) -> Self {
+        Self {
+            datatype,
+            ignore_nulls,
+            batches: Vec::new(),
+            indices: Vec::new(),
+            batch_refcounts: Vec::new(),
+            merged: Vec::new(),
+        }
+    }
+}
+
+impl GroupsAccumulator for ArrayAggGroupsAccumulator {
+    /// Store references to each input batch and record per-group
+    /// `(batch_index, row_index)` pairs. Materialization is deferred
+    /// to `evaluate`, which minimizes the work done per-batch.
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indices: &[usize],
+        opt_filter: Option<&BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()> {
+        assert_eq!(values.len(), 1, "single argument to update_batch");
+        let input = &values[0];
+
+        self.indices.resize_with(total_num_groups, Vec::new);
+        self.merged.resize_with(total_num_groups, Vec::new);
+
+        let nulls = if self.ignore_nulls {
+            input.logical_nulls()
+        } else {
+            None
+        };
+
+        let batch_idx = self.batches.len();
+        let mut batch_pushed = false;
+
+        for (row_idx, &group_idx) in group_indices.iter().enumerate() {
+            // Skip filtered rows
+            if let Some(filter) = opt_filter
+                && (filter.is_null(row_idx) || !filter.value(row_idx))
+            {
+                continue;
+            }
+
+            // Skip null values when ignore_nulls is set
+            if let Some(ref nulls) = nulls
+                && nulls.is_null(row_idx)
+            {
+                continue;
+            }
+
+            if !batch_pushed {
+                self.batches.push(Arc::clone(input));
+                self.batch_refcounts.push(0);
+                batch_pushed = true;
+            }
+            self.batch_refcounts[batch_idx] += 1;
+            self.indices[group_idx].push((batch_idx as u32, row_idx as u32));
+        }
+
+        Ok(())
+    }
+
+    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
+        let emit_indices = emit_to.take_needed(&mut self.indices);
+        let emit_merged = emit_to.take_needed(&mut self.merged);
+        let num_groups = emit_indices.len();
+
+        let mut offsets = Vec::<i32>::with_capacity(num_groups + 1);
+        offsets.push(0);
+        let mut nulls_builder = NullBufferBuilder::new(num_groups);
+        let mut cur_offset = 0i32;
+
+        // Build ListArray offsets and nulls: groups with no elements
+        // are null, others occupy offsets[i]..offsets[i+1] in the
+        // flat values array.
+        for (group_indices, group_merged) in 
emit_indices.iter().zip(emit_merged.iter()) {
+            let merged_len = group_merged.iter().map(|a| 
a.len()).sum::<usize>();
+            let total_len = group_indices.len() + merged_len;
+
+            if total_len == 0 {
+                nulls_builder.append_null();
+            } else {
+                nulls_builder.append_non_null();
+            }
+
+            cur_offset += total_len as i32;

Review Comment:
   Yep, might as well check it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to