UBarney commented on code in PR #15266:
URL: https://github.com/apache/datafusion/pull/15266#discussion_r2002297890

##########
datafusion/functions-aggregate/src/first_last.rs:
##########
@@ -179,6 +292,424 @@ impl AggregateUDFImpl for FirstValue {
     }
 }
 
+struct FirstPrimitiveGroupsAccumulator<T>
+where
+    T: ArrowPrimitiveType + Send,
+{
+    // ================ state ===========
+    vals: Vec<T::Native>,
+    // Stores ordering values, of the aggregator requirement corresponding to 
first value
+    // of the aggregator.
+    // The `orderings` are stored row-wise, meaning that `orderings[group_idx]`
+    // represents the ordering values corresponding to the `group_idx`-th 
group.
+    orderings: Vec<Vec<ScalarValue>>,
+    // At the beginning, `is_sets[group_idx]` is false, which means `first` is 
not seen yet.
+    // Once we see the first value, we set the `is_sets[group_idx]` flag
+    is_sets: BooleanBufferBuilder,
+    // null_builder[group_idx] == false => vals[group_idx] is null
+    null_builder: BooleanBufferBuilder,
+    // size of `self.orderings`
+    // Calculating the memory usage of `self.orderings` using 
`ScalarValue::size_of_vec` is quite costly.
+    // Therefore, we cache it and compute `size_of` only after each update
+    // to avoid calling `ScalarValue::size_of_vec` by Self.size.
+    size_of_orderings: usize,
+
+    // =========== option ============
+
+    // Stores the applicable ordering requirement.
+    ordering_req: LexOrdering,
+    // derived from `ordering_req`.
+    sort_options: Vec<SortOptions>,
+    // Stores whether incoming data already satisfies the ordering requirement.
+    input_requirement_satisfied: bool,
+    // Ignore null values.
+    ignore_nulls: bool,
+    /// The output type
+    data_type: DataType,
+    default_orderings: Vec<ScalarValue>,
+}
+
+impl<T> FirstPrimitiveGroupsAccumulator<T>
+where
+    T: ArrowPrimitiveType + Send,
+{
+    fn try_new(
+        ordering_req: LexOrdering,
+        ignore_nulls: bool,
+        data_type: &DataType,
+        ordering_dtypes: &[DataType],
+    ) -> Result<Self> {
+        let requirement_satisfied = ordering_req.is_empty();
+
+        let default_orderings = ordering_dtypes
+            .iter()
+            .map(ScalarValue::try_from)
+            .collect::<Result<Vec<_>>>()?;
+
+        let sort_options = get_sort_options(ordering_req.as_ref());
+
+        Ok(Self {
+            null_builder: BooleanBufferBuilder::new(0),
+            ordering_req,
+            sort_options,
+            input_requirement_satisfied: requirement_satisfied,
+            ignore_nulls,
+            default_orderings,
+            data_type: data_type.clone(),
+            vals: Vec::new(),
+            orderings: Vec::new(),
+            is_sets: BooleanBufferBuilder::new(0),
+            size_of_orderings: 0,
+        })
+    }
+
+    fn need_update(&self, group_idx: usize) -> bool {
+        if !self.is_sets.get_bit(group_idx) {
+            return true;
+        }
+
+        if self.ignore_nulls && !self.null_builder.get_bit(group_idx) {
+            return true;
+        }
+
+        !self.input_requirement_satisfied
+    }
+
+    fn should_update_state(
+        &self,
+        group_idx: usize,
+        new_ordering_values: &[ScalarValue],
+    ) -> Result<bool> {
+        if !self.is_sets.get_bit(group_idx) {
+            return Ok(true);
+        }
+
+        assert!(new_ordering_values.len() == self.ordering_req.len());
+        let current_ordering = &self.orderings[group_idx];
+        compare_rows(current_ordering, new_ordering_values, &self.sort_options)
+            .map(|x| x.is_gt())
+    }
+
+    fn take_orderings(&mut self, emit_to: EmitTo) -> Vec<Vec<ScalarValue>> {
+        let result = emit_to.take_needed(&mut self.orderings);
+
+        match emit_to {
+            EmitTo::All => self.size_of_orderings = 0,
+            EmitTo::First(_) => {
+                self.size_of_orderings -=
+                    result.iter().map(ScalarValue::size_of_vec).sum::<usize>()
+            }
+        }
+
+        result
+    }
+
+    fn take_need(
+        bool_buf_builder: &mut BooleanBufferBuilder,
+        emit_to: EmitTo,
+    ) -> BooleanBuffer {
+        let bool_buf = bool_buf_builder.finish();
+        match emit_to {
+            EmitTo::All => bool_buf,
+            EmitTo::First(n) => {
+                // split off the first N values in seen_values
+                //
+                // TODO make this more efficient rather than two
+                // copies and bitwise manipulation
+                let first_n: BooleanBuffer = bool_buf.iter().take(n).collect();
+                // reset the existing buffer
+                for b in bool_buf.iter().skip(n) {
+                    bool_buf_builder.append(b);
+                }
+                first_n
+            }
+        }
+    }
+
+    fn resize_states(&mut self, new_size: usize) {
+        self.vals.resize(new_size, T::default_value());
+
+        if self.null_builder.len() < new_size {
+            self.null_builder
+                .append_n(new_size - self.null_builder.len(), false);
+        }
+
+        if self.orderings.len() < new_size {
+            let current_len = self.orderings.len();
+
+            self.orderings
+                .resize(new_size, self.default_orderings.clone());
+
+            self.size_of_orderings += (new_size - current_len)
+                * ScalarValue::size_of_vec(
+                    // Note: In some cases (such as in the unit test below)
+                    // ScalarValue::size_of_vec(&self.default_orderings) != 
ScalarValue::size_of_vec(&self.default_orderings.clone())
+                    // This may be caused by the different vec.capacity() 
values?
+                    self.orderings.last().unwrap(),
+                );
+        }
+
+        if self.is_sets.len() < new_size {
+            self.is_sets.append_n(new_size - self.is_sets.len(), false);
+        }

Review Comment:
   Yes. Using `.resize` is better approach. 
https://github.com/apache/datafusion/pull/15266/commits/56b11c479dc56dec47da5b905b0cd690a9770c74



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to