This is an automated email from the ASF dual-hosted git repository.

hope pushed a commit to branch release-1.4
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 8670853c759b41cf9f91b0333b0e2ab4e1b2c641
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Mar 26 19:54:06 2026 +0800

    [spark] Support reading ObjectTable via Spark SQL (#7535)
    
    Support reading ObjectTable via Spark SQL by implementing `SupportsRead`
    for `SparkObjectTable` and adding `ObjectTableScan` /
    `ObjectTableScanBuilder`.
---
 .../java/org/apache/paimon/spark/SparkCatalog.java |  2 +-
 .../scala/org/apache/paimon/spark/SparkTable.scala | 19 +++++-
 .../apache/paimon/spark/read/ObjectTableScan.scala | 52 ++++++++++++++++
 .../paimon/spark/table/PaimonObjectTableTest.scala | 72 ++++++++++++++++++++++
 4 files changed, 142 insertions(+), 3 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 9d12201c91..6ef853eda8 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -662,7 +662,7 @@ public class SparkCatalog extends SparkBaseCatalog
             } else if (table instanceof LanceTable) {
                 return new SparkLanceTable(table);
             } else if (table instanceof ObjectTable) {
-                return new SparkObjectTable(table);
+                return new SparkObjectTable((ObjectTable) table);
             } else {
                 return new SparkTable(table);
             }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index 740e7b5994..55b21263df 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -18,11 +18,17 @@
 
 package org.apache.paimon.spark
 
+import org.apache.paimon.spark.read.ObjectTableScanBuilder
 import org.apache.paimon.spark.rowops.PaimonSparkCopyOnWriteOperation
+import org.apache.paimon.table.`object`.ObjectTable
 import org.apache.paimon.table.{FileStoreTable, Table}
 
-import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
+import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsRowLevelOperations, TableCapability}
+import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{RowLevelOperationBuilder, RowLevelOperationInfo}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import java.util.{EnumSet => JEnumSet, Set => JSet}
 
 /** A spark [[org.apache.spark.sql.connector.catalog.Table]] for paimon. */
 case class SparkTable(override val table: Table)
@@ -46,4 +52,13 @@ case class SparkIcebergTable(table: Table) extends BaseTable
 
 case class SparkLanceTable(table: Table) extends BaseTable
 
-case class SparkObjectTable(table: Table) extends BaseTable
+case class SparkObjectTable(override val table: ObjectTable) extends BaseTable with SupportsRead {
+
+  override def capabilities(): JSet[TableCapability] = {
+    JEnumSet.of(TableCapability.BATCH_READ)
+  }
+
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+    new ObjectTableScanBuilder(table.copy(options.asCaseSensitiveMap))
+  }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/ObjectTableScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/ObjectTableScan.scala
new file mode 100644
index 0000000000..4542b4d2a4
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/ObjectTableScan.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.read
+
+import org.apache.paimon.partition.PartitionPredicate
+import org.apache.paimon.predicate.Predicate
+import org.apache.paimon.spark.PaimonBaseScanBuilder
+import org.apache.paimon.table.`object`.ObjectTable
+import org.apache.paimon.table.source.Split
+
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+class ObjectTableScanBuilder(val table: ObjectTable) extends PaimonBaseScanBuilder {
+
+  override def build(): ObjectTableScan =
+    ObjectTableScan(table, requiredSchema)
+}
+
+/** Scan implementation for [[ObjectTable]] */
+case class ObjectTableScan(table: ObjectTable, requiredSchema: StructType) extends BaseScan {
+
+  override val pushedPartitionFilters: Seq[PartitionPredicate] = Nil
+  override val pushedDataFilters: Seq[Predicate] = Nil
+  override val pushedLimit: Option[Int] = None
+
+  protected def getInputSplits: Array[Split] = {
+    readBuilder
+      .newScan()
+      .plan()
+      .splits()
+      .asScala
+      .toArray
+  }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonObjectTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonObjectTableTest.scala
new file mode 100644
index 0000000000..3c3ecee016
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonObjectTableTest.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.table
+
+import org.apache.paimon.catalog.Identifier
+import org.apache.paimon.fs.Path
+import org.apache.paimon.spark.PaimonSparkTestWithRestCatalogBase
+import org.apache.paimon.table.`object`.ObjectTable
+
+import org.apache.spark.sql.Row
+
+class PaimonObjectTableTest extends PaimonSparkTestWithRestCatalogBase {
+
+  override protected def beforeEach(): Unit = {
+    super.beforeEach()
+    sql("USE paimon")
+    sql("CREATE DATABASE IF NOT EXISTS test_db")
+    sql("USE test_db")
+  }
+
+  test("ObjectTable: read file metadata") {
+    val tableName = "object_table_test"
+    withTable(tableName) {
+      sql(
+        s"CREATE TABLE $tableName TBLPROPERTIES (" +
+          s"'type' = 'object-table')")
+
+      val objectTable =
+        paimonCatalog
+          .getTable(Identifier.create("test_db", tableName))
+          .asInstanceOf[ObjectTable]
+
+      val fileIO = objectTable.fileIO()
+      val basePath = new Path(objectTable.location())
+      fileIO.mkdirs(basePath)
+      fileIO.writeFile(new Path(basePath, "file1.txt"), "content1", false)
+      fileIO.writeFile(new Path(basePath, "file2.txt"), "content2", false)
+      fileIO.mkdirs(new Path(basePath, "subdir"))
+      fileIO.writeFile(new Path(basePath, "subdir/file3.txt"), "content3", false)
+
+      checkAnswer(
+        sql(s"SELECT path, name FROM $tableName ORDER BY path"),
+        Seq(
+          Row("file1.txt", "file1.txt"),
+          Row("file2.txt", "file2.txt"),
+          Row("subdir/file3.txt", "file3.txt"))
+      )
+
+      // Verify schema has expected columns
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $tableName"),
+        Seq(Row(3))
+      )
+    }
+  }
+}

Reply via email to