This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new c0c67bd75 chore: use datafusion impl of `space` function (#3612)
c0c67bd75 is described below
commit c0c67bd75213861d103a62656cb01b859728fcc0
Author: Kazantsev Maksim <[email protected]>
AuthorDate: Tue Mar 3 17:25:31 2026 +0400
chore: use datafusion impl of `space` function (#3612)
---
native/core/src/execution/jni_api.rs | 2 +
native/spark-expr/src/comet_scalar_funcs.rs | 3 +-
native/spark-expr/src/string_funcs/mod.rs | 2 -
native/spark-expr/src/string_funcs/string_space.rs | 197 ---------------------
.../org/apache/comet/serde/QueryPlanSerde.scala | 2 +-
5 files changed, 4 insertions(+), 202 deletions(-)
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index 0193f3012..a090dd89d 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -400,6 +400,7 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) {
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkWidthBucket::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(MapFromEntries::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkCrc32::default()));
+ session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSpace::default()));
}
/// Prepares arrow arrays for output.
@@ -910,6 +911,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_logMemoryUsage(
use crate::execution::columnar_to_row::ColumnarToRowContext;
use arrow::ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema};
+use datafusion_spark::function::string::space::SparkSpace;
/// Initialize a native columnar to row converter.
///
diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs
index 4bfdef709..08b34bc26 100644
--- a/native/spark-expr/src/comet_scalar_funcs.rs
+++ b/native/spark-expr/src/comet_scalar_funcs.rs
@@ -23,7 +23,7 @@ use crate::{
spark_ceil, spark_decimal_div, spark_decimal_integral_div, spark_floor, spark_isnan,
spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round, spark_rpad, spark_unhex,
spark_unscaled_value, EvalMode, SparkBitwiseCount, SparkContains, SparkDateDiff,
- SparkDateTrunc, SparkMakeDate, SparkSizeFunc, SparkStringSpace,
+ SparkDateTrunc, SparkMakeDate, SparkSizeFunc,
};
use arrow::datatypes::DataType;
use datafusion::common::{DataFusionError, Result as DataFusionResult};
@@ -196,7 +196,6 @@ fn all_scalar_functions() -> Vec<Arc<ScalarUDF>> {
Arc::new(ScalarUDF::new_from_impl(SparkDateDiff::default())),
Arc::new(ScalarUDF::new_from_impl(SparkDateTrunc::default())),
Arc::new(ScalarUDF::new_from_impl(SparkMakeDate::default())),
- Arc::new(ScalarUDF::new_from_impl(SparkStringSpace::default())),
Arc::new(ScalarUDF::new_from_impl(SparkSizeFunc::default())),
]
}
diff --git a/native/spark-expr/src/string_funcs/mod.rs b/native/spark-expr/src/string_funcs/mod.rs
index 3523b1f78..919b9dc19 100644
--- a/native/spark-expr/src/string_funcs/mod.rs
+++ b/native/spark-expr/src/string_funcs/mod.rs
@@ -17,10 +17,8 @@
mod contains;
mod split;
-mod string_space;
mod substring;
pub use contains::SparkContains;
pub use split::spark_split;
-pub use string_space::SparkStringSpace;
pub use substring::SubstringExpr;
diff --git a/native/spark-expr/src/string_funcs/string_space.rs b/native/spark-expr/src/string_funcs/string_space.rs
deleted file mode 100644
index 78d94208d..000000000
--- a/native/spark-expr/src/string_funcs/string_space.rs
+++ /dev/null
@@ -1,197 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use arrow::array::{
- as_dictionary_array, make_array, Array, ArrayData, ArrayRef, DictionaryArray,
- GenericStringArray, Int32Array, OffsetSizeTrait,
-};
-use arrow::buffer::MutableBuffer;
-use arrow::datatypes::{DataType, Int32Type};
-use datafusion::common::{exec_err, internal_datafusion_err, DataFusionError, Result, ScalarValue};
-use datafusion::logical_expr::{
- ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
-};
-use std::{any::Any, sync::Arc};
-
-#[derive(Debug, PartialEq, Eq, Hash)]
-pub struct SparkStringSpace {
- signature: Signature,
- aliases: Vec<String>,
-}
-
-impl Default for SparkStringSpace {
- fn default() -> Self {
- Self::new()
- }
-}
-
-impl SparkStringSpace {
- pub fn new() -> Self {
- Self {
- signature: Signature::user_defined(Volatility::Immutable),
- aliases: vec![],
- }
- }
-}
-
-impl ScalarUDFImpl for SparkStringSpace {
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn name(&self) -> &str {
- "string_space"
- }
-
- fn signature(&self) -> &Signature {
- &self.signature
- }
-
- fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
- Ok(match &arg_types[0] {
- DataType::Dictionary(key_type, _) => {
- DataType::Dictionary(key_type.clone(), Box::new(DataType::Utf8))
- }
- _ => DataType::Utf8,
- })
- }
-
- fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
- let args: [ColumnarValue; 1] = args
- .args
- .try_into()
- .map_err(|_| internal_datafusion_err!("string_space expects exactly one argument"))?;
- spark_string_space(&args)
- }
-
- fn aliases(&self) -> &[String] {
- &self.aliases
- }
-}
-
-pub fn spark_string_space(args: &[ColumnarValue; 1]) -> Result<ColumnarValue> {
- match args {
- [ColumnarValue::Array(array)] => {
- let result = string_space_array(&array)?;
- Ok(ColumnarValue::Array(result))
- }
- [ColumnarValue::Scalar(scalar)] => {
- let result = string_space_scalar(scalar)?;
- Ok(ColumnarValue::Scalar(result))
- }
- }
-}
-
-fn string_space_array(length: &dyn Array) -> std::result::Result<ArrayRef, DataFusionError> {
- match length.data_type() {
- DataType::Int32 => {
- let array = length.as_any().downcast_ref::<Int32Array>().unwrap();
- Ok(generic_string_space::<i32>(array))
- }
- DataType::Dictionary(_, _) => {
- let dict = as_dictionary_array::<Int32Type>(length);
- let values = string_space_array(dict.values())?;
- let result = DictionaryArray::try_new(dict.keys().clone(), values)?;
- Ok(Arc::new(result))
- }
- other => exec_err!("Unsupported input type for function 'string_space': {other:?}"),
- }
-}
-
-fn string_space_scalar(scalar: &ScalarValue) -> Result<ScalarValue> {
- match scalar {
- ScalarValue::Int32(value) => {
- let result = value.map(|v| {
- if v <= 0 {
- String::new()
- } else {
- " ".repeat(v as usize)
- }
- });
- Ok(ScalarValue::Utf8(result))
- }
- other => {
- exec_err!("Unsupported data type {other:?} for function `space`")
- }
- }
-}
-
-fn generic_string_space<OffsetSize: OffsetSizeTrait>(length: &Int32Array) -> ArrayRef {
- let array_len = length.len();
- let mut offsets = MutableBuffer::new((array_len + 1) * std::mem::size_of::<OffsetSize>());
- let mut length_so_far = OffsetSize::zero();
-
- // compute null bitmap (copy)
- let null_bit_buffer = length.to_data().nulls().map(|b| b.buffer().clone());
-
- // Gets slice of length array to access it directly for performance.
- // Negative length values are set to zero to match Spark behavior
- let length_data = length.to_data();
- let lengths: Vec<_> = length_data.buffers()[0]
- .typed_data::<i32>()
- .iter()
- .map(|l| (*l).max(0) as usize)
- .collect();
- let total = lengths.iter().sum::<usize>();
- let mut values = MutableBuffer::new(total);
-
- offsets.push(length_so_far);
-
- let blank = " ".as_bytes()[0];
- values.resize(total, blank);
-
- (0..array_len).for_each(|i| {
- let current_len = lengths[i];
-
- length_so_far += OffsetSize::from_usize(current_len).unwrap();
- offsets.push(length_so_far);
- });
-
- let data = unsafe {
- ArrayData::new_unchecked(
- GenericStringArray::<OffsetSize>::DATA_TYPE,
- array_len,
- None,
- null_bit_buffer,
- 0,
- vec![offsets.into(), values.into()],
- vec![],
- )
- };
- make_array(data)
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use arrow::array::StringArray;
- use datafusion::common::cast::as_string_array;
-
- #[test]
- fn test_negative_length() {
- let input = Int32Array::from(vec![Some(-1), Some(-2), None]);
- let args = ColumnarValue::Array(Arc::new(input));
- match spark_string_space(&[args]) {
- Ok(ColumnarValue::Array(result)) => {
- let actual = as_string_array(&result).unwrap();
- let expected = StringArray::from(vec![Some(""), Some(""), None]);
- assert_eq!(actual, &expected)
- }
- _ => unreachable!(),
- }
- }
-}
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 9d13ccd9e..c5880e00e 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -168,7 +168,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
classOf[StringReplace] -> CometScalarFunction("replace"),
classOf[StringRPad] -> CometStringRPad,
classOf[StringLPad] -> CometStringLPad,
- classOf[StringSpace] -> CometScalarFunction("string_space"),
+ classOf[StringSpace] -> CometScalarFunction("space"),
classOf[StringSplit] -> CometStringSplit,
classOf[StringTranslate] -> CometScalarFunction("translate"),
classOf[StringTrim] -> CometScalarFunction("trim"),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]