This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 4d5aea4b5a perf: Optimize `array_min`, `array_max` for arrays of
primitive types (#21101)
4d5aea4b5a is described below
commit 4d5aea4b5ad3f3e172f9b7dafae30069c4204bfb
Author: Neil Conway <[email protected]>
AuthorDate: Tue Mar 24 16:41:55 2026 -0400
perf: Optimize `array_min`, `array_max` for arrays of primitive types
(#21101)
## Which issue does this PR close?
- Closes #21100.
## Rationale for this change
In the current implementation, we construct a `PrimitiveArray` for each
row, feed it to the Arrow `min` / `max` kernel, and then collect the
resulting `ScalarValue`s in a `Vec`. We then construct a final
`PrimitiveArray` for the result via `ScalarValue::iter_to_array` of the
`Vec`.
We can do better for ListArrays of primitive types. First, we can
iterate directly over the flat values buffer of the `ListArray` for the
batch and compute the min/max from each row's slice directly. Second,
Arrow's `min` / `max` kernels have non-trivial per-call overhead; for
small lists (fewer than 32 elements in this implementation), it is more
efficient to compute the min/max ourselves via direct iteration.
Benchmarks (8192 rows, arrays of int64 values, M4 Max):
- no_nulls / list_size=10: 309 µs → 26.6 µs (11.6x faster)
- no_nulls / list_size=100: 392 µs → 150 µs (2.6x faster)
- no_nulls / list_size=1000: 1.20 ms → 951 µs (1.26x faster)
- nulls / list_size=10: 385 µs → 69.0 µs (5.6x faster)
- nulls / list_size=100: 790 µs → 616 µs (1.28x faster)
- nulls / list_size=1000: 5.34 ms → 5.21 ms (1.02x faster)
## What changes are included in this PR?
* Add benchmark for `array_max`
* Expand SLT test coverage
* Implement optimization
## Are these changes tested?
Yes.
## Are there any user-facing changes?
No.
---
datafusion/functions-nested/Cargo.toml | 4 +
.../functions-nested/benches/array_min_max.rs | 121 +++++++++++++++++++++
datafusion/functions-nested/src/min_max.rs | 118 ++++++++++++++++++--
datafusion/sqllogictest/test_files/array.slt | 115 ++++++++++++++++++++
4 files changed, 351 insertions(+), 7 deletions(-)
diff --git a/datafusion/functions-nested/Cargo.toml
b/datafusion/functions-nested/Cargo.toml
index 2ce9532a22..6e96a44fc9 100644
--- a/datafusion/functions-nested/Cargo.toml
+++ b/datafusion/functions-nested/Cargo.toml
@@ -70,6 +70,10 @@ rand = { workspace = true }
harness = false
name = "array_concat"
+[[bench]]
+harness = false
+name = "array_min_max"
+
[[bench]]
harness = false
name = "array_expression"
diff --git a/datafusion/functions-nested/benches/array_min_max.rs
b/datafusion/functions-nested/benches/array_min_max.rs
new file mode 100644
index 0000000000..391a84d148
--- /dev/null
+++ b/datafusion/functions-nested/benches/array_min_max.rs
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, Int64Array, ListArray};
+use arrow::buffer::{NullBuffer, OffsetBuffer};
+use arrow::datatypes::{DataType, Field};
+use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
+use datafusion_common::config::ConfigOptions;
+use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
+use datafusion_functions_nested::min_max::ArrayMax;
+use rand::rngs::StdRng;
+use rand::{Rng, SeedableRng};
+
+const NUM_ROWS: usize = 8192;
+const SEED: u64 = 42;
+const LIST_NULL_DENSITY: f64 = 0.1;
+const ELEMENT_NULL_DENSITY: f64 = 0.1;
+
+/// Builds a `ListArray` of `num_rows` rows, each holding `list_size` random
+/// `Int64` values in the range `-9_999..=9_999`.
+///
+/// Each row is null with probability `LIST_NULL_DENSITY`. When
+/// `element_null_density` is positive, each element is null with that
+/// probability and the item field is declared nullable; otherwise the values
+/// array carries no null buffer and the item field is non-nullable.
+fn create_int64_list_array(
+    num_rows: usize,
+    list_size: usize,
+    element_null_density: f64,
+) -> ArrayRef {
+    let mut rng = StdRng::seed_from_u64(SEED);
+    let total_values = num_rows * list_size;
+    let has_element_nulls = element_null_density > 0.0;
+
+    // Element values are drawn from the RNG before the row validity bits, so
+    // the generated data is fully determined by `SEED`.
+    let values_array: ArrayRef = if has_element_nulls {
+        let values: Vec<Option<i64>> = (0..total_values)
+            .map(|_| {
+                if rng.random::<f64>() < element_null_density {
+                    None
+                } else {
+                    Some(rng.random::<i64>() % 10_000)
+                }
+            })
+            .collect();
+        Arc::new(Int64Array::from(values))
+    } else {
+        // No element nulls — values array has no null buffer
+        let values: Vec<i64> = (0..total_values)
+            .map(|_| rng.random::<i64>() % 10_000)
+            .collect();
+        Arc::new(Int64Array::from(values))
+    };
+
+    // Every row has exactly `list_size` elements; fail loudly rather than
+    // silently truncating if the total ever exceeds the i32 offset range.
+    let offsets: Vec<i32> = (0..=num_rows)
+        .map(|i| i32::try_from(i * list_size).expect("list offset exceeds i32::MAX"))
+        .collect();
+    let row_validity: Vec<bool> = (0..num_rows)
+        .map(|_| rng.random::<f64>() >= LIST_NULL_DENSITY)
+        .collect();
+
+    Arc::new(ListArray::new(
+        Arc::new(Field::new("item", DataType::Int64, has_element_nulls)),
+        OffsetBuffer::new(offsets.into()),
+        values_array,
+        Some(NullBuffer::from(row_validity)),
+    ))
+}
+
+/// Benchmarks `array_max` over `Int64` list columns of several list sizes,
+/// once with null elements and once without.
+fn criterion_benchmark(c: &mut Criterion) {
+    let udf = ArrayMax::new();
+    let config_options = Arc::new(ConfigOptions::default());
+    let variants = [("nulls", ELEMENT_NULL_DENSITY), ("no_nulls", 0.0)];
+
+    for list_size in [10, 100, 1000] {
+        for (label, null_density) in variants {
+            let list_array = create_int64_list_array(NUM_ROWS, list_size, null_density);
+            let return_field: Arc<Field> = Field::new("f", DataType::Int64, true).into();
+            let arg_fields =
+                vec![Field::new("arg_0", list_array.data_type().clone(), true).into()];
+            let args = vec![ColumnarValue::Array(Arc::clone(&list_array))];
+            let bench_id =
+                BenchmarkId::new("array_max", format!("{label}/list_size={list_size}"));
+
+            c.bench_with_input(bench_id, &list_array, |b, _| {
+                b.iter(|| {
+                    let call_args = ScalarFunctionArgs {
+                        args: args.clone(),
+                        arg_fields: arg_fields.clone(),
+                        number_rows: NUM_ROWS,
+                        return_field: Arc::clone(&return_field),
+                        config_options: Arc::clone(&config_options),
+                    };
+                    udf.invoke_with_args(call_args).unwrap()
+                })
+            });
+        }
+    }
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/datafusion/functions-nested/src/min_max.rs
b/datafusion/functions-nested/src/min_max.rs
index e3603b731f..c65de1d9b9 100644
--- a/datafusion/functions-nested/src/min_max.rs
+++ b/datafusion/functions-nested/src/min_max.rs
@@ -15,9 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-//! [`ScalarUDFImpl`] definitions for array_max function.
+//! [`ScalarUDFImpl`] definitions for array_min and array_max functions.
use crate::utils::make_scalar_function;
-use arrow::array::{ArrayRef, GenericListArray, OffsetSizeTrait};
+use arrow::array::{
+ Array, ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, AsArray,
GenericListArray,
+ OffsetSizeTrait, PrimitiveBuilder, downcast_primitive,
+};
use arrow::datatypes::DataType;
use arrow::datatypes::DataType::{LargeList, List};
use datafusion_common::Result;
@@ -32,6 +35,7 @@ use
datafusion_functions_aggregate_common::min_max::{max_batch, min_batch};
use datafusion_macros::user_doc;
use itertools::Itertools;
use std::any::Any;
+use std::sync::Arc;
make_udf_expr_and_func!(
ArrayMax,
@@ -116,8 +120,8 @@ impl ScalarUDFImpl for ArrayMax {
+/// Evaluates `array_max` on its single `List` / `LargeList` argument; any
+/// other argument type is an execution error.
fn array_max_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
let [array] = take_function_args("array_max", args)?;
match array.data_type() {
- List(_) => array_min_max_helper(as_list_array(array)?, max_batch),
- LargeList(_) => array_min_max_helper(as_large_list_array(array)?,
max_batch),
+ // The flag is `is_min`, so `false` selects the maximum.
+ List(_) => array_min_max_helper(as_list_array(array)?, false),
+ LargeList(_) => array_min_max_helper(as_large_list_array(array)?,
false),
arg_type => exec_err!("array_max does not support type: {arg_type}"),
}
}
@@ -198,16 +202,23 @@ impl ScalarUDFImpl for ArrayMin {
+/// Evaluates `array_min` on its single `List` / `LargeList` argument; any
+/// other argument type is an execution error.
fn array_min_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
let [array] = take_function_args("array_min", args)?;
match array.data_type() {
- List(_) => array_min_max_helper(as_list_array(array)?, min_batch),
- LargeList(_) => array_min_max_helper(as_large_list_array(array)?,
min_batch),
+ // The flag is `is_min`, so `true` selects the minimum.
+ List(_) => array_min_max_helper(as_list_array(array)?, true),
+ LargeList(_) => array_min_max_helper(as_large_list_array(array)?,
true),
arg_type => exec_err!("array_min does not support type: {arg_type}"),
}
}
fn array_min_max_helper<O: OffsetSizeTrait>(
array: &GenericListArray<O>,
- agg_fn: fn(&ArrayRef) -> Result<ScalarValue>,
+ is_min: bool,
) -> Result<ArrayRef> {
+ // Try the primitive fast path first
+ if let Some(result) = try_primitive_array_min_max(array, is_min) {
+ return result;
+ }
+
+ // Fallback: per-row ScalarValue path for non-primitive types
+ let agg_fn = if is_min { min_batch } else { max_batch };
let null_value = ScalarValue::try_from(array.value_type())?;
let result_vec: Vec<ScalarValue> = array
.iter()
@@ -215,3 +226,96 @@ fn array_min_max_helper<O: OffsetSizeTrait>(
.try_collect()?;
ScalarValue::iter_to_array(result_vec)
}
+
+/// Dispatches to `primitive_array_min_max` for the list's element type when
+/// that type is a primitive; returns `None` otherwise so the caller can fall
+/// back to the generic per-row `ScalarValue` path.
+fn try_primitive_array_min_max<O: OffsetSizeTrait>(
+ list_array: &GenericListArray<O>,
+ is_min: bool,
+) -> Option<Result<ArrayRef>> {
+ // `downcast_primitive!` expands `helper!` with the concrete Arrow type for
+ // each primitive `DataType`; every expansion returns early.
+ macro_rules! helper {
+ ($t:ty) => {
+ return Some(primitive_array_min_max::<O, $t>(list_array, is_min))
+ };
+ }
+ downcast_primitive! {
+ list_array.value_type() => (helper),
+ // Non-primitive element types fall through to `None` below.
+ _ => {}
+ }
+ None
+}
+
+/// Row length at or above which the `min` / `max` kernels from
+/// `arrow::compute` are used. Shorter rows are scanned directly, because the
+/// kernels' per-call overhead outweighs their benefit on small inputs.
+const ARROW_COMPUTE_THRESHOLD: usize = 32;
+
+/// Computes the min (`is_min == true`) or max of every row of a `ListArray`
+/// whose elements are primitives of type `T`.
+///
+/// Null rows and empty lists produce NULL. Other rows produce whatever the
+/// direct scan or the Arrow kernel reports for their elements. The output
+/// uses the element array's `DataType`, so type parameters such as a
+/// timestamp's timezone or a decimal's precision/scale carry over.
+fn primitive_array_min_max<O: OffsetSizeTrait, T: ArrowPrimitiveType>(
+    list_array: &GenericListArray<O>,
+    is_min: bool,
+) -> Result<ArrayRef> {
+    let elements = list_array.values().as_primitive::<T>();
+    let element_values = elements.values();
+    let element_nulls = elements.nulls();
+
+    let mut builder = PrimitiveBuilder::<T>::with_capacity(list_array.len())
+        .with_data_type(elements.data_type().clone());
+
+    for (row, bounds) in list_array.offsets().windows(2).enumerate() {
+        if list_array.is_null(row) {
+            builder.append_null();
+            continue;
+        }
+
+        let (start, end) = (bounds[0].as_usize(), bounds[1].as_usize());
+        let row_len = end - start;
+
+        let extreme = if row_len == 0 {
+            None
+        } else if row_len < ARROW_COMPUTE_THRESHOLD {
+            // Short row: scan the flat values buffer directly.
+            scalar_min_max::<T>(element_values, element_nulls, start, end, is_min)
+        } else {
+            // Long row: hand this row's slice of the values to the Arrow kernel.
+            let row_values = elements.slice(start, row_len);
+            if is_min {
+                arrow::compute::min::<T>(&row_values)
+            } else {
+                arrow::compute::max::<T>(&row_values)
+            }
+        };
+        builder.append_option(extreme);
+    }
+
+    let result: ArrayRef = Arc::new(builder.finish());
+    Ok(result)
+}
+
+/// Computes the min (`is_min == true`) or max of one list row by scanning
+/// `values_slice[start..end]` directly. Positions marked null in
+/// `values_nulls` are skipped.
+///
+/// Returns `None` when the range is empty or every element in it is null.
+/// Comparisons use `ArrowNativeTypeOp::is_lt` / `is_gt`. For floating-point
+/// types these order NaN above every other value, so NaN can be a max but is
+/// only a min when no other value is present. On ties the first occurrence
+/// is kept.
+#[inline]
+fn scalar_min_max<T: ArrowPrimitiveType>(
+    values_slice: &[T::Native],
+    values_nulls: Option<&arrow::buffer::NullBuffer>,
+    start: usize,
+    end: usize,
+    is_min: bool,
+) -> Option<T::Native> {
+    // Resolve the direction once so each call below is monomorphized with a
+    // branch-free comparator instead of testing `is_min` per element.
+    if is_min {
+        reduce_valid(values_slice, values_nulls, start, end, |best, candidate| {
+            candidate.is_lt(best)
+        })
+    } else {
+        reduce_valid(values_slice, values_nulls, start, end, |best, candidate| {
+            candidate.is_gt(best)
+        })
+    }
+}
+
+/// Reduces the non-null values of `values_slice[start..end]`. The running
+/// best is replaced by a candidate only when `is_better(best, candidate)`
+/// holds.
+#[inline]
+fn reduce_valid<N: ArrowNativeTypeOp>(
+    values_slice: &[N],
+    values_nulls: Option<&arrow::buffer::NullBuffer>,
+    start: usize,
+    end: usize,
+    is_better: impl Fn(N, N) -> bool,
+) -> Option<N> {
+    let pick = |best: N, candidate: N| {
+        if is_better(best, candidate) {
+            candidate
+        } else {
+            best
+        }
+    };
+    let row_values = values_slice[start..end].iter().copied();
+    match values_nulls {
+        // No null buffer: every element is valid, so skip the per-element
+        // validity lookup entirely.
+        None => row_values.reduce(pick),
+        // Null-buffer indices are absolute positions in the values array.
+        Some(nulls) => row_values
+            .enumerate()
+            .filter(|&(offset, _)| nulls.is_valid(start + offset))
+            .map(|(_, value)| value)
+            .reduce(pick),
+    }
+}
diff --git a/datafusion/sqllogictest/test_files/array.slt
b/datafusion/sqllogictest/test_files/array.slt
index d422924ad5..1216e1e023 100644
--- a/datafusion/sqllogictest/test_files/array.slt
+++ b/datafusion/sqllogictest/test_files/array.slt
@@ -1551,6 +1551,44 @@ NULL
query error DataFusion error: Error during planning: 'array_max' does not
support zero arguments
select array_max();
+# array_max over multiple rows (exercises the offsets-based iteration)
+query I
+select array_max(column1) from (values
+ (make_array(1, 5, 3)),
+ (make_array(10, 2, 8)),
+ (NULL),
+ (make_array(NULL, 7, NULL)),
+ (make_array(100))
+) as t(column1);
+----
+5
+10
+NULL
+7
+100
+
+# array_max with NaN values (NaN compares greater than all other values, so it is returned as max)
+query R
+select array_max(make_array(1.0, 'NaN'::double, 3.0));
+----
+NaN
+
+query R
+select array_max(make_array('NaN'::double, 'NaN'::double));
+----
+NaN
+
+query R
+select array_max(make_array('NaN'::double, NULL));
+----
+NaN
+
+# array_max with Int32 (exercises a different primitive type than Int64)
+query I
+select array_max(arrow_cast(make_array(10, -5, 3), 'List(Int32)'));
+----
+10
+
## array_min
query I
@@ -1641,6 +1679,83 @@ NULL
query error DataFusion error: Error during planning: 'array_min' does not
support zero arguments
select array_min();
+# array_min over multiple rows (exercises the offsets-based iteration)
+query I
+select array_min(column1) from (values
+ (make_array(1, 5, 3)),
+ (make_array(10, 2, 8)),
+ (NULL),
+ (make_array(NULL, 7, NULL)),
+ (make_array(100))
+) as t(column1);
+----
+1
+2
+NULL
+7
+100
+
+# array_min with NaN values (NaN compares greater than all other values, so it is returned as min only when no non-NaN value is present)
+query R
+select array_min(make_array(1.0, 'NaN'::double, 3.0));
+----
+1
+
+query R
+select array_min(make_array('NaN'::double, 'NaN'::double));
+----
+NaN
+
+query R
+select array_min(make_array('NaN'::double, NULL));
+----
+NaN
+
+# array_min with Int32 (exercises a different primitive type than Int64)
+query I
+select array_min(arrow_cast(make_array(10, -5, 3), 'List(Int32)'));
+----
+-5
+
+# array_min/array_max preserve parameterized primitive metadata
+query PPTT
+select
+ array_min(ts_list),
+ array_max(ts_list),
+ arrow_typeof(array_min(ts_list)),
+ arrow_typeof(array_max(ts_list))
+from (
+ select arrow_cast(
+ make_array(
+ arrow_cast(20, 'Timestamp(Nanosecond, Some("UTC"))'),
+ arrow_cast(10, 'Timestamp(Nanosecond, Some("UTC"))'),
+ arrow_cast(30, 'Timestamp(Nanosecond, Some("UTC"))')
+ ),
+ 'List(Timestamp(Nanosecond, Some("UTC")))'
+ ) as ts_list
+) t;
+----
+1970-01-01T00:00:00.000000010Z 1970-01-01T00:00:00.000000030Z Timestamp(ns,
"UTC") Timestamp(ns, "UTC")
+
+query RRTT
+select
+ array_min(dec_list),
+ array_max(dec_list),
+ arrow_typeof(array_min(dec_list)),
+ arrow_typeof(array_max(dec_list))
+from (
+ select arrow_cast(
+ make_array(
+ arrow_cast(200, 'Decimal128(20, 4)'),
+ arrow_cast(100, 'Decimal128(20, 4)'),
+ arrow_cast(300, 'Decimal128(20, 4)')
+ ),
+ 'List(Decimal128(20, 4))'
+ ) as dec_list
+) t;
+----
+100 300 Decimal128(20, 4) Decimal128(20, 4)
+
## array_pop_back (aliases: `list_pop_back`)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]