This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 70110f229a [spark] Fix RowTrackingTable reading by adding a _ROW_ID
existence check before adding _ROW_ID (#7606)
70110f229a is described below
commit 70110f229aa0cf30df0cef81cce7cfb219dc2a58
Author: littlecoder04 <[email protected]>
AuthorDate: Thu Apr 9 20:49:39 2026 +0800
[spark] Fix RowTrackingTable reading by adding a _ROW_ID existence check
before adding _ROW_ID (#7606)
---
.../org/apache/paimon/spark/PaimonBaseScanBuilder.scala | 5 ++++-
.../scala/org/apache/paimon/spark/read/BaseScan.scala | 5 ++++-
.../apache/paimon/spark/sql/RowTrackingTestBase.scala | 17 +++++++++++++++++
3 files changed, 25 insertions(+), 2 deletions(-)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
index 14a090ff14..c249169ae7 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -69,7 +69,10 @@ abstract class PaimonBaseScanBuilder
val postScan = mutable.ArrayBuffer.empty[SparkPredicate]
var newRowType = rowType
-    if (coreOptions.rowTrackingEnabled() && coreOptions.dataEvolutionEnabled()) {
+ if (
+ coreOptions.rowTrackingEnabled() && coreOptions
+ .dataEvolutionEnabled() &&
!rowType.containsField(SpecialFields.ROW_ID.name())
+ ) {
newRowType = SpecialFields.rowTypeWithRowTracking(newRowType);
}
val converter = SparkV2FilterConverter(newRowType)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala
index 42f724d3e6..3237475d69 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BaseScan.scala
@@ -68,7 +68,10 @@ trait BaseScan extends Scan with SupportsReportStatistics with Logging {
val coreOptions: CoreOptions = CoreOptions.fromMap(table.options())
lazy val tableRowType: RowType = {
- if (coreOptions.rowTrackingEnabled()) {
+ if (
+ coreOptions
+ .rowTrackingEnabled() &&
!table.rowType().containsField(SpecialFields.ROW_ID.name())
+ ) {
SpecialFields.rowTypeWithRowTracking(table.rowType())
} else {
table.rowType()
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index 0ea9d21bcf..e012656cc8 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -795,6 +795,22 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
}
}
+ test("Row Tracking: query row_tracking system table with filter pushdown") {
+ withTable("t") {
+      sql("CREATE TABLE t (a INT, b INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
+ sql("INSERT INTO t VALUES (1, 10), (2, 20), (3, 30)")
+
+ val query = s"SELECT a, b FROM `t$$row_tracking` WHERE a > 1 ORDER BY a"
+ checkAnswer(sql(query), Seq(Row(2, 20), Row(3, 30)))
+
+ val scan = getScan(query)
+ assert(
+ scan.description().contains("DataFilters"),
+        s"Expected predicate pushdown (DataFilters) in scan description, but got: ${scan.description()}"
+ )
+ }
+ }
+
test("Data Evolution: compact fields action") {
withTable("s", "t") {
sql("CREATE TABLE s (id INT, b INT)")
@@ -952,4 +968,5 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
assert(!indexEntries.exists(entry =>
entry.partition().getString(0).toString.equals("p1")))
}
}
+
}