This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 37564b4fd68 [HUDI-7845] Call show_fsview_latest procedure support path_regex (#11418)
37564b4fd68 is described below
commit 37564b4fd68777fd0b1f553237066a07060aa1d6
Author: Zouxxyy <[email protected]>
AuthorDate: Sun Jun 9 09:11:46 2024 +0800
[HUDI-7845] Call show_fsview_latest procedure support path_regex (#11418)
---
.../table/view/AbstractTableFileSystemView.java | 13 +++
.../hudi/command/procedures/BaseProcedure.scala | 5 +
.../procedures/ShowFileSystemViewProcedure.scala | 105 ++++++++++++---------
.../sql/hudi/procedure/TestFsViewProcedure.scala | 86 ++++++++++++++++-
4 files changed, 164 insertions(+), 45 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index 550082b0aa1..90f48b660c3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -672,6 +672,19 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
}
}
+ public final List<String> getPartitionNames() {
+ try {
+ readLock.lock();
+ return fetchAllStoredFileGroups()
+ .filter(fg -> !isFileGroupReplaced(fg))
+ .map(HoodieFileGroup::getPartitionPath)
+ .distinct()
+ .collect(Collectors.toList());
+ } finally {
+ readLock.unlock();
+ }
+ }
+
@Override
public final Stream<Pair<String, CompactionOperation>>
getPendingLogCompactionOperations() {
try {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala
index b0ffc0cb64e..777d1937c98 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BaseProcedure.scala
@@ -76,6 +76,11 @@ abstract class BaseProcedure extends Procedure {
}
}
+ protected def isArgDefined(args: ProcedureArgs, parameter:
ProcedureParameter): Boolean = {
+ val paramKey = getParamKey(parameter, args.isNamedArgs)
+ args.map.containsKey(paramKey)
+ }
+
protected def getInternalRowValue(row: InternalRow, index: Int, dataType:
DataType): Any = {
dataType match {
case StringType => row.getString(index)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala
index c7d11f4c091..f19cd105c81 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala
@@ -22,17 +22,23 @@ import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
import org.apache.hudi.common.table.timeline.{CompletionTimeQueryView,
HoodieDefaultTimeline, HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.util
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.storage.StoragePath
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField,
StructType}
import java.util.function.{Function, Supplier}
-import java.util.stream.Collectors
+import java.util.stream.{Collectors, Stream => JStream}
+import java.util.{ArrayList => JArrayList, List => JList}
import scala.collection.JavaConverters._
class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure
with ProcedureBuilder {
+
+ private val ALL_PARTITIONS = "ALL_PARTITIONS"
+
private val PARAMETERS_ALL: Array[ProcedureParameter] =
Array[ProcedureParameter](
ProcedureParameter.required(0, "table", DataTypes.StringType),
ProcedureParameter.optional(1, "max_instant", DataTypes.StringType, ""),
@@ -40,7 +46,7 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure wit
ProcedureParameter.optional(3, "include_in_flight", DataTypes.BooleanType,
false),
ProcedureParameter.optional(4, "exclude_compaction",
DataTypes.BooleanType, false),
ProcedureParameter.optional(5, "limit", DataTypes.IntegerType, 10),
- ProcedureParameter.optional(6, "path_regex", DataTypes.StringType, "*/*/*")
+ ProcedureParameter.optional(6, "path_regex", DataTypes.StringType,
ALL_PARTITIONS)
)
private val OUTPUT_TYPE_ALL: StructType = StructType(Array[StructField](
@@ -54,16 +60,11 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure wit
StructField("delta_files", DataTypes.StringType, nullable = true,
Metadata.empty)
))
- private val PARAMETERS_LATEST: Array[ProcedureParameter] =
Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType),
- ProcedureParameter.optional(1, "max_instant", DataTypes.StringType, ""),
- ProcedureParameter.optional(2, "include_max", DataTypes.BooleanType,
false),
- ProcedureParameter.optional(3, "include_inflight", DataTypes.BooleanType,
false),
- ProcedureParameter.optional(4, "exclude_compaction",
DataTypes.BooleanType, false),
- ProcedureParameter.optional(5, "limit", DataTypes.IntegerType, 10),
- ProcedureParameter.required(6, "partition_path", DataTypes.StringType),
- ProcedureParameter.optional(7, "merge", DataTypes.BooleanType, true)
-
+ private val PARAMETERS_LATEST: Array[ProcedureParameter] =
+ PARAMETERS_ALL ++ Array[ProcedureParameter](
+ // Keep it for compatibility with older version, `path_regex` can
replace it
+ ProcedureParameter.optional(7, "partition_path", DataTypes.StringType,
ALL_PARTITIONS),
+ ProcedureParameter.optional(8, "merge", DataTypes.BooleanType, true)
)
private val OUTPUT_TYPE_LATEST: StructType = StructType(Array[StructField](
@@ -82,17 +83,16 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure wit
StructField("delta_files_compaction_unscheduled", DataTypes.StringType,
nullable = true, Metadata.empty)
))
- private def buildFileSystemView(table: Option[Any],
+ private def buildFileSystemView(basePath: String,
+ metaClient: HoodieTableMetaClient,
globRegex: String,
maxInstant: String,
includeMaxInstant: Boolean,
includeInflight: Boolean,
excludeCompaction: Boolean
): HoodieTableFileSystemView = {
- val basePath = getBasePath(table)
- val metaClient = createMetaClient(jsc, basePath)
val storage = metaClient.getStorage
- val statuses = if (globRegex == PARAMETERS_ALL.apply(6).default) {
+ val statuses = if (globRegex == ALL_PARTITIONS) {
FSUtils.getAllDataPathInfo(storage, new StoragePath(basePath))
} else {
val globPath = String.format("%s/%s/*", basePath, globRegex)
@@ -124,12 +124,12 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure wit
}
val filteredTimeline = new HoodieDefaultTimeline(
- new java.util.ArrayList[HoodieInstant](instants.toList.asJava).stream(),
details)
+ new JArrayList[HoodieInstant](instants.toList.asJava).stream(), details)
new HoodieTableFileSystemView(metaClient, filteredTimeline, statuses)
}
- private def showAllFileSlices(fsView: HoodieTableFileSystemView):
java.util.List[Row] = {
- val rows: java.util.List[Row] = new java.util.ArrayList[Row]
+ private def showAllFileSlices(fsView: HoodieTableFileSystemView): JList[Row]
= {
+ val rows: JList[Row] = new JArrayList[Row]
fsView.getAllFileGroups.iterator().asScala.foreach(fg => {
fg.getAllFileSlices.iterator().asScala.foreach(fs => {
val fileId = fg.getFileGroupId.getFileId
@@ -150,25 +150,19 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure wit
rows
}
- private def showLatestFileSlices(fsView: HoodieTableFileSystemView,
- table: Option[Any],
- partition: String,
+ private def showLatestFileSlices(metaClient: HoodieTableMetaClient,
+ fsView: HoodieTableFileSystemView,
+ partitions: Seq[String],
maxInstant: String,
- merge: Boolean): java.util.List[Row] = {
- var fileSliceStream: java.util.stream.Stream[FileSlice] = null
- val basePath = getBasePath(table)
- val metaClient = createMetaClient(jsc, basePath)
+ merge: Boolean): JList[Row] = {
+ var fileSliceStream: JStream[FileSlice] = JStream.empty()
val completionTimeQueryView = new CompletionTimeQueryView(metaClient)
- if (!merge) {
- fileSliceStream = fsView.getLatestFileSlices(partition)
+ if (merge) {
+ partitions.foreach(p => fileSliceStream =
JStream.concat(fileSliceStream, fsView.getLatestMergedFileSlicesBeforeOrOn(p,
maxInstant)))
} else {
- fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn(partition,
if (maxInstant.isEmpty) {
-
metaClient.getActiveTimeline.filterCompletedAndCompactionInstants().lastInstant().get().getTimestamp
- } else {
- maxInstant
- })
+ partitions.foreach(p => fileSliceStream =
JStream.concat(fileSliceStream, fsView.getLatestFileSlices(p)))
}
- val rows: java.util.List[Row] = new java.util.ArrayList[Row]
+ val rows = new JArrayList[Row]
fileSliceStream.iterator().asScala.foreach {
fs => {
val fileId = fs.getFileId
@@ -204,7 +198,7 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure wit
val logFilesCommitTimeNonEqualInstantTime =
fs.getLogFiles.iterator().asScala
.filter(logFile =>
!logFile.getDeltaCommitTime.equals(fs.getBaseInstantTime))
.mkString("[", ",", "]")
- rows.add(Row(partition, fileId, baseInstantTime, baseFilePath,
baseFileSize, numLogFiles, sumLogFileSize,
+ rows.add(Row(fs.getFileGroupId.getPartitionPath, fileId,
baseInstantTime, baseFilePath, baseFileSize, numLogFiles, sumLogFileSize,
logFilesScheduledForCompactionTotalSize,
logFilesUnscheduledTotalSize, logSelectedForCompactionToBaseRatio,
logUnscheduledToBaseRatio, logFilesCommitTimeEqualInstantTime,
logFilesCommitTimeNonEqualInstantTime
))
@@ -234,15 +228,40 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure wit
val includeInflight = getArgValueOrDefault(args,
parameters(3)).get.asInstanceOf[Boolean]
val excludeCompaction = getArgValueOrDefault(args,
parameters(4)).get.asInstanceOf[Boolean]
val limit = getArgValueOrDefault(args, parameters(5)).get.asInstanceOf[Int]
- val rows: java.util.List[Row] = if (!showLatest) {
- val globRegex = getArgValueOrDefault(args,
parameters(6)).get.asInstanceOf[String]
- val fsView = buildFileSystemView(table, globRegex, maxInstant,
includeMax, includeInflight, excludeCompaction)
- showAllFileSlices(fsView)
+ val globRegex = if (showLatest) {
+ val isPathRegexDefined = isArgDefined(args, parameters(6))
+ val isPartitionPathDefined = isArgDefined(args, parameters(7))
+ if (isPathRegexDefined && isPartitionPathDefined) {
+ throw new HoodieException("path_regex and partition_path cannot be
used together")
+ }
+ if (isPathRegexDefined) {
+ getArgValueOrDefault(args, parameters(6)).get.asInstanceOf[String]
+ } else {
+ getArgValueOrDefault(args, parameters(7)).get.asInstanceOf[String]
+ }
+ } else {
+ getArgValueOrDefault(args, parameters(6)).get.asInstanceOf[String]
+ }
+ val basePath = getBasePath(table)
+ val metaClient = createMetaClient(jsc, basePath)
+ val fsView = buildFileSystemView(basePath, metaClient, globRegex,
maxInstant, includeMax, includeInflight, excludeCompaction)
+ val rows = if (showLatest) {
+ val merge = getArgValueOrDefault(args,
parameters(8)).get.asInstanceOf[Boolean]
+ val maxInstantForMerge = if (merge && maxInstant.isEmpty) {
+ val lastInstant =
metaClient.getActiveTimeline.filterCompletedAndCompactionInstants().lastInstant()
+ if (lastInstant.isPresent) {
+ lastInstant.get().getTimestamp
+ } else {
+ // scalastyle:off return
+ return Seq.empty
+ // scalastyle:on return
+ }
+ } else {
+ maxInstant
+ }
+ showLatestFileSlices(metaClient, fsView,
fsView.getPartitionNames.asScala.toSeq, maxInstantForMerge, merge)
} else {
- val partitionPath = getArgValueOrDefault(args,
parameters(6)).get.asInstanceOf[String]
- val merge = getArgValueOrDefault(args,
parameters(7)).get.asInstanceOf[Boolean]
- val fsView = buildFileSystemView(table, partitionPath, maxInstant,
includeMax, includeInflight, excludeCompaction)
- showLatestFileSlices(fsView, table, partitionPath, maxInstant, merge)
+ showAllFileSlices(fsView)
}
rows.stream().limit(limit).toArray().map(r => r.asInstanceOf[Row]).toList
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestFsViewProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestFsViewProcedure.scala
index 9de1f1b0ee8..69b07f2c9cd 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestFsViewProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestFsViewProcedure.scala
@@ -118,7 +118,7 @@ class TestFsViewProcedure extends HoodieSparkProcedureTestBase {
| )
""".stripMargin)
// insert data to table
- spark.sql(s"insert into $tableName select 1, 'a1', 10, 'f11',
'f21',1000")
+ spark.sql(s"insert into $tableName select 1, 'a1', 10, 'f11', 'f21',
1000")
spark.sql(s"insert into $tableName select 2, 'a2', 20, 'f12', 'f22',
1500")
// Check required fields
@@ -146,7 +146,6 @@ class TestFsViewProcedure extends HoodieSparkProcedureTestBase {
}
}
-
test("Test Call show_fsview_latest Procedure") {
withTempDir { tmp =>
val tableName = generateTableName
@@ -183,4 +182,87 @@ class TestFsViewProcedure extends HoodieSparkProcedureTestBase {
}
}
}
+
+ test("Test Call show_fsview_latest Procedure with NonPartition") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | tblproperties (
+ | primaryKey = 'id',
+ | preCombineField = 'ts',
+ | 'hoodie.parquet.small.file.limit' = '0'
+ | )
+ """.stripMargin)
+ spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+ spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+ spark.sql(s"insert into $tableName select 2, 'a3', 20, 1500")
+
+ val result = spark.sql(
+ s"""call show_fsview_latest(table => '$tableName', limit =>
10)""".stripMargin).collect()
+ assertResult(2) {
+ result.length
+ }
+ }
+ }
+
+ test("Test Call show_fsview_latest Procedure with path_regex") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | ts long,
+ | day string,
+ | hh string
+ |) using hudi
+ | partitioned by(day, hh)
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | tblproperties (
+ | primaryKey = 'id',
+ | preCombineField = 'ts',
+ | 'hoodie.parquet.small.file.limit' = '0'
+ | )
+ """.stripMargin)
+
+ val result1 = spark.sql(s"call show_fsview_all(table =>
'$tableName')").collect()
+ assertResult(0) {
+ result1.length
+ }
+
+ spark.sql(s"insert into $tableName select 1, 'a1', 1001, 'd1', 'h1'")
+ spark.sql(s"insert into $tableName select 1, 'a2', 1002, 'd1', 'h1'")
+ spark.sql(s"insert into $tableName select 2, 'a3', 1003, 'd1', 'h2'")
+ spark.sql(s"insert into $tableName select 3, 'a4', 1004, 'd1', 'h2'")
+ spark.sql(s"insert into $tableName select 4, 'a5', 1005, 'd2', 'h1'")
+
+ val result2 = spark.sql(
+ s"call show_fsview_latest(table => '$tableName')").collect()
+ assertResult(4) {
+ result2.length
+ }
+
+ val result3 = spark.sql(
+ s"call show_fsview_latest(table => '$tableName', path_regex =>
'day=d1/*/')").collect()
+ assertResult(3) {
+ result3.length
+ }
+
+ val result4 = spark.sql(
+ s"call show_fsview_latest(table => '$tableName', path_regex =>
'day=d1/hh=h2/')").collect()
+ assertResult(2) {
+ result4.length
+ }
+ }
+ }
}