This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new cdce1965e2 [VL] Support cast from array to string (#10300)
cdce1965e2 is described below
commit cdce1965e217a9b0f347d50d7678c1be71a8308c
Author: Mingliang Zhu <[email protected]>
AuthorDate: Wed Aug 6 23:59:28 2025 +0800
[VL] Support cast from array to string (#10300)
---
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 1 +
.../org/apache/gluten/config/VeloxConfig.scala | 11 ++++
.../gluten/extension/RewriteCastFromArray.scala | 58 ++++++++++++++++++++
.../functions/ScalarFunctionsValidateSuite.scala | 32 +++++++++++
.../gluten/utils/velox/VeloxTestSettings.scala | 2 +
.../spark/sql/GlutenDataFrameFunctionsSuite.scala | 62 ++++++++++++++++++++++
6 files changed, 166 insertions(+)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 53be562dfb..cdb6f14bf0 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -63,6 +63,7 @@ object VeloxRuleApi {
injector.injectOptimizerRule(CollectRewriteRule.apply)
injector.injectOptimizerRule(HLLRewriteRule.apply)
injector.injectOptimizerRule(CollapseGetJsonObjectExpressionRule.apply)
+ injector.injectOptimizerRule(RewriteCastFromArray.apply)
injector.injectPostHocResolutionRule(ArrowConvertorRule.apply)
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
index d4d0d0af32..fd451d1ffd 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
@@ -68,6 +68,9 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
getConf(VELOX_PROPAGATE_IGNORE_NULL_KEYS_ENABLED)
def floatingPointMode: String = getConf(FLOATING_POINT_MODE)
+
+ def enableRewriteCastArrayToString: Boolean =
+ getConf(ENABLE_REWRITE_CAST_ARRAY_TO_STRING)
}
object VeloxConfig {
@@ -647,4 +650,12 @@ object VeloxConfig {
" with `--enable_jemalloc_stats=ON`.")
.booleanConf
.createWithDefault(false)
+
+ val ENABLE_REWRITE_CAST_ARRAY_TO_STRING =
+ buildConf("spark.gluten.sql.rewrite.castArrayToString")
+ .internal()
+ .doc("When true, rewrite `cast(array as String)` to" +
+ " `concat('[', array_join(array, ', ', 'null'), ']')` to allow
offloading to Velox.")
+ .booleanConf
+ .createWithDefault(true)
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/extension/RewriteCastFromArray.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/RewriteCastFromArray.scala
new file mode 100644
index 0000000000..4821418852
--- /dev/null
+++
b/backends-velox/src/main/scala/org/apache/gluten/extension/RewriteCastFromArray.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.gluten.config.VeloxConfig
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{ArrayJoin, Cast, Concat,
Literal}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.CAST
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{ArrayType, StringType}
+
+/**
+ * Velox does not support casting Array to String. Until Velox supports it,
this rule temporarily
+ * replaces `cast(array as String)` with `concat('[', array_join(array, ', ',
'null'), ']')` to
+ * allow offloading.
+ */
+case class RewriteCastFromArray(spark: SparkSession) extends Rule[LogicalPlan]
{
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ if (
+ !VeloxConfig.get.enableRewriteCastArrayToString ||
+ SQLConf.get.getConf(SQLConf.LEGACY_COMPLEX_TYPES_TO_STRING)
+ ) {
+ return plan
+ }
+ plan.transformUpWithPruning(_.containsPattern(CAST)) {
+ case p =>
+ p.transformExpressionsUpWithPruning(_.containsPattern(CAST)) {
+ case Cast(child, StringType, timeZoneId, evalMode)
+ if child.dataType.isInstanceOf[ArrayType] =>
+ val joinChild = child.dataType.asInstanceOf[ArrayType].elementType
match {
+ case StringType =>
+ child
+ case _ =>
+ Cast(child, ArrayType(StringType), timeZoneId, evalMode)
+ }
+ val arrayJoin = ArrayJoin(joinChild, Literal(", "),
Some(Literal("null")))
+ Concat(Seq(Literal("["), arrayJoin, Literal("]")))
+ }
+ }
+ }
+}
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/functions/ScalarFunctionsValidateSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/functions/ScalarFunctionsValidateSuite.scala
index 38204daf4b..81259f7595 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/functions/ScalarFunctionsValidateSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/functions/ScalarFunctionsValidateSuite.scala
@@ -1175,6 +1175,38 @@ abstract class ScalarFunctionsValidateSuite extends
FunctionsValidateSuite {
runQueryAndCompare("select cast(array(timestamp'2024-01-01 12:00:00') AS
array<string>)") {
checkGlutenOperatorMatch[ProjectExecTransformer]
}
+ // Cast Array as String
+ withTempView("cast_table") {
+ withTempPath {
+ path =>
+ Seq[Array[String]](Array("a", null), Array(), null)
+ .toDF("c1")
+ .write
+ .parquet(path.getCanonicalPath)
+
spark.read.parquet(path.getCanonicalPath).createOrReplaceTempView("cast_table")
+ runQueryAndCompare("select cast(c1 as string) from cast_table") {
+ checkGlutenOperatorMatch[ProjectExecTransformer]
+ }
+ }
+ }
+ runQueryAndCompare("select cast(array(1, 2) AS string)") {
+ checkGlutenOperatorMatch[ProjectExecTransformer]
+ }
+ runQueryAndCompare("select cast(array(1L, null) AS string)") {
+ checkGlutenOperatorMatch[ProjectExecTransformer]
+ }
+ runQueryAndCompare("select cast(array(1.1d, null) AS string)") {
+ checkGlutenOperatorMatch[ProjectExecTransformer]
+ }
+ runQueryAndCompare("select cast(array(false, null) AS string)") {
+ checkGlutenOperatorMatch[ProjectExecTransformer]
+ }
+ runQueryAndCompare("select cast(array(date'2024-01-01') AS string)") {
+ checkGlutenOperatorMatch[ProjectExecTransformer]
+ }
+ runQueryAndCompare("select cast(array(timestamp'2024-01-01 12:00:00') AS
string)") {
+ checkGlutenOperatorMatch[ProjectExecTransformer]
+ }
}
testWithMinSparkVersion("equal_null", "3.4") {
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 819d0a69ee..4055ff970d 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -701,6 +701,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("aggregate function - array for non-primitive type")
// Rewrite this test because Velox sorts rows by key for primitive data
types, which disrupts the original row sequence.
.exclude("map_zip_with function - map of primitive types")
+ // Excluded because Gluten throws a different exception class here
+ // (GlutenException instead of the Spark exception the vanilla test expects);
+ // a rewritten version is provided in GlutenDataFrameFunctionsSuite.
+ .exclude("array_insert functions")
enableSuite[GlutenDataFrameHintSuite]
enableSuite[GlutenDataFrameImplicitsSuite]
enableSuite[GlutenDataFrameJoinSuite]
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenDataFrameFunctionsSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenDataFrameFunctionsSuite.scala
index 2b0b40790a..1725b98e75 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenDataFrameFunctionsSuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenDataFrameFunctionsSuite.scala
@@ -16,7 +16,11 @@
*/
package org.apache.spark.sql
+import org.apache.gluten.exception.GlutenException
+
+import org.apache.spark.SparkException
import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
class GlutenDataFrameFunctionsSuite extends DataFrameFunctionsSuite with
GlutenSQLTestsTrait {
import testImplicits._
@@ -49,4 +53,62 @@ class GlutenDataFrameFunctionsSuite extends
DataFrameFunctionsSuite with GlutenS
false
)
}
+
+ testGluten("array_insert functions") {
+ val fiveShort: Short = 5
+
+ val df1 = Seq((Array[Integer](3, 2, 5, 1, 2), 6, 3)).toDF("a", "b", "c")
+ val df2 = Seq((Array[Short](1, 2, 3, 4), 5, fiveShort)).toDF("a", "b", "c")
+ val df3 = Seq((Array[Double](3.0, 2.0, 5.0, 1.0, 2.0), 2, 3.0)).toDF("a",
"b", "c")
+ val df4 = Seq((Array[Boolean](true, false), 3, false)).toDF("a", "b", "c")
+ val df5 = Seq((Array[String]("a", "b", "c"), 0, "d")).toDF("a", "b", "c")
+ val df6 = Seq((Array[String]("a", null, "b", "c"), 5, "d")).toDF("a", "b",
"c")
+
+ checkAnswer(df1.selectExpr("array_insert(a, b, c)"), Seq(Row(Seq(3, 2, 5,
1, 2, 3))))
+ checkAnswer(df2.selectExpr("array_insert(a, b, c)"), Seq(Row(Seq[Short](1,
2, 3, 4, 5))))
+ checkAnswer(
+ df3.selectExpr("array_insert(a, b, c)"),
+ Seq(Row(Seq[Double](3.0, 3.0, 2.0, 5.0, 1.0, 2.0)))
+ )
+ checkAnswer(df4.selectExpr("array_insert(a, b, c)"), Seq(Row(Seq(true,
false, false))))
+
+ val e1 = intercept[SparkException] {
+ df5.selectExpr("array_insert(a, b, c)").show()
+ }
+ assert(e1.getCause.isInstanceOf[GlutenException])
+// checkError(
+// exception = e1.getCause.asInstanceOf[SparkRuntimeException],
+// errorClass = "INVALID_INDEX_OF_ZERO",
+// parameters = Map.empty,
+// context = ExpectedContext(
+// fragment = "array_insert(a, b, c)",
+// start = 0,
+// stop = 20)
+// )
+
+ checkAnswer(
+ df5.select(array_insert(col("a"), lit(1), col("c"))),
+ Seq(Row(Seq("d", "a", "b", "c")))
+ )
+ // null checks
+ checkAnswer(df6.selectExpr("array_insert(a, b, c)"), Seq(Row(Seq("a",
null, "b", "c", "d"))))
+ checkAnswer(
+ df6.select(array_insert(col("a"), col("b"), lit(null).cast("string"))),
+ Seq(Row(Seq("a", null, "b", "c", null)))
+ )
+ checkAnswer(
+ df5.select(array_insert(col("a"), lit(null).cast("integer"), col("c"))),
+ Seq(Row(null))
+ )
+ checkAnswer(
+ df5.select(array_insert(lit(null).cast("array<string>"), col("b"),
col("c"))),
+ Seq(Row(null))
+ )
+ checkAnswer(df1.selectExpr("array_insert(a, 7, c)"), Seq(Row(Seq(3, 2, 5,
1, 2, null, 3))))
+ checkAnswer(df1.selectExpr("array_insert(a, -6, c)"), Seq(Row(Seq(3, 3, 2,
5, 1, 2))))
+
+ withSQLConf(SQLConf.LEGACY_NEGATIVE_INDEX_IN_ARRAY_INSERT.key -> "true") {
+ checkAnswer(df1.selectExpr("array_insert(a, -6, c)"), Seq(Row(Seq(3,
null, 3, 2, 5, 1, 2))))
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]