This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6a8167f1d5 [spark] Support partition statistics in SHOW TABLE EXTENDED PARTITION command (#7612)
6a8167f1d5 is described below
commit 6a8167f1d5c682e9f9a04ed9b1443234f5235988
Author: Kerwin Zhang <[email protected]>
AuthorDate: Fri Apr 10 21:10:09 2026 +0800
[spark] Support partition statistics in SHOW TABLE EXTENDED PARTITION command (#7612)
---
.../paimon/spark/PaimonPartitionManagement.scala | 28 +++++++++++++++++++++-
.../commands/PaimonShowTablePartitionCommand.scala | 18 ++++++++++++--
.../paimon/spark/sql/DescribeTableTestBase.scala | 16 +++++++++++++
.../catalyst/analysis/Spark4ResolutionRules.scala | 12 ++++++++--
4 files changed, 69 insertions(+), 5 deletions(-)
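For context, a minimal usage sketch (not part of this patch): it mirrors the new DescribeTableTestBase case and assumes an existing Paimon table `test_db`.`s2` partitioned by `pt` with data in partition pt='2024'; the database, table, and values are illustrative.

    // Illustrative Spark (Scala) usage of the command this change extends.
    val res = spark.sql("SHOW TABLE EXTENDED IN test_db LIKE 's2' PARTITION (pt = '2024')")
    val info = res.select("information").collect().head.getString(0)
    // With this change, `info` should additionally carry a "Partition Parameters" block
    // (e.g. recordCount=2 plus file size/count and last-file-creation-time keys) and a
    // "Partition Statistics" line rendered as "<recordCount> rows, <fileSizeInBytes> bytes".
    println(info)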
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index 511e728dbc..cd5955b0ff 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -19,7 +19,9 @@
package org.apache.paimon.spark
import org.apache.paimon.CoreOptions
+import org.apache.paimon.partition.PartitionStatistics
import org.apache.paimon.table.{FileStoreTable, Table}
+import org.apache.paimon.table.source.ScanMode
import org.apache.paimon.types.RowType
import org.apache.paimon.utils.{InternalRowPartitionComputer, TypeUtils}
@@ -136,7 +138,31 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement with L
}
override def loadPartitionMetadata(ident: InternalRow): JMap[String, String] = {
- Map.empty[String, String].asJava
+ table match {
+ case fileStoreTable: FileStoreTable =>
+ val partitionSpec = toPaimonPartitions(Array(ident)).head
+ val partitionEntries = fileStoreTable
+ .newSnapshotReader()
+ .withMode(ScanMode.ALL)
+ .withPartitionFilter(partitionSpec)
+ .partitionEntries()
+
+ if (!partitionEntries.isEmpty) {
+ val entry = partitionEntries.get(0)
+ Map(
+ PartitionStatistics.FIELD_RECORD_COUNT -> entry.recordCount().toString,
+ PartitionStatistics.FIELD_FILE_SIZE_IN_BYTES -> entry.fileSizeInBytes().toString,
+ PartitionStatistics.FIELD_FILE_COUNT -> entry.fileCount().toString,
+ PartitionStatistics.FIELD_LAST_FILE_CREATION_TIME -> entry
+ .lastFileCreationTime()
+ .toString
+ ).asJava
+ } else {
+ Map.empty[String, String].asJava
+ }
+ case _ =>
+ Map.empty[String, String].asJava
+ }
}
override def listPartitionIdentifiers(
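(Illustration only, not part of the commit: with this change, loadPartitionMetadata returns a map keyed by the PartitionStatistics field names for a non-empty partition. A rough sketch of its shape follows; only the "recordCount" key string is confirmed by the new test, the other key strings and all values are assumptions.)

    import scala.collection.JavaConverters._
    // Hypothetical contents of the JMap[String, String] returned for one partition.
    val exampleMetadata: java.util.Map[String, String] = Map(
      "recordCount" -> "2", // PartitionStatistics.FIELD_RECORD_COUNT
      "fileSizeInBytes" -> "1436", // assumed key for FIELD_FILE_SIZE_IN_BYTES
      "fileCount" -> "2", // assumed key for FIELD_FILE_COUNT
      "lastFileCreationTime" -> "1712750000000" // assumed key for FIELD_LAST_FILE_CREATION_TIME
    ).asJava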
diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala
similarity index 84%
rename from paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala
index ac98a807ca..5d85c7bd5e 100644
--- a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala
@@ -18,13 +18,14 @@
package org.apache.paimon.spark.commands
+import org.apache.paimon.partition.PartitionStatistics
import org.apache.paimon.spark.catalyst.Compatibility
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.analysis.ResolvedPartitionSpec
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, ToPrettyString}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal}
import org.apache.spark.sql.connector.catalog.{Identifier, SupportsPartitionManagement, TableCatalog}
import org.apache.spark.sql.connector.catalog.PaimonCatalogImplicits._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
@@ -87,7 +88,20 @@ case class PaimonShowTablePartitionCommand(
val partitionValues = partitions.mkString("[", ", ", "]")
results.put("Partition Values", s"$partitionValues")
- // TODO "Partition Parameters", "Created Time", "Last Access", "Partition Statistics"
+ // Partition Parameters and Partition Statistics
+ val metadata = partitionTable.loadPartitionMetadata(row)
+ if (!metadata.isEmpty) {
+ val metadataMap = metadata.asScala
+ results.put(
+ "Partition Parameters",
+ s"{${metadataMap.map { case (k, v) => s"$k=$v" }.mkString(", ")}}")
+
+ val fileSizeInBytes =
+ metadataMap.getOrElse(PartitionStatistics.FIELD_FILE_SIZE_IN_BYTES, "0").toLong
+ val recordCount =
+ metadataMap.getOrElse(PartitionStatistics.FIELD_RECORD_COUNT, "0").toLong
+ results.put("Partition Statistics", s"$recordCount rows, $fileSizeInBytes bytes")
+ }
results
.map {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTestBase.scala
index 63efa3f7e0..31cec5fda6 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTestBase.scala
@@ -98,6 +98,22 @@ abstract class DescribeTableTestBase extends PaimonSparkTestBase {
)
Assertions.assertTrue(
res2.select("information").collect().head.getString(0).contains("Partition Values"))
+
+ val info2 = res2.select("information").collect().head.getString(0)
+ Assertions.assertTrue(info2.contains("Partition Parameters"))
+ Assertions.assertTrue(info2.contains(PartitionStatistics.FIELD_RECORD_COUNT))
+ Assertions.assertTrue(info2.contains(PartitionStatistics.FIELD_FILE_SIZE_IN_BYTES))
+ Assertions.assertTrue(info2.contains(PartitionStatistics.FIELD_FILE_COUNT))
+ Assertions.assertTrue(info2.contains(PartitionStatistics.FIELD_LAST_FILE_CREATION_TIME))
+ Assertions.assertTrue(info2.contains("Partition Statistics"))
+ Assertions.assertTrue(info2.contains("recordCount=1"))
+ Assertions.assertTrue(info2.contains("1 rows"))
+
+ val res3 =
+ spark.sql(s"SHOW TABLE EXTENDED IN $testDB LIKE 's2' PARTITION(pt = '2024')")
+ val info3 = res3.select("information").collect().head.getString(0)
+ Assertions.assertTrue(info3.contains("recordCount=2"))
+ Assertions.assertTrue(info3.contains("2 rows"))
}
}
}
diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala
index 461cbd0c93..23db4e716d 100644
--- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala
+++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala
@@ -18,10 +18,18 @@
package org.apache.paimon.spark.catalyst.analysis
+import org.apache.paimon.spark.commands.PaimonShowTablePartitionCommand
+
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.analysis.ResolvedTable
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ShowTablePartition}
import org.apache.spark.sql.catalyst.rules.Rule
case class Spark4ResolutionRules(session: SparkSession) extends Rule[LogicalPlan] {
- override def apply(plan: LogicalPlan): LogicalPlan = plan
+ override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
+ case s @ ShowTablePartition(rt: ResolvedTable, _, _) =>
+ val resolvedSpec =
+ PaimonResolvePartitionSpec.resolve(rt.catalog, rt.identifier, s.partitionSpec)
+ PaimonShowTablePartitionCommand(s.output, rt.catalog, rt.identifier, resolvedSpec)
+ }
}