korowa commented on code in PR #11627:
URL: https://github.com/apache/datafusion/pull/11627#discussion_r1702847927


##########
datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs:
##########
@@ -134,6 +136,51 @@ where
         self.update_batch(values, group_indices, opt_filter, total_num_groups)
     }
 
+    fn convert_to_state(
+        &self,
+        values: &[ArrayRef],
+        opt_filter: Option<&BooleanArray>,
+    ) -> Result<Vec<ArrayRef>> {
+        let values = values[0].as_primitive::<T>();
+
+        // Initializing state with starting values
+        let initial_state =
+            PrimitiveArray::<T>::from_value(self.starting_value, values.len());
+
+        // Recalculating values in case there is filter
+        let values = match opt_filter {
+            None => values,
+            Some(filter) => {
+                let (filter_values, filter_nulls) = 
filter.clone().into_parts();
+                // Calculating filter mask as a result of bitand of filter, 
and converting it to null buffer
+                let filter_bool = match filter_nulls {
+                    Some(filter_nulls) => filter_nulls.inner() & 
&filter_values,
+                    None => filter_values,
+                };
+                let filter_nulls = NullBuffer::from(filter_bool);
+
+                // Rebuilding input values with a new nulls mask, which is 
equal to
+                // the union of original nulls and filter mask
+                let (dt, values_buf, original_nulls) = 
values.clone().into_parts();
+                let nulls_buf =
+                    NullBuffer::union(original_nulls.as_ref(), 
Some(&filter_nulls));
+                &PrimitiveArray::<T>::new(values_buf, 
nulls_buf).with_data_type(dt)
+            }
+        };
+
+        let state_values = compute::binary_mut(initial_state, values, |mut x, 
y| {
+            (self.prim_fn)(&mut x, y);
+            x
+        });
+        let state_values = state_values.unwrap().unwrap();

Review Comment:
   Unwraps were the part of in progress work -- I've replaced them with proper 
error handling.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to