jayzhan211 commented on code in PR #13991: URL: https://github.com/apache/datafusion/pull/13991#discussion_r1909649089
########## datafusion/functions-aggregate/src/min_max/min_max_generic.rs: ########## @@ -0,0 +1,491 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::min_max::{MovingMax, MovingMin}; +use arrow::array::ArrayRef; +use arrow::datatypes::DataType; +use arrow::row::{OwnedRow, Row, RowConverter, SortField}; +use arrow_schema::SortOptions; +use datafusion_common::Result; +use datafusion_common::ScalarValue; +use datafusion_expr::Accumulator; +use std::fmt::Debug; +use std::marker::PhantomData; +use std::mem::size_of_val; +use std::ops::Deref; + +pub(crate) type GenericMinAccumulator = GenericMinMaxAccumulator<MinAccumulatorHelper>; +pub(crate) type GenericMaxAccumulator = GenericMinMaxAccumulator<MaxAccumulatorHelper>; + +pub(crate) type GenericSlidingMinAccumulator = + GenericSlidingMinMaxAccumulator<GenericMovingMin>; +pub(crate) type GenericSlidingMaxAccumulator = + GenericSlidingMinMaxAccumulator<GenericMovingMax>; + +fn convert_row_to_scalar( + row_converter: &RowConverter, + owned_row: &OwnedRow, +) -> Result<ScalarValue> { + // Convert the row back to array so we can return it + let converted = row_converter.convert_rows(vec![owned_row.row()])?; + + // Get the first value from the array (as we only have one row) + ScalarValue::try_from_array(converted[0].deref(), 0) +} + +/// Helper trait for min/max accumulators to avoid code duplication +pub(crate) trait GenericMinMaxAccumulatorHelper: Debug + Send + Sync { + /// Return true if the new value should replace the current value + /// for minimum the new value should be less than the current value + /// for maximum the new value should be greater than the current value + fn should_replace<'a>(current: &Row<'a>, possibly_new: &Row<'a>) -> bool; + + /// Get the minimum/maximum value from an iterator + fn get_value_from_iter<Item: Ord + Sized, I: Iterator<Item = Item>>( + iter: I, + ) -> Option<Item>; +} + +#[derive(Debug)] +pub(crate) struct MinAccumulatorHelper; + +impl GenericMinMaxAccumulatorHelper for MinAccumulatorHelper { + /// Should replace the current value if the new value is less than the current value + #[inline] + fn should_replace<'a>(current: &Row<'a>, possibly_new: &Row<'a>) -> bool { + current > possibly_new + } + + #[inline] + fn get_value_from_iter<Item: Ord + Sized, I: Iterator<Item = Item>>( + iter: I, + ) -> Option<Item> { + iter.min() + } +} + +#[derive(Debug)] +pub(crate) struct MaxAccumulatorHelper; + +impl GenericMinMaxAccumulatorHelper for MaxAccumulatorHelper { + /// Should replace the current value if the new value is greater than the current value + #[inline] + fn should_replace<'a>(current: &Row<'a>, possibly_new: &Row<'a>) -> bool { + current < possibly_new + } + + #[inline] + fn get_value_from_iter<Item: Ord + Sized, I: Iterator<Item = Item>>( + iter: I, + ) -> Option<Item> { + iter.max() + } +} + +/// Accumulator for min/max of lists +/// this accumulator is using arrow-row as the internal representation and a way for comparing +#[derive(Debug)] +pub(crate) struct GenericMinMaxAccumulator<Helper> +where + Helper: GenericMinMaxAccumulatorHelper, +{ + /// Convert the columns to row so we can compare them + row_converter: RowConverter, + + /// The current minimum/maximum value + current_value: Option<OwnedRow>, + + /// Null row to use for fast filtering + null_row: OwnedRow, + + phantom_data: PhantomData<Helper>, +} + +impl<Helper> GenericMinMaxAccumulator<Helper> +where + Helper: GenericMinMaxAccumulatorHelper, +{ + pub fn try_new(datatype: &DataType) -> Result<Self> { + let converter = RowConverter::new(vec![ + (SortField::new_with_options( + datatype.clone(), + SortOptions { + descending: false, + nulls_first: true, + }, + )), + ])?; + + // Create a null row to use for filtering out nulls from the input + let null_row = { + let null_array = ScalarValue::try_from(datatype)?.to_array_of_size(1)?; Review Comment: ```suggestion let null_array = new_null_array(datatype, 1); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org