This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new e4df0ef14c [GLUTEN-10377][CH] Move specialized delta internal column 
mapping to CH backend (#10381)
e4df0ef14c is described below

commit e4df0ef14c0155880d77f4dac1f90535eb9e9dba
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Aug 12 10:34:49 2025 +0200

    [GLUTEN-10377][CH] Move specialized delta internal column mapping to CH 
backend (#10381)
---
 .../gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala    |  7 +++++++
 .../main/scala/org/apache/spark/util/SparkVersionUtil.scala   |  2 ++
 .../org/apache/gluten/backendsapi/SparkPlanExecApi.scala      |  6 +++++-
 .../apache/gluten/execution/BasicScanExecTransformer.scala    |  5 +++--
 .../gluten/execution/FileSourceScanExecTransformer.scala      | 11 +++++++++++
 .../org/apache/gluten/sql/shims/spark35/Spark35Shims.scala    |  3 +--
 .../apache/spark/sql/execution/FileSourceScanExecShim.scala   |  7 +++----
 7 files changed, 32 insertions(+), 9 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 12e775086c..ce7d4e0c7b 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -57,6 +57,7 @@ import org.apache.spark.sql.execution.utils.{CHExecUtil, 
PushDownUtil}
 import org.apache.spark.sql.execution.window._
 import org.apache.spark.sql.types.{DecimalType, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.SparkVersionUtil
 
 import org.apache.commons.lang3.ClassUtils
 
@@ -999,4 +1000,10 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with 
Logging {
 
   override def genColumnarToCarrierRow(plan: SparkPlan): SparkPlan =
     CHColumnarToCarrierRowExec.enforce(plan)
+
+  override def isRowIndexMetadataColumn(columnName: String): Boolean = {
+    SparkShimLoader.getSparkShims.isRowIndexMetadataColumn(
+      columnName) || (SparkVersionUtil.gteSpark35 && 
columnName.equalsIgnoreCase(
+      "__delta_internal_is_row_deleted"))
+  }
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala 
b/gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala
index 49da44f257..7a02698cdc 100644
--- a/gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala
@@ -19,9 +19,11 @@ package org.apache.spark.util
 object SparkVersionUtil {
   val lteSpark32: Boolean = compareMajorMinorVersion((3, 2)) <= 0
   private val comparedWithSpark33 = compareMajorMinorVersion((3, 3))
+  private val comparedWithSpark35 = compareMajorMinorVersion((3, 5))
   val eqSpark33: Boolean = comparedWithSpark33 == 0
   val lteSpark33: Boolean = lteSpark32 || eqSpark33
   val gteSpark33: Boolean = comparedWithSpark33 >= 0
+  val gteSpark35: Boolean = comparedWithSpark35 >= 0
 
   // Returns X. X < 0 if one < other, x == 0 if one == other, x > 0 if one > 
other.
   def compareMajorMinorVersion(other: (Int, Int)): Int = {
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index f4ebbed5bd..8676014427 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -644,7 +644,7 @@ trait SparkPlanExecApi {
       val pushedFilters =
         dataFilters ++ FilterHandler.getRemainingFilters(dataFilters, 
extraFilters)
       pushedFilters.filterNot(_.references.exists {
-        attr => 
SparkShimLoader.getSparkShims.isRowIndexMetadataColumn(attr.name)
+        attr => 
BackendsApiManager.getSparkPlanExecApiInstance.isRowIndexMetadataColumn(attr.name)
       })
     }
     sparkExecNode match {
@@ -769,4 +769,8 @@ trait SparkPlanExecApi {
       original: Expression): ExpressionTransformer = {
     throw new GlutenNotSupportException("timestampdiff is not supported")
   }
+
+  def isRowIndexMetadataColumn(columnName: String): Boolean = {
+    SparkShimLoader.getSparkShims.isRowIndexMetadataColumn(columnName)
+  }
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index 0fcf2b0d58..0c38f645ce 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -19,7 +19,6 @@ package org.apache.gluten.execution
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter}
-import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.`type`.ColumnTypeNode
 import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.substrait.extensions.ExtensionBuilder
@@ -125,7 +124,9 @@ trait BasicScanExecTransformer extends LeafTransformSupport 
with BaseDataSource
       attr =>
         if (getPartitionSchema.exists(_.name.equals(attr.name))) {
           new ColumnTypeNode(NamedStruct.ColumnType.PARTITION_COL)
-        } else if 
(SparkShimLoader.getSparkShims.isRowIndexMetadataColumn(attr.name)) {
+        } else if (
+          
BackendsApiManager.getSparkPlanExecApiInstance.isRowIndexMetadataColumn(attr.name)
+        ) {
           new ColumnTypeNode(NamedStruct.ColumnType.ROWINDEX_COL)
         } else if (attr.isMetadataCol) {
           new ColumnTypeNode(NamedStruct.ColumnType.METADATA_COL)
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
index e9740da62d..24dd83caef 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.FileSourceScanExecShim
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SparkVersionUtil
 import org.apache.spark.util.collection.BitSet
 
 import org.apache.commons.lang3.StringUtils
@@ -109,6 +110,16 @@ abstract class FileSourceScanExecTransformerBase(
         output)
   }
 
+  override def dataFiltersInScan: Seq[Expression] = {
+    if (SparkVersionUtil.gteSpark35) {
+      dataFilters.filterNot(_.references.exists {
+        attr => 
BackendsApiManager.getSparkPlanExecApiInstance.isRowIndexMetadataColumn(attr.name)
+      })
+    } else {
+      super.dataFiltersInScan
+    }
+  }
+
   override def getMetadataColumns(): Seq[AttributeReference] = metadataColumns
 
   override def outputAttributes(): Seq[Attribute] = output
diff --git 
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
 
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index d0d9408534..cf1066bdca 100644
--- 
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++ 
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -400,8 +400,7 @@ class Spark35Shims extends SparkShims {
   }
 
   def isRowIndexMetadataColumn(name: String): Boolean =
-    name == ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME ||
-      name.equalsIgnoreCase("__delta_internal_is_row_deleted")
+    name == ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
 
   def findRowIndexColumnIndexInSchema(sparkSchema: StructType): Int = {
     sparkSchema.fields.zipWithIndex.find {
diff --git 
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
 
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 463afbbca4..676b60d318 100644
--- 
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ 
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -17,7 +17,6 @@
 package org.apache.spark.sql.execution
 
 import org.apache.gluten.metrics.GlutenTimeMetric
-import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
@@ -59,9 +58,9 @@ abstract class FileSourceScanExecShim(
 
   protected lazy val driverMetricsAlias = driverMetrics
 
-  def dataFiltersInScan: Seq[Expression] = 
dataFilters.filterNot(_.references.exists {
-    attr => SparkShimLoader.getSparkShims.isRowIndexMetadataColumn(attr.name)
-  })
+  def dataFiltersInScan: Seq[Expression] = {
+    throw new UnsupportedOperationException("Not implemented")
+  }
 
   def hasUnsupportedColumns: Boolean = {
     // TODO, fallback if user define same name column due to we can't right now


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to