This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 70110f229a [spark] Fix RowTrackingTable reading by add _ROW_ID existing check before adding _ROW_ID (#7606)
70110f229a is described below

commit 70110f229aa0cf30df0cef81cce7cfb219dc2a58
Author: littlecoder04 <[email protected]>
AuthorDate: Thu Apr 9 20:49:39 2026 +0800

    [spark] Fix RowTrackingTable reading by add _ROW_ID existing check before adding _ROW_ID (#7606)
---
 .../org/apache/paimon/spark/PaimonBaseScanBuilder.scala |  5 ++++-
 .../scala/org/apache/paimon/spark/read/BaseScan.scala   |  5 ++++-
 .../apache/paimon/spark/sql/RowTrackingTestBase.scala   | 17 +++++++++++++++++
 3 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
index 14a090ff14..c249169ae7 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -69,7 +69,10 @@ abstract class PaimonBaseScanBuilder
     val postScan = mutable.ArrayBuffer.empty[SparkPredicate]
 
     var newRowType = rowType
-    if (coreOptions.rowTrackingEnabled() && coreOptions.dataEvolutionEnabled()) {
+    if (
+      coreOptions.rowTrackingEnabled() && coreOptions
+        .dataEvolutionEnabled() && !rowType.containsField(SpecialFields.ROW_ID.name())
+    ) {
       newRowType = SpecialFields.rowTypeWithRowTracking(newRowType);
     }
     val converter = SparkV2FilterConverter(newRowType)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala
index 42f724d3e6..3237475d69 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala
@@ -68,7 +68,10 @@ trait BaseScan extends Scan with SupportsReportStatistics with Logging {
   val coreOptions: CoreOptions = CoreOptions.fromMap(table.options())
 
   lazy val tableRowType: RowType = {
-    if (coreOptions.rowTrackingEnabled()) {
+    if (
+      coreOptions
+        .rowTrackingEnabled() && !table.rowType().containsField(SpecialFields.ROW_ID.name())
+    ) {
       SpecialFields.rowTypeWithRowTracking(table.rowType())
     } else {
       table.rowType()
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index 0ea9d21bcf..e012656cc8 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -795,6 +795,22 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
     }
   }
 
+  test("Row Tracking: query row_tracking system table with filter pushdown") {
+    withTable("t") {
+      sql("CREATE TABLE t (a INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
+      sql("INSERT INTO t VALUES (1, 10), (2, 20), (3, 30)")
+
+      val query = s"SELECT a, b FROM `t$$row_tracking` WHERE a > 1 ORDER BY a"
+      checkAnswer(sql(query), Seq(Row(2, 20), Row(3, 30)))
+
+      val scan = getScan(query)
+      assert(
+        scan.description().contains("DataFilters"),
+        s"Expected predicate pushdown (DataFilters) in scan description, but got: ${scan.description()}"
+      )
+    }
+  }
+
   test("Data Evolution: compact fields action") {
     withTable("s", "t") {
       sql("CREATE TABLE s (id INT, b INT)")
@@ -952,4 +968,5 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
       assert(!indexEntries.exists(entry => entry.partition().getString(0).toString.equals("p1")))
     }
   }
+
 }

Reply via email to