This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new d2df7a57c4 perf: Optimize `array_concat` using `MutableArrayData` (#20620)
d2df7a57c4 is described below

commit d2df7a57c417e1e0534a60e4c283513604008763
Author: Neil Conway <[email protected]>
AuthorDate: Tue Mar 3 12:03:24 2026 -0500

    perf: Optimize `array_concat` using `MutableArrayData` (#20620)
    
    ## Which issue does this PR close?
    
    - Closes #20619.
    
    ## Rationale for this change
    
    The current implementation of `array_concat` creates an `ArrayRef` for
    each row, uses Arrow's `concat` kernel to merge the elements together,
    and then uses `concat` again to produce the final results. This does a
    lot of unnecessary allocation and copying.
    
    Instead, we can use `MutableArrayData::extend` to copy element ranges in
    bulk, which avoids much of this intermediate copying and allocation.
    This approach is 5-15x faster on a microbenchmark.
    
    ## What changes are included in this PR?
    
    * Add benchmark
    * Improve SLT test coverage for `array_concat`
    * Implement optimization
    
    ## Are these changes tested?
    
    Yes, and benchmarked.
    
    ## Are there any user-facing changes?
    
    No.
---
 datafusion/functions-nested/Cargo.toml             |   4 +
 .../functions-nested/benches/array_concat.rs       |  94 +++++++++++++++++++
 datafusion/functions-nested/src/concat.rs          | 101 +++++++++++----------
 datafusion/sqllogictest/test_files/array.slt       |  16 ++++
 4 files changed, 168 insertions(+), 47 deletions(-)

diff --git a/datafusion/functions-nested/Cargo.toml b/datafusion/functions-nested/Cargo.toml
index 0b26170dbb..0fdb69e6e7 100644
--- a/datafusion/functions-nested/Cargo.toml
+++ b/datafusion/functions-nested/Cargo.toml
@@ -67,6 +67,10 @@ paste = { workspace = true }
 criterion = { workspace = true, features = ["async_tokio"] }
 rand = { workspace = true }
 
+[[bench]]
+harness = false
+name = "array_concat"
+
 [[bench]]
 harness = false
 name = "array_expression"
diff --git a/datafusion/functions-nested/benches/array_concat.rs b/datafusion/functions-nested/benches/array_concat.rs
new file mode 100644
index 0000000000..75dcc88f14
--- /dev/null
+++ b/datafusion/functions-nested/benches/array_concat.rs
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::hint::black_box;
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, Int32Array, ListArray};
+use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
+use arrow::datatypes::{DataType, Field};
+use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
+use rand::rngs::StdRng;
+use rand::{Rng, SeedableRng};
+
+use datafusion_functions_nested::concat::array_concat_inner;
+
+const SEED: u64 = 42;
+
+/// Build a `ListArray<i32>` with `num_lists` rows, each containing
+/// `elements_per_list` random i32 values. Every 10th row is null.
+fn make_list_array(
+    rng: &mut StdRng,
+    num_lists: usize,
+    elements_per_list: usize,
+) -> ArrayRef {
+    let total_values = num_lists * elements_per_list;
+    let values: Vec<i32> = (0..total_values).map(|_| rng.random()).collect();
+    let values = Arc::new(Int32Array::from(values));
+
+    let offsets: Vec<i32> = (0..=num_lists)
+        .map(|i| (i * elements_per_list) as i32)
+        .collect();
+    let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
+
+    let nulls: Vec<bool> = (0..num_lists).map(|i| i % 10 != 0).collect();
+    let nulls = Some(NullBuffer::from(nulls));
+
+    Arc::new(ListArray::new(
+        Arc::new(Field::new("item", DataType::Int32, false)),
+        offsets,
+        values,
+        nulls,
+    ))
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    let mut group = c.benchmark_group("array_concat");
+
+    // Benchmark: varying number of rows, 20 elements per list
+    for num_rows in [100, 1000, 10000] {
+        let mut rng = StdRng::seed_from_u64(SEED);
+        let list_a = make_list_array(&mut rng, num_rows, 20);
+        let list_b = make_list_array(&mut rng, num_rows, 20);
+        let args: Vec<ArrayRef> = vec![list_a, list_b];
+
+        group.bench_with_input(BenchmarkId::new("rows", num_rows), &args, |b, args| {
+            b.iter(|| black_box(array_concat_inner(args).unwrap()));
+        });
+    }
+
+    // Benchmark: 1000 rows, varying element counts per list
+    for elements_per_list in [5, 50, 500] {
+        let mut rng = StdRng::seed_from_u64(SEED);
+        let list_a = make_list_array(&mut rng, 1000, elements_per_list);
+        let list_b = make_list_array(&mut rng, 1000, elements_per_list);
+        let args: Vec<ArrayRef> = vec![list_a, list_b];
+
+        group.bench_with_input(
+            BenchmarkId::new("elements_per_list", elements_per_list),
+            &args,
+            |b, args| {
+                b.iter(|| black_box(array_concat_inner(args).unwrap()));
+            },
+        );
+    }
+
+    group.finish();
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/datafusion/functions-nested/src/concat.rs b/datafusion/functions-nested/src/concat.rs
index 0a7402060a..78519d2de2 100644
--- a/datafusion/functions-nested/src/concat.rs
+++ b/datafusion/functions-nested/src/concat.rs
@@ -24,9 +24,9 @@ use crate::make_array::make_array_inner;
 use crate::utils::{align_array_dimensions, check_datatypes, make_scalar_function};
 use arrow::array::{
     Array, ArrayData, ArrayRef, Capacities, GenericListArray, MutableArrayData,
-    NullBufferBuilder, OffsetSizeTrait,
+    OffsetSizeTrait,
 };
-use arrow::buffer::OffsetBuffer;
+use arrow::buffer::{NullBuffer, OffsetBuffer};
 use arrow::datatypes::{DataType, Field};
 use datafusion_common::Result;
 use datafusion_common::utils::{
@@ -352,7 +352,7 @@ impl ScalarUDFImpl for ArrayConcat {
     }
 }
 
-fn array_concat_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
+pub fn array_concat_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
     if args.is_empty() {
         return exec_err!("array_concat expects at least one argument");
     }
@@ -396,58 +396,65 @@ fn concat_internal<O: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
         .iter()
         .map(|arg| as_generic_list_array::<O>(arg))
         .collect::<Result<Vec<_>>>()?;
-    // Assume number of rows is the same for all arrays
     let row_count = list_arrays[0].len();
 
-    let mut array_lengths = vec![];
-    let mut arrays = vec![];
-    let mut valid = NullBufferBuilder::new(row_count);
-    for i in 0..row_count {
-        let nulls = list_arrays
+    // Extract underlying values ArrayData from each list array for MutableArrayData.
+    let values_data: Vec<ArrayData> =
+        list_arrays.iter().map(|la| la.values().to_data()).collect();
+    let values_data_refs: Vec<&ArrayData> = values_data.iter().collect();
+
+    // Estimate capacity as the sum of all values arrays' lengths.
+    let total_capacity: usize = values_data.iter().map(|d| d.len()).sum();
+
+    let mut mutable = MutableArrayData::with_capacities(
+        values_data_refs,
+        false,
+        Capacities::Array(total_capacity),
+    );
+    let mut offsets: Vec<O> = Vec::with_capacity(row_count + 1);
+    offsets.push(O::zero());
+
+    // Compute the output null buffer: a row is null only if null in ALL input
+    // arrays. This is the bitwise OR of validity bits (valid if valid in ANY
+    // input). If any array has no null buffer (all valid), no output row can be
+    // null.
+    let nulls = list_arrays
+        .iter()
+        .filter_map(|la| la.nulls())
+        .collect::<Vec<_>>();
+    let valid = if nulls.len() == list_arrays.len() {
+        nulls
             .iter()
-            .map(|arr| arr.is_null(i))
-            .collect::<Vec<_>>();
-
-        // If all the arrays are null, the concatenated array is null
-        let is_null = nulls.iter().all(|&x| x);
-        if is_null {
-            array_lengths.push(0);
-            valid.append_null();
-        } else {
-            // Get all the arrays on i-th row
-            let values = list_arrays
-                .iter()
-                .map(|arr| arr.value(i))
-                .collect::<Vec<_>>();
-
-            let elements = values
-                .iter()
-                .map(|a| a.as_ref())
-                .collect::<Vec<&dyn Array>>();
-
-            // Concatenated array on i-th row
-            let concatenated_array = arrow::compute::concat(elements.as_slice())?;
-            array_lengths.push(concatenated_array.len());
-            arrays.push(concatenated_array);
-            valid.append_non_null();
+            .map(|n| n.inner().clone())
+            .reduce(|a, b| &a | &b)
+            .map(NullBuffer::new)
+    } else {
+        None
+    };
+
+    for row_idx in 0..row_count {
+        for (arr_idx, list_array) in list_arrays.iter().enumerate() {
+            if list_array.is_null(row_idx) {
+                continue;
+            }
+            let start = list_array.offsets()[row_idx].to_usize().unwrap();
+            let end = list_array.offsets()[row_idx + 1].to_usize().unwrap();
+            if start < end {
+                mutable.extend(arr_idx, start, end);
+            }
         }
+        offsets.push(O::usize_as(mutable.len()));
     }
-    // Assume all arrays have the same data type
-    let data_type = list_arrays[0].value_type();
 
-    let elements = arrays
-        .iter()
-        .map(|a| a.as_ref())
-        .collect::<Vec<&dyn Array>>();
+    let data_type = list_arrays[0].value_type();
+    let data = mutable.freeze();
 
-    let list_arr = GenericListArray::<O>::new(
+    Ok(Arc::new(GenericListArray::<O>::try_new(
         Arc::new(Field::new_list_field(data_type, true)),
-        OffsetBuffer::from_lengths(array_lengths),
-        Arc::new(arrow::compute::concat(elements.as_slice())?),
-        valid.finish(),
-    );
-
-    Ok(Arc::new(list_arr))
+        OffsetBuffer::new(offsets.into()),
+        arrow::array::make_array(data),
+        valid,
+    )?))
 }
 
 // Kernel functions
diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt
index 17475c6a11..00d28d38d6 100644
--- a/datafusion/sqllogictest/test_files/array.slt
+++ b/datafusion/sqllogictest/test_files/array.slt
@@ -3453,6 +3453,22 @@ select
 ----
 [1, 2, 3] List(Utf8View)
 
+# array_concat with NULL elements inside arrays
+query ?
+select array_concat([1, NULL, 3], [NULL, 5]);
+----
+[1, NULL, 3, NULL, 5]
+
+query ?
+select array_concat([NULL, NULL], [1, 2], [NULL]);
+----
+[NULL, NULL, 1, 2, NULL]
+
+query ?
+select array_concat([NULL, NULL], [NULL, NULL]);
+----
+[NULL, NULL, NULL, NULL]
+
 # array_concat error
 query error DataFusion error: Error during planning: Execution error: Function 'array_concat' user-defined coercion failed with "Error during planning: array_concat does not support type Int64"
 select array_concat(1, 2);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to