alamb commented on code in PR #13991: URL: https://github.com/apache/datafusion/pull/13991#discussion_r1903111345
########## datafusion/functions-aggregate/src/min_max.rs: ########## @@ -230,7 +235,13 @@ impl AggregateUDFImpl for Max { } fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> { - Ok(Box::new(MaxAccumulator::try_new(acc_args.return_type)?)) + if acc_args.return_type.is_nested() { + Ok(Box::new(GenericMaxAccumulator::try_new( + acc_args.return_type, + )?)) Review Comment: If you wanted to make this less verbose you could probably do something like ```suggestion GenericMaxAccumulator::try_new( acc_args.return_type, ).map(Box::new) ``` ########## datafusion/functions-aggregate/src/min_max.rs: ########## @@ -230,7 +235,13 @@ impl AggregateUDFImpl for Max { } fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> { - Ok(Box::new(MaxAccumulator::try_new(acc_args.return_type)?)) + if acc_args.return_type.is_nested() { + Ok(Box::new(GenericMaxAccumulator::try_new( + acc_args.return_type, + )?)) + } else { + Ok(Box::new(MaxAccumulator::try_new(acc_args.return_type)?)) Review Comment: The pattern used in other accumulators is slightly different I think -- It would have a function on `MaxAccumulator` that explicitly declared the types it supported and then fallback to the `GenericMaxAccumulator` if not. For example, something like ```rust if MaxAccumulator::supports_type(&acc_args.return_type) { Ok(Box::new(MaxAccumulator::try_new(acc_args.return_type)?)) } else { Ok(Box::new(GenericMaxAccumulator::try_new( acc_args.return_type, )?)) } ``` This is a minor detail and not required, I just wanted to point it out ########## datafusion/sqllogictest/test_files/array_query.slt: ########## @@ -108,11 +108,15 @@ SELECT * FROM data WHERE column2 is not distinct from null; # Aggregates ########### -query error Internal error: Min/Max accumulator not implemented for type List +query ? SELECT min(column1) FROM data; Review Comment: Can we also add tests for empty input on min/max as well to test boundary conditions? 
Something like ```sql SELECT MIN(column1) FROM data WHERE false ``` Which should return a null value? ########## datafusion/functions-aggregate/src/min_max/min_max_generic.rs: ########## @@ -0,0 +1,491 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::min_max::{MovingMax, MovingMin}; +use arrow::array::ArrayRef; +use arrow::datatypes::DataType; +use arrow::row::{OwnedRow, Row, RowConverter, SortField}; +use arrow_schema::SortOptions; +use datafusion_common::Result; +use datafusion_common::ScalarValue; +use datafusion_expr::Accumulator; +use std::fmt::Debug; +use std::marker::PhantomData; +use std::mem::size_of_val; +use std::ops::Deref; + +pub(crate) type GenericMinAccumulator = GenericMinMaxAccumulator<MinAccumulatorHelper>; +pub(crate) type GenericMaxAccumulator = GenericMinMaxAccumulator<MaxAccumulatorHelper>; + +pub(crate) type GenericSlidingMinAccumulator = + GenericSlidingMinMaxAccumulator<GenericMovingMin>; +pub(crate) type GenericSlidingMaxAccumulator = + GenericSlidingMinMaxAccumulator<GenericMovingMax>; + +fn convert_row_to_scalar( + row_converter: &RowConverter, + owned_row: &OwnedRow, +) -> Result<ScalarValue> { + // Convert the row back to array so we can return it + let converted = row_converter.convert_rows(vec![owned_row.row()])?; + + // Get the first value from the array (as we only have one row) + ScalarValue::try_from_array(converted[0].deref(), 0) +} + +/// Helper trait for min/max accumulators to avoid code duplication +pub(crate) trait GenericMinMaxAccumulatorHelper: Debug + Send + Sync { + /// Return true if the new value should replace the current value + /// for minimum the new value should be less than the current value + /// for maximum the new value should be greater than the current value + fn should_replace<'a>(current: &Row<'a>, possibly_new: &Row<'a>) -> bool; + + /// Get the minimum/maximum value from an iterator + fn get_value_from_iter<Item: Ord + Sized, I: Iterator<Item = Item>>( + iter: I, + ) -> Option<Item>; +} + +#[derive(Debug)] +pub(crate) struct MinAccumulatorHelper; + +impl GenericMinMaxAccumulatorHelper for MinAccumulatorHelper { + /// Should replace the current value if the new value is less than the current value + #[inline] + fn 
should_replace<'a>(current: &Row<'a>, possibly_new: &Row<'a>) -> bool { + current > possibly_new + } + + #[inline] + fn get_value_from_iter<Item: Ord + Sized, I: Iterator<Item = Item>>( + iter: I, + ) -> Option<Item> { + iter.min() + } +} + +#[derive(Debug)] +pub(crate) struct MaxAccumulatorHelper; + +impl GenericMinMaxAccumulatorHelper for MaxAccumulatorHelper { + /// Should replace the current value if the new value is greater than the current value + #[inline] + fn should_replace<'a>(current: &Row<'a>, possibly_new: &Row<'a>) -> bool { + current < possibly_new + } + + #[inline] + fn get_value_from_iter<Item: Ord + Sized, I: Iterator<Item = Item>>( + iter: I, + ) -> Option<Item> { + iter.max() + } +} + +/// Accumulator for min/max of lists +/// this accumulator is using arrow-row as the internal representation and a way for comparing +#[derive(Debug)] +pub(crate) struct GenericMinMaxAccumulator<Helper> +where + Helper: GenericMinMaxAccumulatorHelper, +{ + /// Convert the columns to row so we can compare them + row_converter: RowConverter, + + /// The current minimum/maximum value + current_value: Option<OwnedRow>, + + /// Null row to use for fast filtering + null_row: OwnedRow, + + phantom_data: PhantomData<Helper>, +} + +impl<Helper> GenericMinMaxAccumulator<Helper> +where + Helper: GenericMinMaxAccumulatorHelper, +{ + pub fn try_new(datatype: &DataType) -> Result<Self> { + let converter = RowConverter::new(vec![ + (SortField::new_with_options( + datatype.clone(), + SortOptions { + descending: false, + nulls_first: true, + }, + )), + ])?; + + // Create a null row to use for filtering out nulls from the input + let null_row = { + let null_array = ScalarValue::try_from(datatype)?.to_array_of_size(1)?; + + let rows = converter.convert_columns(&[null_array])?; + + rows.row(0).owned() + }; + + Ok(Self { + row_converter: converter, + null_row, + current_value: None, + phantom_data: PhantomData, + }) + } +} + +impl<Helper> Accumulator for 
GenericMinMaxAccumulator<Helper> +where + Helper: GenericMinMaxAccumulatorHelper, +{ + fn state(&mut self) -> Result<Vec<ScalarValue>> { + Ok(vec![self.evaluate()?]) + } + + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let rows = self.row_converter.convert_columns(values)?; + + let wanted_value_in_batch = Helper::get_value_from_iter( + rows.iter() + // Filter out nulls + .filter(|row| row != &self.null_row.row()), + ); + + match (&self.current_value, wanted_value_in_batch) { + // Update the minimum/maximum based on the should replace logic + (Some(current_val), Some(current_val_in_batch)) => { + if Helper::should_replace(&current_val.row(), &current_val_in_batch) { + self.current_value = Some(current_val_in_batch.owned()); + } + } + // First minimum/maximum for the accumulator + (None, Some(new_value)) => { + self.current_value = Some(new_value.owned()); + } + // If no minimum/maximum in batch (all nulls), do nothing + (_, None) => {} + } + + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + self.update_batch(states) + } + + fn evaluate(&mut self) -> Result<ScalarValue> { + // Get the current value or null if no value has been seen + let current_value = self.current_value.as_ref().unwrap_or(&self.null_row); + + convert_row_to_scalar(&self.row_converter, current_value) + } + + fn size(&self) -> usize { + size_of_val(self) - size_of_val(&self.row_converter) + self.row_converter.size() + } +} + +/// Helper trait for sliding min/max accumulators to avoid code duplication +pub(crate) trait GenericSlidingMinMaxAccumulatorHelper: + Debug + Default + Send + Sync +{ + fn push(&mut self, row: OwnedRow); + + fn pop(&mut self); + + fn value(&self) -> Option<&OwnedRow>; +} + +#[derive(Debug, Default)] +pub(crate) struct GenericMovingMin(MovingMin<OwnedRow>); + +impl GenericSlidingMinMaxAccumulatorHelper for GenericMovingMin { + fn push(&mut self, row: OwnedRow) { + self.0.push(row); + } + + fn pop(&mut self) { + self.0.pop(); + } + + fn 
value(&self) -> Option<&OwnedRow> { + self.0.min() + } +} + +#[derive(Debug, Default)] +pub(crate) struct GenericMovingMax(MovingMax<OwnedRow>); + +impl GenericSlidingMinMaxAccumulatorHelper for GenericMovingMax { + fn push(&mut self, row: OwnedRow) { + self.0.push(row); + } + + fn pop(&mut self) { + self.0.pop(); + } + + fn value(&self) -> Option<&OwnedRow> { + self.0.max() + } +} + +#[derive(Debug)] +pub(crate) struct GenericSlidingMinMaxAccumulator< + MovingWindowHelper: GenericSlidingMinMaxAccumulatorHelper, +> { + /// Convert the columns to row so we can compare them + row_converter: RowConverter, + + /// The current minimum value + current_value: Option<OwnedRow>, + + moving_helper: MovingWindowHelper, + + /// Null row to use for fast filtering + null_row: OwnedRow, +} + +impl<MovingWindowHelper: GenericSlidingMinMaxAccumulatorHelper> + GenericSlidingMinMaxAccumulator<MovingWindowHelper> +{ + pub fn try_new(datatype: &DataType) -> Result<Self> { + let converter = RowConverter::new(vec![ + (SortField::new_with_options( + datatype.clone(), + SortOptions { + descending: false, + nulls_first: true, + }, + )), + ])?; + + // Create a null row to use for filtering out nulls from the input + let null_row = { + let null_array = ScalarValue::try_from(datatype)?.to_array_of_size(1)?; + + let rows = converter.convert_columns(&[null_array])?; + + rows.row(0).owned() + }; + + Ok(Self { + row_converter: converter, + null_row, + current_value: None, + moving_helper: MovingWindowHelper::default(), + }) + } +} + +impl<MovingWindowHelper: GenericSlidingMinMaxAccumulatorHelper> Accumulator + for GenericSlidingMinMaxAccumulator<MovingWindowHelper> +{ + fn state(&mut self) -> Result<Vec<ScalarValue>> { + // Get the current value or null if no value has been seen + let min = self.current_value.as_ref().unwrap_or(&self.null_row); + + Ok(vec![convert_row_to_scalar(&self.row_converter, min)?]) + } + + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + // TODO - should 
assert getting only one column? Review Comment: I think it would be nice to either add the assert or remove the TODO as part of this PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org