This is an automated email from the ASF dual-hosted git repository. hope pushed a commit to branch release-1.4 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 8670853c759b41cf9f91b0333b0e2ab4e1b2c641 Author: Zouxxyy <[email protected]> AuthorDate: Thu Mar 26 19:54:06 2026 +0800 [spark] Support reading ObjectTable via Spark SQL (#7535) Support reading ObjectTable via Spark SQL by implementing `SupportsRead` for `SparkObjectTable` and adding `ObjectTableScan` / `ObjectTableScanBuilder`. --- .../java/org/apache/paimon/spark/SparkCatalog.java | 2 +- .../scala/org/apache/paimon/spark/SparkTable.scala | 19 +++++- .../apache/paimon/spark/read/ObjectTableScan.scala | 52 ++++++++++++++++ .../paimon/spark/table/PaimonObjectTableTest.scala | 72 ++++++++++++++++++++++ 4 files changed, 142 insertions(+), 3 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index 9d12201c91..6ef853eda8 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -662,7 +662,7 @@ public class SparkCatalog extends SparkBaseCatalog } else if (table instanceof LanceTable) { return new SparkLanceTable(table); } else if (table instanceof ObjectTable) { - return new SparkObjectTable(table); + return new SparkObjectTable((ObjectTable) table); } else { return new SparkTable(table); } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala index 740e7b5994..55b21263df 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala @@ -18,11 +18,17 @@ package org.apache.paimon.spark +import org.apache.paimon.spark.read.ObjectTableScanBuilder import 
org.apache.paimon.spark.rowops.PaimonSparkCopyOnWriteOperation +import org.apache.paimon.table.`object`.ObjectTable import org.apache.paimon.table.{FileStoreTable, Table} -import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations +import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsRowLevelOperations, TableCapability} +import org.apache.spark.sql.connector.read.ScanBuilder import org.apache.spark.sql.connector.write.{RowLevelOperationBuilder, RowLevelOperationInfo} +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +import java.util.{EnumSet => JEnumSet, Set => JSet} /** A spark [[org.apache.spark.sql.connector.catalog.Table]] for paimon. */ case class SparkTable(override val table: Table) @@ -46,4 +52,13 @@ case class SparkIcebergTable(table: Table) extends BaseTable case class SparkLanceTable(table: Table) extends BaseTable -case class SparkObjectTable(table: Table) extends BaseTable +case class SparkObjectTable(override val table: ObjectTable) extends BaseTable with SupportsRead { + /* Read-only wrapper: only BATCH_READ is advertised, and the options passed to newScanBuilder are applied to the table via table.copy before the scan is built. */ + override def capabilities(): JSet[TableCapability] = { + JEnumSet.of(TableCapability.BATCH_READ) + } + + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { + new ObjectTableScanBuilder(table.copy(options.asCaseSensitiveMap)) + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/ObjectTableScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/ObjectTableScan.scala new file mode 100644 index 0000000000..4542b4d2a4 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/ObjectTableScan.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.read + +import org.apache.paimon.partition.PartitionPredicate +import org.apache.paimon.predicate.Predicate +import org.apache.paimon.spark.PaimonBaseScanBuilder +import org.apache.paimon.table.`object`.ObjectTable +import org.apache.paimon.table.source.Split + +import org.apache.spark.sql.types.StructType + +import scala.collection.JavaConverters._ + +class ObjectTableScanBuilder(val table: ObjectTable) extends PaimonBaseScanBuilder { + /* NOTE(review): only table and requiredSchema reach ObjectTableScan; confirm PaimonBaseScanBuilder does not report any filters or limit as pushed, since none are forwarded here. */ + override def build(): ObjectTableScan = + ObjectTableScan(table, requiredSchema) +} + +/** Batch scan over an [[ObjectTable]]; no partition filters, data filters, or limit are pushed down. */ +case class ObjectTableScan(table: ObjectTable, requiredSchema: StructType) extends BaseScan { + + override val pushedPartitionFilters: Seq[PartitionPredicate] = Nil + override val pushedDataFilters: Seq[Predicate] = Nil + override val pushedLimit: Option[Int] = None + /* Plans every split from readBuilder and materializes them into an array. */ + protected def getInputSplits: Array[Split] = { + readBuilder + .newScan() + .plan() + .splits() + .asScala + .toArray + } +} diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonObjectTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonObjectTableTest.scala new file mode 100644 index 0000000000..3c3ecee016 --- /dev/null +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonObjectTableTest.scala @@
-0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.table + +import org.apache.paimon.catalog.Identifier +import org.apache.paimon.fs.Path +import org.apache.paimon.spark.PaimonSparkTestWithRestCatalogBase +import org.apache.paimon.table.`object`.ObjectTable + +import org.apache.spark.sql.Row + +class PaimonObjectTableTest extends PaimonSparkTestWithRestCatalogBase { + /* Each test runs inside the paimon catalog's test_db database (see beforeEach). */ + override protected def beforeEach(): Unit = { + super.beforeEach() + sql("USE paimon") + sql("CREATE DATABASE IF NOT EXISTS test_db") + sql("USE test_db") + } + + test("ObjectTable: read file metadata") { + val tableName = "object_table_test" + withTable(tableName) { + sql( + s"CREATE TABLE $tableName TBLPROPERTIES (" + + s"'type' = 'object-table')") + + val objectTable = + paimonCatalog + .getTable(Identifier.create("test_db", tableName)) + .asInstanceOf[ObjectTable] + + val fileIO = objectTable.fileIO() + val basePath = new Path(objectTable.location()) + fileIO.mkdirs(basePath) + fileIO.writeFile(new Path(basePath, "file1.txt"), "content1", false) + fileIO.writeFile(new Path(basePath, "file2.txt"), "content2", false) + fileIO.mkdirs(new Path(basePath, "subdir")) + fileIO.writeFile(new Path(basePath,
"subdir/file3.txt"), "content3", false) + /* Expected: path is relative to the table location and name is the file's last path segment. */ + checkAnswer( + sql(s"SELECT path, name FROM $tableName ORDER BY path"), + Seq( + Row("file1.txt", "file1.txt"), + Row("file2.txt", "file2.txt"), + Row("subdir/file3.txt", "file3.txt")) + ) + + // Verify all three written files, including the one under subdir, are counted + checkAnswer( + sql(s"SELECT COUNT(*) FROM $tableName"), + Seq(Row(3)) + ) + } + } +}
