This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c37503a1f5 [spark] Add OVERWRITE_BY_FILTER capabilities to PartitionedFormatTable for Spark 4 compatibility (#7517)
c37503a1f5 is described below
commit c37503a1f532c99f3f9f04192b244ba2eaac6db8
Author: Kerwin Zhang <[email protected]>
AuthorDate: Wed Mar 25 19:19:41 2026 +0800
[spark] Add OVERWRITE_BY_FILTER capabilities to PartitionedFormatTable for Spark 4 compatibility (#7517)
---
.../spark/sql/execution/SparkFormatTable.scala | 36 ++++++++++++++++++++
.../paimon/spark/table/PaimonFormatTableTest.scala | 38 ++++++++++++++++++++++
2 files changed, 74 insertions(+)
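
With BATCH_READ, BATCH_WRITE, OVERWRITE_DYNAMIC, and OVERWRITE_BY_FILTER advertised,
overwrite-mode writes against these partitioned format tables pass Spark 4's
DataSource V2 table-capability check instead of failing during analysis. A minimal
usage sketch mirroring the new test (the table name and schema below are
illustrative, not part of this commit):

    // Overwrite an existing partitioned external format table from the
    // DataFrame API; on Spark 4 this write is gated on the OVERWRITE_BY_FILTER
    // capability added by this commit.
    spark
      .createDataFrame(Seq((1, "a", "p1")))
      .toDF("a", "b", "pt")
      .write
      .format("csv")
      .partitionBy("pt")
      .mode("overwrite")
      .saveAsTable("my_partitioned_csv_table")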
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala
index 2cb0101653..9d0983ed0b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/SparkFormatTable.scala
@@ -25,7 +25,11 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
+import org.apache.spark.sql.connector.catalog.TableCapability
+import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
+import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, SupportsOverwriteV2, Write, WriteBuilder}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.csv.{CSVScanBuilder, CSVTable}
import org.apache.spark.sql.execution.datasources.v2.json.JsonTable
@@ -123,6 +127,18 @@ trait PartitionedFormatTable extends SupportsPartitionManagement {
val fileIndex: PartitioningAwareFileIndex
+  override def capabilities(): util.Set[TableCapability] = {
+    util.EnumSet.of(BATCH_READ, BATCH_WRITE, OVERWRITE_DYNAMIC, OVERWRITE_BY_FILTER)
+  }
+
+  protected def wrapWriteBuilderWithOverwrite(original: WriteBuilder): WriteBuilder = {
+    new WriteBuilder with SupportsOverwriteV2 {
+      override def build(): Write = original.build()
+      override def canOverwrite(predicates: Array[Predicate]): Boolean = true
+      override def overwrite(predicates: Array[Predicate]): WriteBuilder = this
+    }
+  }
+
override def partitionSchema(): StructType = partitionSchema_
override def partitioning(): Array[Transform] = {
@@ -172,6 +188,10 @@ class PartitionedCSVTable(
  extends CSVTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat)
  with PartitionedFormatTable {
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    wrapWriteBuilderWithOverwrite(super.newWriteBuilder(info))
+  }
+
  override def newScanBuilder(options: CaseInsensitiveStringMap): CSVScanBuilder = {
    val mergedOptions =
      this.options.asCaseSensitiveMap().asScala ++ options.asCaseSensitiveMap().asScala
@@ -204,6 +224,10 @@ class PartitionedTextTable(
  extends TextTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat)
  with PartitionedFormatTable {
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    wrapWriteBuilderWithOverwrite(super.newWriteBuilder(info))
+  }
+
  override def newScanBuilder(options: CaseInsensitiveStringMap): TextScanBuilder = {
    val mergedOptions =
      this.options.asCaseSensitiveMap().asScala ++ options.asCaseSensitiveMap().asScala
@@ -236,6 +260,10 @@ class PartitionedOrcTable(
) extends OrcTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat)
  with PartitionedFormatTable {
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    wrapWriteBuilderWithOverwrite(super.newWriteBuilder(info))
+  }
+
override lazy val fileIndex: PartitioningAwareFileIndex = {
SparkFormatTable.createFileIndex(
options,
@@ -257,6 +285,10 @@ class PartitionedParquetTable(
) extends ParquetTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat)
  with PartitionedFormatTable {
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    wrapWriteBuilderWithOverwrite(super.newWriteBuilder(info))
+  }
+
override lazy val fileIndex: PartitioningAwareFileIndex = {
SparkFormatTable.createFileIndex(
options,
@@ -278,6 +310,10 @@ class PartitionedJsonTable(
  extends JsonTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat)
  with PartitionedFormatTable {
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    wrapWriteBuilderWithOverwrite(super.newWriteBuilder(info))
+  }
+
override lazy val fileIndex: PartitioningAwareFileIndex = {
SparkFormatTable.createFileIndex(
options,
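
For background, SupportsOverwriteV2 is the DataSource V2 hook Spark uses for
filter-based overwrites: roughly, write planning asks canOverwrite with the
translated predicates (or truncate() for a full overwrite), records them via
overwrite(...), and then calls build(). The wrapper above accepts any predicate
and delegates build() to the format's own WriteBuilder. A hedged sketch of a
statement this enables (table name and values are hypothetical, not taken from
this commit):

    // Static partition overwrite against a partitioned format table; Spark
    // plans this as a filter-based overwrite, which now passes the
    // OVERWRITE_BY_FILTER capability check.
    spark.sql(
      "INSERT OVERWRITE TABLE my_partitioned_csv_table PARTITION (pt = 'p1') VALUES (1, 'a')")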
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
index 374383d316..6fa31f5e2c 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
@@ -24,6 +24,8 @@ import org.apache.paimon.spark.PaimonSparkTestWithRestCatalogBase
import org.apache.paimon.table.FormatTable
import org.apache.spark.sql.Row
+import org.apache.spark.sql.connector.catalog.TableCapability
+import org.apache.spark.sql.connector.catalog.TableCatalog
class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
@@ -367,6 +369,42 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
}
}
+ test("PartitionedFormatTable: external table saveAsTable should support
overwrite") {
+ val tableName = "paimon_format_external_overwrite_test"
+ Seq("paimon", "engine").foreach {
+ impl =>
+ withTable(tableName) {
+ val location =
s"${tempDBDir.getCanonicalPath}/external_overwrite_test"
+
+ sql(s"""CREATE TABLE $tableName (a INT, b STRING, pt STRING)
+ |USING CSV PARTITIONED BY (pt) LOCATION '$location'
+ |TBLPROPERTIES ('format-table.implementation'='$impl')
+ |""".stripMargin)
+
+ // Verify external format table has OVERWRITE_BY_FILTER capability
+ val catalog = spark.sessionState.catalogManager.currentCatalog
+ .asInstanceOf[TableCatalog]
+ val v2Table = catalog.loadTable(
+
org.apache.spark.sql.connector.catalog.Identifier.of(Array("test_db"),
tableName))
+ val caps = v2Table.capabilities()
+ assert(
+ caps.contains(TableCapability.OVERWRITE_BY_FILTER),
+ s"External format table ($impl) should have OVERWRITE_BY_FILTER
capability, but capabilities are: $caps"
+ )
+
+ spark
+ .createDataFrame(Seq((5, "x5", "p1")))
+ .toDF("a", "b", "pt")
+ .write
+ .format("csv")
+ .option("path", location)
+ .partitionBy("pt")
+ .mode("overwrite")
+ .saveAsTable(tableName)
+ }
+ }
+ }
+
test("Paimon format table: show partitions") {
withTable("t") {
sql("""