This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c37503a1f5 [spark] Add OVERWRITE_BY_FILTER capabilities to PartitionedFormatTable for Spark 4 compatibility (#7517)
c37503a1f5 is described below

commit c37503a1f532c99f3f9f04192b244ba2eaac6db8
Author: Kerwin Zhang <[email protected]>
AuthorDate: Wed Mar 25 19:19:41 2026 +0800

    [spark] Add OVERWRITE_BY_FILTER capabilities to PartitionedFormatTable for Spark 4 compatibility (#7517)
---
 .../spark/sql/execution/SparkFormatTable.scala     | 36 ++++++++++++++++++++
 .../paimon/spark/table/PaimonFormatTableTest.scala | 38 ++++++++++++++++++++++
 2 files changed, 74 insertions(+)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala
index 2cb0101653..9d0983ed0b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala
@@ -25,7 +25,11 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
+import org.apache.spark.sql.connector.catalog.TableCapability
+import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
+import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, SupportsOverwriteV2, Write, WriteBuilder}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.v2.csv.{CSVScanBuilder, CSVTable}
 import org.apache.spark.sql.execution.datasources.v2.json.JsonTable
@@ -123,6 +127,18 @@ trait PartitionedFormatTable extends SupportsPartitionManagement {
 
   val fileIndex: PartitioningAwareFileIndex
 
+  override def capabilities(): util.Set[TableCapability] = {
+    util.EnumSet.of(BATCH_READ, BATCH_WRITE, OVERWRITE_DYNAMIC, OVERWRITE_BY_FILTER)
+  }
+
+  protected def wrapWriteBuilderWithOverwrite(original: WriteBuilder): WriteBuilder = {
+    new WriteBuilder with SupportsOverwriteV2 {
+      override def build(): Write = original.build()
+      override def canOverwrite(predicates: Array[Predicate]): Boolean = true
+      override def overwrite(predicates: Array[Predicate]): WriteBuilder = this
+    }
+  }
+
   override def partitionSchema(): StructType = partitionSchema_
 
   override def partitioning(): Array[Transform] = {
@@ -172,6 +188,10 @@ class PartitionedCSVTable(
   extends CSVTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat)
   with PartitionedFormatTable {
 
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    wrapWriteBuilderWithOverwrite(super.newWriteBuilder(info))
+  }
+
   override def newScanBuilder(options: CaseInsensitiveStringMap): CSVScanBuilder = {
     val mergedOptions =
       this.options.asCaseSensitiveMap().asScala ++ options.asCaseSensitiveMap().asScala
@@ -204,6 +224,10 @@ class PartitionedTextTable(
   extends TextTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat)
   with PartitionedFormatTable {
 
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    wrapWriteBuilderWithOverwrite(super.newWriteBuilder(info))
+  }
+
   override def newScanBuilder(options: CaseInsensitiveStringMap): TextScanBuilder = {
     val mergedOptions =
       this.options.asCaseSensitiveMap().asScala ++ options.asCaseSensitiveMap().asScala
@@ -236,6 +260,10 @@ class PartitionedOrcTable(
 ) extends OrcTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat)
   with PartitionedFormatTable {
 
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    wrapWriteBuilderWithOverwrite(super.newWriteBuilder(info))
+  }
+
   override lazy val fileIndex: PartitioningAwareFileIndex = {
     SparkFormatTable.createFileIndex(
       options,
@@ -257,6 +285,10 @@ class PartitionedParquetTable(
 ) extends ParquetTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat)
   with PartitionedFormatTable {
 
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    wrapWriteBuilderWithOverwrite(super.newWriteBuilder(info))
+  }
+
   override lazy val fileIndex: PartitioningAwareFileIndex = {
     SparkFormatTable.createFileIndex(
       options,
@@ -278,6 +310,10 @@ class PartitionedJsonTable(
   extends JsonTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat)
   with PartitionedFormatTable {
 
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    wrapWriteBuilderWithOverwrite(super.newWriteBuilder(info))
+  }
+
   override lazy val fileIndex: PartitioningAwareFileIndex = {
     SparkFormatTable.createFileIndex(
       options,
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
index 374383d316..6fa31f5e2c 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
@@ -24,6 +24,8 @@ import org.apache.paimon.spark.PaimonSparkTestWithRestCatalogBase
 import org.apache.paimon.table.FormatTable
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.connector.catalog.TableCapability
+import org.apache.spark.sql.connector.catalog.TableCatalog
 
 class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
 
@@ -367,6 +369,42 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
     }
   }
 
+  test("PartitionedFormatTable: external table saveAsTable should support overwrite") {
+    val tableName = "paimon_format_external_overwrite_test"
+    Seq("paimon", "engine").foreach {
+      impl =>
+        withTable(tableName) {
+          val location = s"${tempDBDir.getCanonicalPath}/external_overwrite_test"
+
+          sql(s"""CREATE TABLE $tableName (a INT, b STRING, pt STRING)
+                 |USING CSV PARTITIONED BY (pt) LOCATION '$location'
+                 |TBLPROPERTIES ('format-table.implementation'='$impl')
+                 |""".stripMargin)
+
+          // Verify external format table has OVERWRITE_BY_FILTER capability
+          val catalog = spark.sessionState.catalogManager.currentCatalog
+            .asInstanceOf[TableCatalog]
+          val v2Table = catalog.loadTable(
+            org.apache.spark.sql.connector.catalog.Identifier.of(Array("test_db"), tableName))
+          val caps = v2Table.capabilities()
+          assert(
+            caps.contains(TableCapability.OVERWRITE_BY_FILTER),
+            s"External format table ($impl) should have OVERWRITE_BY_FILTER capability, but capabilities are: $caps"
+          )
+
+          spark
+            .createDataFrame(Seq((5, "x5", "p1")))
+            .toDF("a", "b", "pt")
+            .write
+            .format("csv")
+            .option("path", location)
+            .partitionBy("pt")
+            .mode("overwrite")
+            .saveAsTable(tableName)
+        }
+    }
+  }
+
   test("Paimon format table: show partitions") {
     withTable("t") {
       sql("""

Reply via email to