This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new e4df0ef14c [GLUTEN-10377][CH] Move specialized delta internal column mapping to CH backend (#10381)
e4df0ef14c is described below
commit e4df0ef14c0155880d77f4dac1f90535eb9e9dba
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Aug 12 10:34:49 2025 +0200
[GLUTEN-10377][CH] Move specialized delta internal column mapping to CH backend (#10381)
---
.../gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala | 7 +++++++
.../main/scala/org/apache/spark/util/SparkVersionUtil.scala | 2 ++
.../org/apache/gluten/backendsapi/SparkPlanExecApi.scala | 6 +++++-
.../apache/gluten/execution/BasicScanExecTransformer.scala | 5 +++--
.../gluten/execution/FileSourceScanExecTransformer.scala | 11 +++++++++++
.../org/apache/gluten/sql/shims/spark35/Spark35Shims.scala | 3 +--
.../apache/spark/sql/execution/FileSourceScanExecShim.scala | 7 +++----
7 files changed, 32 insertions(+), 9 deletions(-)
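In short, this change moves the Delta-specific column check out of the Spark 3.5 shim and behind the backend API, so only the ClickHouse backend treats Delta's internal deletion marker as a row-index metadata column. A simplified sketch of the resulting dispatch, using only names that appear in the hunks below (surrounding members, mixins, and imports are elided, so this is illustrative rather than compilable on its own):

    // gluten-substrait: the default implementation delegates to the Spark shim.
    trait SparkPlanExecApi {
      def isRowIndexMetadataColumn(columnName: String): Boolean =
        SparkShimLoader.getSparkShims.isRowIndexMetadataColumn(columnName)
    }

    // backends-clickhouse: additionally match Delta's internal
    // "__delta_internal_is_row_deleted" column on Spark 3.5+.
    class CHSparkPlanExecApi extends SparkPlanExecApi {
      override def isRowIndexMetadataColumn(columnName: String): Boolean =
        SparkShimLoader.getSparkShims.isRowIndexMetadataColumn(columnName) ||
          (SparkVersionUtil.gteSpark35 &&
            columnName.equalsIgnoreCase("__delta_internal_is_row_deleted"))
    }

Call sites in gluten-substrait (BasicScanExecTransformer, the filter pushdown in SparkPlanExecApi, and FileSourceScanExecTransformer) now go through BackendsApiManager.getSparkPlanExecApiInstance.isRowIndexMetadataColumn instead of calling the shim directly.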
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 12e775086c..ce7d4e0c7b 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -57,6 +57,7 @@ import org.apache.spark.sql.execution.utils.{CHExecUtil, PushDownUtil}
import org.apache.spark.sql.execution.window._
import org.apache.spark.sql.types.{DecimalType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.SparkVersionUtil
import org.apache.commons.lang3.ClassUtils
@@ -999,4 +1000,10 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
override def genColumnarToCarrierRow(plan: SparkPlan): SparkPlan =
CHColumnarToCarrierRowExec.enforce(plan)
+
+ override def isRowIndexMetadataColumn(columnName: String): Boolean = {
+ SparkShimLoader.getSparkShims.isRowIndexMetadataColumn(
+ columnName) || (SparkVersionUtil.gteSpark35 && columnName.equalsIgnoreCase(
+ "__delta_internal_is_row_deleted"))
+ }
}
diff --git a/gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala b/gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala
index 49da44f257..7a02698cdc 100644
--- a/gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala
@@ -19,9 +19,11 @@ package org.apache.spark.util
object SparkVersionUtil {
val lteSpark32: Boolean = compareMajorMinorVersion((3, 2)) <= 0
private val comparedWithSpark33 = compareMajorMinorVersion((3, 3))
+ private val comparedWithSpark35 = compareMajorMinorVersion((3, 5))
val eqSpark33: Boolean = comparedWithSpark33 == 0
val lteSpark33: Boolean = lteSpark32 || eqSpark33
val gteSpark33: Boolean = comparedWithSpark33 >= 0
+ val gteSpark35: Boolean = comparedWithSpark35 >= 0
// Returns X. X < 0 if one < other, x == 0 if one == other, x > 0 if one > other.
def compareMajorMinorVersion(other: (Int, Int)): Int = {
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index f4ebbed5bd..8676014427 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -644,7 +644,7 @@ trait SparkPlanExecApi {
val pushedFilters =
dataFilters ++ FilterHandler.getRemainingFilters(dataFilters, extraFilters)
pushedFilters.filterNot(_.references.exists {
- attr => SparkShimLoader.getSparkShims.isRowIndexMetadataColumn(attr.name)
+ attr => BackendsApiManager.getSparkPlanExecApiInstance.isRowIndexMetadataColumn(attr.name)
})
}
sparkExecNode match {
@@ -769,4 +769,8 @@ trait SparkPlanExecApi {
original: Expression): ExpressionTransformer = {
throw new GlutenNotSupportException("timestampdiff is not supported")
}
+
+ def isRowIndexMetadataColumn(columnName: String): Boolean = {
+ SparkShimLoader.getSparkShims.isRowIndexMetadataColumn(columnName)
+ }
}
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index 0fcf2b0d58..0c38f645ce 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -19,7 +19,6 @@ package org.apache.gluten.execution
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter}
-import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.`type`.ColumnTypeNode
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.extensions.ExtensionBuilder
@@ -125,7 +124,9 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
attr =>
if (getPartitionSchema.exists(_.name.equals(attr.name))) {
new ColumnTypeNode(NamedStruct.ColumnType.PARTITION_COL)
- } else if (SparkShimLoader.getSparkShims.isRowIndexMetadataColumn(attr.name)) {
+ } else if (
+ BackendsApiManager.getSparkPlanExecApiInstance.isRowIndexMetadataColumn(attr.name)
+ ) {
new ColumnTypeNode(NamedStruct.ColumnType.ROWINDEX_COL)
} else if (attr.isMetadataCol) {
new ColumnTypeNode(NamedStruct.ColumnType.METADATA_COL)
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
index e9740da62d..24dd83caef 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.FileSourceScanExecShim
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SparkVersionUtil
import org.apache.spark.util.collection.BitSet
import org.apache.commons.lang3.StringUtils
@@ -109,6 +110,16 @@ abstract class FileSourceScanExecTransformerBase(
output)
}
+ override def dataFiltersInScan: Seq[Expression] = {
+ if (SparkVersionUtil.gteSpark35) {
+ dataFilters.filterNot(_.references.exists {
+ attr => BackendsApiManager.getSparkPlanExecApiInstance.isRowIndexMetadataColumn(attr.name)
+ })
+ } else {
+ super.dataFiltersInScan
+ }
+ }
+
override def getMetadataColumns(): Seq[AttributeReference] = metadataColumns
override def outputAttributes(): Seq[Attribute] = output
diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index d0d9408534..cf1066bdca 100644
--- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -400,8 +400,7 @@ class Spark35Shims extends SparkShims {
}
def isRowIndexMetadataColumn(name: String): Boolean =
- name == ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME ||
- name.equalsIgnoreCase("__delta_internal_is_row_deleted")
+ name == ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
def findRowIndexColumnIndexInSchema(sparkSchema: StructType): Int = {
sparkSchema.fields.zipWithIndex.find {
diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 463afbbca4..676b60d318 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution
import org.apache.gluten.metrics.GlutenTimeMetric
-import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
@@ -59,9 +58,9 @@ abstract class FileSourceScanExecShim(
protected lazy val driverMetricsAlias = driverMetrics
- def dataFiltersInScan: Seq[Expression] = dataFilters.filterNot(_.references.exists {
- attr => SparkShimLoader.getSparkShims.isRowIndexMetadataColumn(attr.name)
- })
+ def dataFiltersInScan: Seq[Expression] = {
+ throw new UnsupportedOperationException("Not implemented")
+ }
def hasUnsupportedColumns: Boolean = {
// TODO, fallback if user define same name column due to we can't right now
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]