This is an automated email from the ASF dual-hosted git repository.
mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new bae59b5cd chore: use datafusion impl of `bit_count` function (#3616)
bae59b5cd is described below
commit bae59b5cd55fa07afcbbf218762cc994659006fa
Author: Kazantsev Maksim <[email protected]>
AuthorDate: Tue Mar 3 22:29:03 2026 +0400
chore: use datafusion impl of `bit_count` function (#3616)
* impl map_from_entries
* Revert "impl map_from_entries"
This reverts commit 768b3e90f261c7aea58bdb98dc698b90deeeae34.
* Use datafusion impl of bit_count
* fix fmt
* Resolve conflicts
---------
Co-authored-by: Kazantsev Maksim <[email protected]>
---
native/core/src/execution/jni_api.rs | 4 +-
.../spark-expr/src/bitwise_funcs/bitwise_count.rs | 193 ---------------------
native/spark-expr/src/bitwise_funcs/mod.rs | 20 ---
native/spark-expr/src/comet_scalar_funcs.rs | 5 +-
native/spark-expr/src/lib.rs | 2 -
.../scala/org/apache/comet/serde/bitwise.scala | 14 +-
6 files changed, 7 insertions(+), 231 deletions(-)
diff --git a/native/core/src/execution/jni_api.rs
b/native/core/src/execution/jni_api.rs
index a090dd89d..1030e30aa 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -40,6 +40,7 @@ use datafusion::{
prelude::{SessionConfig, SessionContext},
};
use datafusion_comet_proto::spark_operator::Operator;
+use datafusion_spark::function::bitwise::bit_count::SparkBitCount;
use datafusion_spark::function::bitwise::bit_get::SparkBitGet;
use datafusion_spark::function::bitwise::bitwise_not::SparkBitwiseNot;
use datafusion_spark::function::datetime::date_add::SparkDateAdd;
@@ -55,6 +56,7 @@ use datafusion_spark::function::math::hex::SparkHex;
use datafusion_spark::function::math::width_bucket::SparkWidthBucket;
use datafusion_spark::function::string::char::CharFunc;
use datafusion_spark::function::string::concat::SparkConcat;
+use datafusion_spark::function::string::space::SparkSpace;
use futures::poll;
use futures::stream::StreamExt;
use jni::objects::JByteBuffer;
@@ -401,6 +403,7 @@ fn register_datafusion_spark_function(session_ctx:
&SessionContext) {
session_ctx.register_udf(ScalarUDF::new_from_impl(MapFromEntries::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkCrc32::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSpace::default()));
+
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitCount::default()));
}
/// Prepares arrow arrays for output.
@@ -911,7 +914,6 @@ pub unsafe extern "system" fn
Java_org_apache_comet_Native_logMemoryUsage(
use crate::execution::columnar_to_row::ColumnarToRowContext;
use arrow::ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema};
-use datafusion_spark::function::string::space::SparkSpace;
/// Initialize a native columnar to row converter.
///
diff --git a/native/spark-expr/src/bitwise_funcs/bitwise_count.rs
b/native/spark-expr/src/bitwise_funcs/bitwise_count.rs
deleted file mode 100644
index b65c50732..000000000
--- a/native/spark-expr/src/bitwise_funcs/bitwise_count.rs
+++ /dev/null
@@ -1,193 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use arrow::{array::*, datatypes::DataType};
-use datafusion::common::{exec_err, internal_datafusion_err, Result};
-use datafusion::logical_expr::{ScalarFunctionArgs, ScalarUDFImpl, Signature,
Volatility};
-use datafusion::{error::DataFusionError, logical_expr::ColumnarValue};
-use std::any::Any;
-use std::sync::Arc;
-
-#[derive(Debug, PartialEq, Eq, Hash)]
-pub struct SparkBitwiseCount {
- signature: Signature,
- aliases: Vec<String>,
-}
-
-impl Default for SparkBitwiseCount {
- fn default() -> Self {
- Self::new()
- }
-}
-
-impl SparkBitwiseCount {
- pub fn new() -> Self {
- Self {
- signature: Signature::user_defined(Volatility::Immutable),
- aliases: vec![],
- }
- }
-}
-
-impl ScalarUDFImpl for SparkBitwiseCount {
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn name(&self) -> &str {
- "bit_count"
- }
-
- fn signature(&self) -> &Signature {
- &self.signature
- }
-
- fn return_type(&self, _: &[DataType]) -> Result<DataType> {
- Ok(DataType::Int32)
- }
-
- fn invoke_with_args(&self, args: ScalarFunctionArgs) ->
Result<ColumnarValue> {
- let args: [ColumnarValue; 1] = args
- .args
- .try_into()
- .map_err(|_| internal_datafusion_err!("bit_count expects exactly
one argument"))?;
- spark_bit_count(args)
- }
-
- fn aliases(&self) -> &[String] {
- &self.aliases
- }
-}
-
-macro_rules! compute_op {
- ($OPERAND:expr, $DT:ident) => {{
- let operand = $OPERAND.as_any().downcast_ref::<$DT>().ok_or_else(|| {
- DataFusionError::Execution(format!(
- "compute_op failed to downcast array to: {:?}",
- stringify!($DT)
- ))
- })?;
-
- let result: Int32Array = operand
- .iter()
- .map(|x| x.map(|y| bit_count(y.into())))
- .collect();
-
- Ok(Arc::new(result))
- }};
-}
-
-pub fn spark_bit_count(args: [ColumnarValue; 1]) -> Result<ColumnarValue> {
- match args {
- [ColumnarValue::Array(array)] => {
- let result: Result<ArrayRef> = match array.data_type() {
- DataType::Int8 | DataType::Boolean => compute_op!(array,
Int8Array),
- DataType::Int16 => compute_op!(array, Int16Array),
- DataType::Int32 => compute_op!(array, Int32Array),
- DataType::Int64 => compute_op!(array, Int64Array),
- _ => exec_err!("bit_count can't be evaluated because the
array's type is {:?}, not signed int/boolean", array.data_type()),
- };
- result.map(ColumnarValue::Array)
- }
- [ColumnarValue::Scalar(scalar)] => {
- use datafusion::common::ScalarValue;
- let result = match scalar {
- ScalarValue::Int8(Some(v)) => bit_count(v as i64),
- ScalarValue::Int16(Some(v)) => bit_count(v as i64),
- ScalarValue::Int32(Some(v)) => bit_count(v as i64),
- ScalarValue::Int64(Some(v)) => bit_count(v),
- ScalarValue::Boolean(Some(v)) => bit_count(if v { 1 } else { 0
}),
- ScalarValue::Int8(None)
- | ScalarValue::Int16(None)
- | ScalarValue::Int32(None)
- | ScalarValue::Int64(None)
- | ScalarValue::Boolean(None) => {
- return Ok(ColumnarValue::Scalar(ScalarValue::Int32(None)))
- }
- _ => {
- return exec_err!(
- "bit_count can't be evaluated because the scalar's
type is {:?}, not signed int/boolean",
- scalar.data_type()
- )
- }
- };
- Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(result))))
- }
- }
-}
-
-// Here’s the equivalent Rust implementation of the bitCount function (similar
to Java's bitCount for LongType)
-fn bit_count(i: i64) -> i32 {
- let mut u = i as u64;
- u = u - ((u >> 1) & 0x5555555555555555);
- u = (u & 0x3333333333333333) + ((u >> 2) & 0x3333333333333333);
- u = (u + (u >> 4)) & 0x0f0f0f0f0f0f0f0f;
- u = u + (u >> 8);
- u = u + (u >> 16);
- u = u + (u >> 32);
- (u as i32) & 0x7f
-}
-
-#[cfg(test)]
-mod tests {
- use datafusion::common::{cast::as_int32_array, Result, ScalarValue};
-
- use super::*;
-
- #[test]
- fn bitwise_count_op() -> Result<()> {
- let args = ColumnarValue::Array(Arc::new(Int32Array::from(vec![
- Some(1),
- None,
- Some(12345),
- Some(89),
- Some(-3456),
- Some(i32::MIN),
- Some(i32::MAX),
- ])));
- let expected = &Int32Array::from(vec![
- Some(1),
- None,
- Some(6),
- Some(4),
- Some(54),
- Some(33),
- Some(31),
- ]);
-
- let ColumnarValue::Array(result) = spark_bit_count([args])? else {
- unreachable!()
- };
-
-        let result = as_int32_array(&result).expect("failed to downcast to
Int32Array");
- assert_eq!(result, expected);
-
- Ok(())
- }
-
- #[test]
- fn bitwise_count_scalar() {
- let args = ColumnarValue::Scalar(ScalarValue::Int64(Some(i64::MAX)));
-
- match spark_bit_count([args]) {
- Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(actual)))) => {
- assert_eq!(actual, 63)
- }
- _ => unreachable!(),
- }
- }
-}
diff --git a/native/spark-expr/src/bitwise_funcs/mod.rs
b/native/spark-expr/src/bitwise_funcs/mod.rs
deleted file mode 100644
index d96857e23..000000000
--- a/native/spark-expr/src/bitwise_funcs/mod.rs
+++ /dev/null
@@ -1,20 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-mod bitwise_count;
-
-pub use bitwise_count::SparkBitwiseCount;
diff --git a/native/spark-expr/src/comet_scalar_funcs.rs
b/native/spark-expr/src/comet_scalar_funcs.rs
index 08b34bc26..ff75de763 100644
--- a/native/spark-expr/src/comet_scalar_funcs.rs
+++ b/native/spark-expr/src/comet_scalar_funcs.rs
@@ -22,8 +22,8 @@ use crate::math_funcs::modulo_expr::spark_modulo;
use crate::{
spark_ceil, spark_decimal_div, spark_decimal_integral_div, spark_floor,
spark_isnan,
spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round,
spark_rpad, spark_unhex,
- spark_unscaled_value, EvalMode, SparkBitwiseCount, SparkContains,
SparkDateDiff,
- SparkDateTrunc, SparkMakeDate, SparkSizeFunc,
+ spark_unscaled_value, EvalMode, SparkContains, SparkDateDiff,
SparkDateTrunc, SparkMakeDate,
+ SparkSizeFunc,
};
use arrow::datatypes::DataType;
use datafusion::common::{DataFusionError, Result as DataFusionResult};
@@ -191,7 +191,6 @@ pub fn create_comet_physical_fun_with_eval_mode(
fn all_scalar_functions() -> Vec<Arc<ScalarUDF>> {
vec![
- Arc::new(ScalarUDF::new_from_impl(SparkBitwiseCount::default())),
Arc::new(ScalarUDF::new_from_impl(SparkContains::default())),
Arc::new(ScalarUDF::new_from_impl(SparkDateDiff::default())),
Arc::new(ScalarUDF::new_from_impl(SparkDateTrunc::default())),
diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs
index 40eb180ab..992bd94b0 100644
--- a/native/spark-expr/src/lib.rs
+++ b/native/spark-expr/src/lib.rs
@@ -41,7 +41,6 @@ pub use predicate_funcs::{spark_isnan, RLike};
mod agg_funcs;
mod array_funcs;
-mod bitwise_funcs;
mod comet_scalar_funcs;
pub mod hash_funcs;
@@ -61,7 +60,6 @@ mod math_funcs;
mod nondetermenistic_funcs;
pub use array_funcs::*;
-pub use bitwise_funcs::*;
pub use conditional_funcs::*;
pub use conversion_funcs::*;
pub use nondetermenistic_funcs::*;
diff --git a/spark/src/main/scala/org/apache/comet/serde/bitwise.scala
b/spark/src/main/scala/org/apache/comet/serde/bitwise.scala
index 8020502ab..751fb7521 100644
--- a/spark/src/main/scala/org/apache/comet/serde/bitwise.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/bitwise.scala
@@ -20,7 +20,7 @@
package org.apache.comet.serde
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{ByteType, IntegerType, LongType}
+import org.apache.spark.sql.types.{ByteType, LongType}
import org.apache.comet.serde.QueryPlanSerde._
@@ -140,14 +140,4 @@ object CometBitwiseGet extends
CometExpressionSerde[BitwiseGet] {
}
}
-object CometBitwiseCount extends CometExpressionSerde[BitwiseCount] {
- override def convert(
- expr: BitwiseCount,
- inputs: Seq[Attribute],
- binding: Boolean): Option[ExprOuterClass.Expr] = {
- val childProto = exprToProto(expr.child, inputs, binding)
- val bitCountScalarExpr =
- scalarFunctionExprToProtoWithReturnType("bit_count", IntegerType, false,
childProto)
- optExprWithInfo(bitCountScalarExpr, expr, expr.children: _*)
- }
-}
+object CometBitwiseCount extends CometScalarFunction[BitwiseCount]("bit_count")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]