This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 803cce881b feat: implement Spark size function for arrays and maps (#19592)
803cce881b is described below

commit 803cce881bbe1dce1f89c688b30ef742f03df9a6
Author: Yu-Chuan Hung <[email protected]>
AuthorDate: Tue Jan 13 12:15:20 2026 +0800

    feat: implement Spark size function for arrays and maps (#19592)
    
    ## Which issue does this PR close?
    - Closes #5338.
    - Part of #15914.
    
    ## Rationale for this change
    The size function is a commonly used Spark SQL function that returns the
    number of elements in an array or the number of key-value pairs in a
    map.
    
    ## What changes are included in this PR?
    Implement Spark-compatible size function in the datafusion-spark crate:
    - Supports List, LargeList, FixedSizeList, and Map types
    - Returns Int32 to match Spark's IntegerType
    - Returns -1 for NULL input, matching Spark when ANSI mode is off and
    spark.sql.legacy.sizeOfNull is true (the NULL-on-NULL variant is not
    implemented)
    
    ## Are these changes tested?
    Yes:
    - Unit tests in size.rs for nullability, ListArray, MapArray, and
    FixedSizeListArray
    - SQL logic tests in spark/collection/size.slt
    
    ## Are there any user-facing changes?
    Yes, new size function available in the Spark crate.
---
 datafusion/spark/src/function/collection/mod.rs    |  13 +-
 datafusion/spark/src/function/collection/size.rs   | 162 +++++++++++++++++++++
 .../test_files/spark/collection/size.slt           | 132 +++++++++++++++++
 3 files changed, 305 insertions(+), 2 deletions(-)

diff --git a/datafusion/spark/src/function/collection/mod.rs 
b/datafusion/spark/src/function/collection/mod.rs
index a87df9a2c8..6871e3aba6 100644
--- a/datafusion/spark/src/function/collection/mod.rs
+++ b/datafusion/spark/src/function/collection/mod.rs
@@ -15,11 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
+pub mod size;
+
 use datafusion_expr::ScalarUDF;
+use datafusion_functions::make_udf_function;
 use std::sync::Arc;
 
-pub mod expr_fn {}
+// Defines `size()`, which `functions()` below uses to obtain the
+// `Arc<ScalarUDF>` wrapping `size::SparkSize`.
+make_udf_function!(size::SparkSize, size);
+
+/// `Expr` builders generated by `export_functions!` for the collection
+/// functions, e.g. `expr_fn::size(arg)`.
+pub mod expr_fn {
+    use datafusion_functions::export_functions;
+
+    export_functions!((size, "Return the size of an array or map.", arg));
+}
 
+/// Returns every scalar UDF in the Spark collection function group.
 pub fn functions() -> Vec<Arc<ScalarUDF>> {
-    vec![]
+    vec![size()]
 }
diff --git a/datafusion/spark/src/function/collection/size.rs 
b/datafusion/spark/src/function/collection/size.rs
new file mode 100644
index 0000000000..05b8ba3156
--- /dev/null
+++ b/datafusion/spark/src/function/collection/size.rs
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{Array, ArrayRef, AsArray, Int32Array, OffsetSizeTrait};
+use arrow::buffer::{NullBuffer, OffsetBuffer};
+use arrow::compute::kernels::length::length as arrow_length;
+use arrow::datatypes::{DataType, Field, FieldRef};
+use datafusion_common::{Result, exec_err, plan_err};
+use datafusion_expr::{
+    ArrayFunctionArgument, ArrayFunctionSignature, ColumnarValue, ReturnFieldArgs,
+    ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
+};
+use datafusion_functions::utils::make_scalar_function;
+use std::any::Any;
+use std::sync::Arc;
+
+/// Spark-compatible `size` function.
+///
+/// Returns the number of elements in an array (`List`, `LargeList` or
+/// `FixedSizeList`) or the number of key-value pairs in a map, as an `Int32`.
+///
+/// NULL input yields `-1`, which is what Spark returns when ANSI mode is off
+/// and `spark.sql.legacy.sizeOfNull` is `true`. Spark's NULL-on-NULL variant
+/// (used when either setting is flipped) is not implemented here.
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct SparkSize {
+    // Accepts exactly one argument: a list-like array or a map.
+    signature: Signature,
+}
+
+impl Default for SparkSize {
+    fn default() -> Self {
+        SparkSize::new()
+    }
+}
+
+impl SparkSize {
+    /// Builds the UDF with a signature that accepts either a single
+    /// list-like argument or a single map argument (tried in that order).
+    pub fn new() -> Self {
+        let list_arg = TypeSignature::ArraySignature(ArrayFunctionSignature::Array {
+            arguments: vec![ArrayFunctionArgument::Array],
+            array_coercion: None,
+        });
+        let map_arg = TypeSignature::ArraySignature(ArrayFunctionSignature::MapArray);
+        let signature =
+            Signature::one_of(vec![list_arg, map_arg], Volatility::Immutable);
+        Self { signature }
+    }
+}
+
+impl ScalarUDFImpl for SparkSize {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "size"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    /// Spark's `size` always yields an `IntegerType`, i.e. Arrow `Int32`.
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        Ok(DataType::Int32)
+    }
+
+    /// The result is declared non-nullable because NULL input maps to `-1`
+    /// rather than NULL. A NULL-on-NULL variant would instead have to derive
+    /// nullability from the input field.
+    fn return_field_from_args(&self, _args: ReturnFieldArgs) -> Result<FieldRef> {
+        let field = Field::new(self.name(), DataType::Int32, false);
+        Ok(Arc::new(field))
+    }
+
+    /// Evaluates `size` over the argument, expanding scalars to arrays first.
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        let size_fn = make_scalar_function(spark_size_inner, vec![]);
+        size_fn(&args.args)
+    }
+}
+
+/// Computes Spark's `size` for every row of `args[0]`.
+///
+/// Lists (`List`, `LargeList`, `FixedSizeList`) report their element count and
+/// maps report their number of key-value pairs. NULL rows, and every row of a
+/// `Null`-typed input, produce `-1`.
+///
+/// # Errors
+///
+/// Returns an execution error if a `LargeList` row holds more than `i32::MAX`
+/// elements (the result type is `Int32`), and a planning error for any other
+/// unsupported input type.
+fn spark_size_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
+    let array = &args[0];
+
+    match array.data_type() {
+        DataType::List(_) => {
+            sizes_from_offsets(array.as_list::<i32>().offsets(), array.nulls())
+        }
+        DataType::LargeList(_) => {
+            sizes_from_offsets(array.as_list::<i64>().offsets(), array.nulls())
+        }
+        DataType::Map(_, _) => {
+            sizes_from_offsets(array.as_map().offsets(), array.nulls())
+        }
+        DataType::FixedSizeList(_, size) => {
+            if array.null_count() == 0 {
+                Ok(arrow_length(array)?)
+            } else {
+                let sizes: Vec<i32> = (0..array.len())
+                    .map(|row| if array.is_null(row) { -1 } else { *size })
+                    .collect();
+                Ok(Arc::new(Int32Array::from(sizes)))
+            }
+        }
+        DataType::Null => Ok(Arc::new(Int32Array::from(vec![-1; array.len()]))),
+        dt => plan_err!("size function does not support type: {}", dt),
+    }
+}
+
+/// Turns the offsets of a list or map array into per-row element counts,
+/// writing `-1` for rows marked null in `nulls`.
+///
+/// Uses a checked conversion rather than `as i32` so an oversized row
+/// (only possible with `i64` / `LargeList` offsets) is reported as an error
+/// instead of being silently truncated into a wrong size.
+fn sizes_from_offsets<O: OffsetSizeTrait>(
+    offsets: &OffsetBuffer<O>,
+    nulls: Option<&NullBuffer>,
+) -> Result<ArrayRef> {
+    let sizes = offsets
+        .lengths()
+        .enumerate()
+        .map(|(row, len)| {
+            if nulls.is_some_and(|n| n.is_null(row)) {
+                return Ok(-1);
+            }
+            match i32::try_from(len) {
+                Ok(size) => Ok(size),
+                Err(_) => exec_err!(
+                    "size: collection with {} elements exceeds the Int32 range",
+                    len
+                ),
+            }
+        })
+        .collect::<Result<Vec<i32>>>()?;
+    Ok(Arc::new(Int32Array::from(sizes)))
+}
diff --git a/datafusion/sqllogictest/test_files/spark/collection/size.slt 
b/datafusion/sqllogictest/test_files/spark/collection/size.slt
new file mode 100644
index 0000000000..dabcfd069b
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/spark/collection/size.slt
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This file was originally created by a porting script from:
+#   https://github.com/lakehq/sail/tree/43b6ed8221de5c4c4adbedbb267ae1351158b43c/crates/sail-spark-connect/tests/gold_data/function
+# This file is part of the implementation of the datafusion-spark function library.
+# For more information, please see:
+#   https://github.com/apache/datafusion/issues/15914
+
+## Original Query: SELECT size(array(1, 2, 3));
+## PySpark 3.5.5 Result: {'size(array(1, 2, 3))': 3}
+
+# Basic array
+query I
+SELECT size(make_array(1, 2, 3));
+----
+3
+
+# Nested array
+query I
+SELECT size(make_array(make_array(1, 2), make_array(3, 4, 5)));
+----
+2
+
+# LargeList tests
+query I
+SELECT size(arrow_cast(make_array(1, 2, 3), 'LargeList(Int32)'));
+----
+3
+
+query I
+SELECT size(arrow_cast(make_array(1, 2, 3, 4, 5), 'LargeList(Int64)'));
+----
+5
+
+# FixedSizeList tests
+query I
+SELECT size(arrow_cast(make_array(1, 2, 3), 'FixedSizeList(3, Int32)'));
+----
+3
+
+query I
+SELECT size(arrow_cast(make_array(1, 2, 3, 4), 'FixedSizeList(4, Int32)'));
+----
+4
+
+# Map size tests
+query I
+SELECT size(map(make_array('a', 'b', 'c'), make_array(1, 2, 3)));
+----
+3
+
+query I
+SELECT size(map(make_array('a'), make_array(1)));
+----
+1
+
+# Empty array
+query I
+SELECT size(arrow_cast(make_array(), 'List(Int32)'));
+----
+0
+
+
+# Array with NULL elements (size counts elements including NULLs)
+query I
+SELECT size(make_array(1, NULL, 3));
+----
+3
+
+# NULL array returns -1 (Spark behavior)
+query I
+SELECT size(NULL::int[]);
+----
+-1
+
+
+# Empty map
+query I
+SELECT size(map(arrow_cast(make_array(), 'List(Utf8)'), 
arrow_cast(make_array(), 'List(Int32)')));
+----
+0
+
+# String array
+query I
+SELECT size(make_array('hello', 'world'));
+----
+2
+
+# Boolean array
+query I
+SELECT size(make_array(true, false, true));
+----
+3
+
+# Float array
+query I
+SELECT size(make_array(1.5, 2.5, 3.5, 4.5));
+----
+4
+
+# Array column tests (with NULL values)
+query I
+SELECT size(column1) FROM VALUES ([1]), ([1,2]), ([]), (NULL);
+----
+1
+2
+0
+-1
+
+# Map column tests (with NULL values)
+query I
+SELECT size(column1) FROM VALUES (map(['a'], [1])), (map(['a','b'], [1,2])), 
(NULL);
+----
+1
+2
+-1
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to