This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 4d5aea4b5a perf: Optimize `array_min`, `array_max` for arrays of 
primitive types (#21101)
4d5aea4b5a is described below

commit 4d5aea4b5ad3f3e172f9b7dafae30069c4204bfb
Author: Neil Conway <[email protected]>
AuthorDate: Tue Mar 24 16:41:55 2026 -0400

    perf: Optimize `array_min`, `array_max` for arrays of primitive types 
(#21101)
    
    ## Which issue does this PR close?
    
    - Closes #21100.
    
    ## Rationale for this change
    
    In the current implementation, we construct a `PrimitiveArray` for each
    row, feed it to the Arrow `min` / `max` kernel, and then collect the
    resulting `ScalarValue`s in a `Vec`. We then construct a final
    `PrimitiveArray` for the result via `ScalarValue::iter_to_array` of the
    `Vec`.
    
    We can do better for ListArrays of primitive types. First, we can
    iterate directly over the flat values buffer of the `ListArray` for the
    batch and compute the min/max from each row's slice directly. Second,
    Arrow's `min` / `max` kernels have a reasonable amount of per-call
    overhead; for small arrays, it is more efficient to compute the min/max
    ourselves via direct iteration.
    
    Benchmarks (8192 rows, arrays of int64 values, M4 Max):
    
      - no_nulls / list_size=10: 309 µs → 26.6 µs (11.6x faster)
      - no_nulls / list_size=100: 392 µs → 150 µs (2.6x faster)
      - no_nulls / list_size=1000: 1.20 ms → 951 µs (1.26x faster)
      - nulls / list_size=10: 385 µs → 69.0 µs (5.6x faster)
      - nulls / list_size=100: 790 µs → 616 µs (1.28x faster)
      - nulls / list_size=1000: 5.34 ms → 5.21 ms (1.02x faster)
    
    ## What changes are included in this PR?
    
    * Add benchmark for `array_max`
    * Expand SLT test coverage
    * Implement optimization
    
    ## Are these changes tested?
    
    Yes.
    
    ## Are there any user-facing changes?
    
    No.
---
 datafusion/functions-nested/Cargo.toml             |   4 +
 .../functions-nested/benches/array_min_max.rs      | 121 +++++++++++++++++++++
 datafusion/functions-nested/src/min_max.rs         | 118 ++++++++++++++++++--
 datafusion/sqllogictest/test_files/array.slt       | 115 ++++++++++++++++++++
 4 files changed, 351 insertions(+), 7 deletions(-)

diff --git a/datafusion/functions-nested/Cargo.toml 
b/datafusion/functions-nested/Cargo.toml
index 2ce9532a22..6e96a44fc9 100644
--- a/datafusion/functions-nested/Cargo.toml
+++ b/datafusion/functions-nested/Cargo.toml
@@ -70,6 +70,10 @@ rand = { workspace = true }
 harness = false
 name = "array_concat"
 
+[[bench]]
+harness = false
+name = "array_min_max"
+
 [[bench]]
 harness = false
 name = "array_expression"
diff --git a/datafusion/functions-nested/benches/array_min_max.rs 
b/datafusion/functions-nested/benches/array_min_max.rs
new file mode 100644
index 0000000000..391a84d148
--- /dev/null
+++ b/datafusion/functions-nested/benches/array_min_max.rs
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, Int64Array, ListArray};
+use arrow::buffer::{NullBuffer, OffsetBuffer};
+use arrow::datatypes::{DataType, Field};
+use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
+use datafusion_common::config::ConfigOptions;
+use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
+use datafusion_functions_nested::min_max::ArrayMax;
+use rand::rngs::StdRng;
+use rand::{Rng, SeedableRng};
+
+const NUM_ROWS: usize = 8192;
+const SEED: u64 = 42;
+const LIST_NULL_DENSITY: f64 = 0.1;
+const ELEMENT_NULL_DENSITY: f64 = 0.1;
+
+fn create_int64_list_array(
+    num_rows: usize,
+    list_size: usize,
+    element_null_density: f64,
+) -> ArrayRef {
+    let mut rng = StdRng::seed_from_u64(SEED);
+    let total_values = num_rows * list_size;
+
+    if element_null_density > 0.0 {
+        let values: Vec<Option<i64>> = (0..total_values)
+            .map(|_| {
+                if rng.random::<f64>() < element_null_density {
+                    None
+                } else {
+                    Some(rng.random::<i64>() % 10_000)
+                }
+            })
+            .collect();
+        let values_array = Arc::new(Int64Array::from(values));
+
+        let offsets: Vec<i32> = (0..=num_rows).map(|i| (i * list_size) as 
i32).collect();
+        let nulls: Vec<bool> = (0..num_rows)
+            .map(|_| rng.random::<f64>() >= LIST_NULL_DENSITY)
+            .collect();
+
+        Arc::new(ListArray::new(
+            Arc::new(Field::new("item", DataType::Int64, true)),
+            OffsetBuffer::new(offsets.into()),
+            values_array,
+            Some(NullBuffer::from(nulls)),
+        ))
+    } else {
+        // No element nulls — values array has no null buffer
+        let values: Vec<i64> = (0..total_values)
+            .map(|_| rng.random::<i64>() % 10_000)
+            .collect();
+        let values_array = Arc::new(Int64Array::from(values));
+
+        let offsets: Vec<i32> = (0..=num_rows).map(|i| (i * list_size) as 
i32).collect();
+        let nulls: Vec<bool> = (0..num_rows)
+            .map(|_| rng.random::<f64>() >= LIST_NULL_DENSITY)
+            .collect();
+
+        Arc::new(ListArray::new(
+            Arc::new(Field::new("item", DataType::Int64, false)),
+            OffsetBuffer::new(offsets.into()),
+            values_array,
+            Some(NullBuffer::from(nulls)),
+        ))
+    }
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    let udf = ArrayMax::new();
+    let config_options = Arc::new(ConfigOptions::default());
+
+    for list_size in [10, 100, 1000] {
+        for (label, null_density) in [("nulls", ELEMENT_NULL_DENSITY), 
("no_nulls", 0.0)]
+        {
+            let list_array = create_int64_list_array(NUM_ROWS, list_size, 
null_density);
+            let args = vec![ColumnarValue::Array(Arc::clone(&list_array))];
+            let arg_fields =
+                vec![Field::new("arg_0", list_array.data_type().clone(), 
true).into()];
+            let return_field: Arc<Field> = Field::new("f", DataType::Int64, 
true).into();
+
+            c.bench_with_input(
+                BenchmarkId::new("array_max", 
format!("{label}/list_size={list_size}")),
+                &list_array,
+                |b, _| {
+                    b.iter(|| {
+                        udf.invoke_with_args(ScalarFunctionArgs {
+                            args: args.clone(),
+                            arg_fields: arg_fields.clone(),
+                            number_rows: NUM_ROWS,
+                            return_field: return_field.clone(),
+                            config_options: config_options.clone(),
+                        })
+                        .unwrap()
+                    });
+                },
+            );
+        }
+    }
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/datafusion/functions-nested/src/min_max.rs 
b/datafusion/functions-nested/src/min_max.rs
index e3603b731f..c65de1d9b9 100644
--- a/datafusion/functions-nested/src/min_max.rs
+++ b/datafusion/functions-nested/src/min_max.rs
@@ -15,9 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! [`ScalarUDFImpl`] definitions for array_max function.
+//! [`ScalarUDFImpl`] definitions for array_min and array_max functions.
 use crate::utils::make_scalar_function;
-use arrow::array::{ArrayRef, GenericListArray, OffsetSizeTrait};
+use arrow::array::{
+    Array, ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, AsArray, 
GenericListArray,
+    OffsetSizeTrait, PrimitiveBuilder, downcast_primitive,
+};
 use arrow::datatypes::DataType;
 use arrow::datatypes::DataType::{LargeList, List};
 use datafusion_common::Result;
@@ -32,6 +35,7 @@ use 
datafusion_functions_aggregate_common::min_max::{max_batch, min_batch};
 use datafusion_macros::user_doc;
 use itertools::Itertools;
 use std::any::Any;
+use std::sync::Arc;
 
 make_udf_expr_and_func!(
     ArrayMax,
@@ -116,8 +120,8 @@ impl ScalarUDFImpl for ArrayMax {
 fn array_max_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
     let [array] = take_function_args("array_max", args)?;
     match array.data_type() {
-        List(_) => array_min_max_helper(as_list_array(array)?, max_batch),
-        LargeList(_) => array_min_max_helper(as_large_list_array(array)?, 
max_batch),
+        List(_) => array_min_max_helper(as_list_array(array)?, false),
+        LargeList(_) => array_min_max_helper(as_large_list_array(array)?, 
false),
         arg_type => exec_err!("array_max does not support type: {arg_type}"),
     }
 }
@@ -198,16 +202,23 @@ impl ScalarUDFImpl for ArrayMin {
 fn array_min_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
     let [array] = take_function_args("array_min", args)?;
     match array.data_type() {
-        List(_) => array_min_max_helper(as_list_array(array)?, min_batch),
-        LargeList(_) => array_min_max_helper(as_large_list_array(array)?, 
min_batch),
+        List(_) => array_min_max_helper(as_list_array(array)?, true),
+        LargeList(_) => array_min_max_helper(as_large_list_array(array)?, 
true),
         arg_type => exec_err!("array_min does not support type: {arg_type}"),
     }
 }
 
 fn array_min_max_helper<O: OffsetSizeTrait>(
     array: &GenericListArray<O>,
-    agg_fn: fn(&ArrayRef) -> Result<ScalarValue>,
+    is_min: bool,
 ) -> Result<ArrayRef> {
+    // Try the primitive fast path first
+    if let Some(result) = try_primitive_array_min_max(array, is_min) {
+        return result;
+    }
+
+    // Fallback: per-row ScalarValue path for non-primitive types
+    let agg_fn = if is_min { min_batch } else { max_batch };
     let null_value = ScalarValue::try_from(array.value_type())?;
     let result_vec: Vec<ScalarValue> = array
         .iter()
@@ -215,3 +226,96 @@ fn array_min_max_helper<O: OffsetSizeTrait>(
         .try_collect()?;
     ScalarValue::iter_to_array(result_vec)
 }
+
+/// Dispatches to a typed primitive min/max implementation, or returns `None` 
if
+/// the element type is not a primitive.
+fn try_primitive_array_min_max<O: OffsetSizeTrait>(
+    list_array: &GenericListArray<O>,
+    is_min: bool,
+) -> Option<Result<ArrayRef>> {
+    macro_rules! helper {
+        ($t:ty) => {
+            return Some(primitive_array_min_max::<O, $t>(list_array, is_min))
+        };
+    }
+    downcast_primitive! {
+        list_array.value_type() => (helper),
+        _ => {}
+    }
+    None
+}
+
+/// Threshold to switch from direct iteration to using `min` / `max` kernel 
from
+/// `arrow::compute`. The latter has enough per-invocation overhead that direct
+/// iteration is faster for small lists.
+const ARROW_COMPUTE_THRESHOLD: usize = 32;
+
+/// Computes min or max for each row of a primitive ListArray.
+fn primitive_array_min_max<O: OffsetSizeTrait, T: ArrowPrimitiveType>(
+    list_array: &GenericListArray<O>,
+    is_min: bool,
+) -> Result<ArrayRef> {
+    let values_array = list_array.values().as_primitive::<T>();
+    let values_slice = values_array.values();
+    let values_nulls = values_array.nulls();
+    let mut result_builder = 
PrimitiveBuilder::<T>::with_capacity(list_array.len())
+        .with_data_type(values_array.data_type().clone());
+
+    for (row, w) in list_array.offsets().windows(2).enumerate() {
+        let row_result = if list_array.is_null(row) {
+            None
+        } else {
+            let start = w[0].as_usize();
+            let end = w[1].as_usize();
+            let len = end - start;
+
+            match len {
+                0 => None,
+                _ if len < ARROW_COMPUTE_THRESHOLD => {
+                    scalar_min_max::<T>(values_slice, values_nulls, start, 
end, is_min)
+                }
+                _ => {
+                    let slice = values_array.slice(start, len);
+                    if is_min {
+                        arrow::compute::min::<T>(&slice)
+                    } else {
+                        arrow::compute::max::<T>(&slice)
+                    }
+                }
+            }
+        };
+
+        result_builder.append_option(row_result);
+    }
+
+    Ok(Arc::new(result_builder.finish()) as ArrayRef)
+}
+
+/// Computes min or max for a single list row by directly scanning a slice of
+/// the flat values buffer.
+#[inline]
+fn scalar_min_max<T: ArrowPrimitiveType>(
+    values_slice: &[T::Native],
+    values_nulls: Option<&arrow::buffer::NullBuffer>,
+    start: usize,
+    end: usize,
+    is_min: bool,
+) -> Option<T::Native> {
+    let mut best: Option<T::Native> = None;
+    for (i, &val) in values_slice[start..end].iter().enumerate() {
+        if let Some(nulls) = values_nulls
+            && !nulls.is_valid(start + i)
+        {
+            continue;
+        }
+        let update_best = match best {
+            None => true,
+            Some(current) if is_min => val.is_lt(current),
+            Some(current) => val.is_gt(current),
+        };
+        if update_best {
+            best = Some(val);
+        }
+    }
+    best
+}
diff --git a/datafusion/sqllogictest/test_files/array.slt 
b/datafusion/sqllogictest/test_files/array.slt
index d422924ad5..1216e1e023 100644
--- a/datafusion/sqllogictest/test_files/array.slt
+++ b/datafusion/sqllogictest/test_files/array.slt
@@ -1551,6 +1551,44 @@ NULL
 query error DataFusion error: Error during planning: 'array_max' does not 
support zero arguments
 select array_max();
 
+# array_max over multiple rows (exercises the offsets-based iteration)
+query I
+select array_max(column1) from (values
+    (make_array(1, 5, 3)),
+    (make_array(10, 2, 8)),
+    (NULL),
+    (make_array(NULL, 7, NULL)),
+    (make_array(100))
+) as t(column1);
+----
+5
+10
+NULL
+7
+100
+
+# array_max with NaN values (NaN compares greater than all other non-null values, so it is returned as the max)
+query R
+select array_max(make_array(1.0, 'NaN'::double, 3.0));
+----
+NaN
+
+query R
+select array_max(make_array('NaN'::double, 'NaN'::double));
+----
+NaN
+
+query R
+select array_max(make_array('NaN'::double, NULL));
+----
+NaN
+
+# array_max with Int32 (exercises a different primitive type than Int64)
+query I
+select array_max(arrow_cast(make_array(10, -5, 3), 'List(Int32)'));
+----
+10
+
 ## array_min
 
 query I
@@ -1641,6 +1679,83 @@ NULL
 query error DataFusion error: Error during planning: 'array_min' does not 
support zero arguments
 select array_min();
 
+# array_min over multiple rows (exercises the offsets-based iteration)
+query I
+select array_min(column1) from (values
+    (make_array(1, 5, 3)),
+    (make_array(10, 2, 8)),
+    (NULL),
+    (make_array(NULL, 7, NULL)),
+    (make_array(100))
+) as t(column1);
+----
+1
+2
+NULL
+7
+100
+
+# array_min with NaN values (NaN should not be returned as min)
+query R
+select array_min(make_array(1.0, 'NaN'::double, 3.0));
+----
+1
+
+query R
+select array_min(make_array('NaN'::double, 'NaN'::double));
+----
+NaN
+
+query R
+select array_min(make_array('NaN'::double, NULL));
+----
+NaN
+
+# array_min with Int32 (exercises a different primitive type than Int64)
+query I
+select array_min(arrow_cast(make_array(10, -5, 3), 'List(Int32)'));
+----
+-5
+
+# array_min/array_max preserve parameterized primitive metadata
+query PPTT
+select
+  array_min(ts_list),
+  array_max(ts_list),
+  arrow_typeof(array_min(ts_list)),
+  arrow_typeof(array_max(ts_list))
+from (
+  select arrow_cast(
+    make_array(
+      arrow_cast(20, 'Timestamp(Nanosecond, Some("UTC"))'),
+      arrow_cast(10, 'Timestamp(Nanosecond, Some("UTC"))'),
+      arrow_cast(30, 'Timestamp(Nanosecond, Some("UTC"))')
+    ),
+    'List(Timestamp(Nanosecond, Some("UTC")))'
+  ) as ts_list
+) t;
+----
+1970-01-01T00:00:00.000000010Z 1970-01-01T00:00:00.000000030Z Timestamp(ns, 
"UTC") Timestamp(ns, "UTC")
+
+query RRTT
+select
+  array_min(dec_list),
+  array_max(dec_list),
+  arrow_typeof(array_min(dec_list)),
+  arrow_typeof(array_max(dec_list))
+from (
+  select arrow_cast(
+    make_array(
+      arrow_cast(200, 'Decimal128(20, 4)'),
+      arrow_cast(100, 'Decimal128(20, 4)'),
+      arrow_cast(300, 'Decimal128(20, 4)')
+    ),
+    'List(Decimal128(20, 4))'
+  ) as dec_list
+) t;
+----
+100 300 Decimal128(20, 4) Decimal128(20, 4)
+
 
 ## array_pop_back (aliases: `list_pop_back`)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to