This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a9a75f43a1c [HUDI-7941] add show_file_status procedure & 
run_rollback_inflight_tableservice procedure (#11538)
a9a75f43a1c is described below

commit a9a75f43a1cad05fc363acfb436f57adefce5676
Author: empcl <[email protected]>
AuthorDate: Tue Jul 2 17:21:36 2024 +0800

    [HUDI-7941] add show_file_status procedure & 
run_rollback_inflight_tableservice procedure (#11538)
    
    Co-authored-by: chenlei677 <[email protected]>
---
 .../hudi/command/procedures/HoodieProcedures.scala |   2 +
 .../RunRollbackInflightTableServiceProcedure.scala | 143 +++++++++
 .../procedures/ShowFileStatusProcedure.scala       | 244 +++++++++++++++
 ...tRunRollbackInflightTableServiceProcedure.scala | 122 ++++++++
 .../procedure/TestShowFileStatusProcedure.scala    | 330 +++++++++++++++++++++
 5 files changed, 841 insertions(+)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index e12aad789d7..87c6971a791 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -53,6 +53,7 @@ object HoodieProcedures {
       ,(DeleteMarkerProcedure.NAME, DeleteMarkerProcedure.builder)
       ,(ShowRollbacksProcedure.NAME, ShowRollbacksProcedure.builder)
       ,(ShowRollbackDetailProcedure.NAME, ShowRollbackDetailProcedure.builder)
+      ,(RunRollbackInflightTableServiceProcedure.NAME, 
RunRollbackInflightTableServiceProcedure.builder)
       ,(ExportInstantsProcedure.NAME, ExportInstantsProcedure.builder)
       ,(ShowAllFileSystemViewProcedure.NAME, 
ShowAllFileSystemViewProcedure.builder)
       ,(ShowLatestFileSystemViewProcedure.NAME, 
ShowLatestFileSystemViewProcedure.builder)
@@ -76,6 +77,7 @@ object HoodieProcedures {
       ,(ShowMetadataTableStatsProcedure.NAME, 
ShowMetadataTableStatsProcedure.builder)
       ,(ValidateMetadataTableFilesProcedure.NAME, 
ValidateMetadataTableFilesProcedure.builder)
       ,(ShowFsPathDetailProcedure.NAME, ShowFsPathDetailProcedure.builder)
+      ,(ShowFileStatusProcedure.NAME, ShowFileStatusProcedure.builder)
       ,(CopyToTableProcedure.NAME, CopyToTableProcedure.builder)
       ,(RepairAddpartitionmetaProcedure.NAME, 
RepairAddpartitionmetaProcedure.builder)
       ,(RepairCorruptedCleanFilesProcedure.NAME, 
RepairCorruptedCleanFilesProcedure.builder)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunRollbackInflightTableServiceProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunRollbackInflightTableServiceProcedure.scala
new file mode 100644
index 00000000000..28e0d03b85d
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunRollbackInflightTableServiceProcedure.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.HoodieCLIUtils
+import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.common.HoodiePendingRollbackInfo
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.util.Option
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.table.HoodieSparkTable
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.hudi.command.procedures
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
+
+import java.util.function.Supplier
+
+/**
+ * When calling this procedure, one of parameters table and path must be 
specified at least.
+ * If both parameters are given, table will take effect.
+ * pending_instant represents the instant information of the current 
operation, and pending_instant can be either
+ * a clustering instant or a compaction instant.
+ * delete_request_instant_file is used to mark whether to delete the request 
instant file.
+ */
+class RunRollbackInflightTableServiceProcedure extends BaseProcedure
+  with ProcedureBuilder
+  with PredicateHelper
+  with Logging {
+  override def build: Procedure = new RunRollbackInflightTableServiceProcedure
+
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
+    ProcedureParameter.required(2, "pending_instant", DataTypes.StringType),
+    ProcedureParameter.optional(3, "delete_request_instant_file", 
DataTypes.BooleanType, false)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("instant", DataTypes.StringType, nullable = true, 
Metadata.empty),
+    StructField("time_cost", DataTypes.StringType, nullable = true, 
Metadata.empty)
+  ))
+
+  override def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  override def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
+    val pendingInstant = getArgValueOrDefault(args, PARAMETERS(2)).get.toString
+    val deleteRequestInstantFile = getArgValueOrDefault(args, 
PARAMETERS(3)).get.asInstanceOf[Boolean]
+
+    val basePath: String = getBasePath(tableName, tablePath)
+    val metaClient = HoodieTableMetaClient.builder
+      
.setConf(HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration)).setBasePath(basePath).build
+
+    // determine whether the current instant exists and whether it is 
clustering or compaction
+    var isClustering: Boolean = true
+    var instant: HoodieInstant = null
+    val pendingCompactionInstant = new 
HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, 
pendingInstant)
+    val pendingClusteringInstant = new 
HoodieInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.REPLACE_COMMIT_ACTION, pendingInstant)
+    val timeline = metaClient.getActiveTimeline.getWriteTimeline
+    if (!timeline.containsInstant(pendingCompactionInstant) && 
!timeline.containsInstant(pendingClusteringInstant)) {
+      throw new RuntimeException(s"there is no pending instant : 
[$pendingClusteringInstant | $pendingCompactionInstant]")
+    } else if (timeline.containsInstant(pendingCompactionInstant)) {
+      isClustering = false
+      instant = pendingCompactionInstant
+      logInfo(s"compaction instant to rollback : ${instant}")
+    } else {
+      isClustering = true
+      instant = pendingClusteringInstant
+      logInfo(s"clustering instant to rollback : ${instant}")
+    }
+
+    var client: SparkRDDWriteClient[_] = null
+    try {
+      client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, 
Map.empty,
+        tableName.asInstanceOf[scala.Option[String]])
+
+      val startTs = System.currentTimeMillis()
+      doRollbackOnInflightInstant(client, instant, isClustering)
+      if (deleteRequestInstantFile) {
+        val requestInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, 
instant.getAction, instant.getTimestamp)
+        metaClient.getActiveTimeline.deleteInstantFileIfExists(requestInstant)
+      }
+      val timeCost = System.currentTimeMillis() - startTs
+      logInfo(s"Finish rollback pending instant: $pendingInstant," +
+        s" time cost: $timeCost ms.")
+
+      Seq(Row(instant.getTimestamp, timeCost.toString))
+    } finally {
+      if (client != null) {
+        client.close()
+      }
+    }
+  }
+
+  private def doRollbackOnInflightInstant(client: SparkRDDWriteClient[_], 
inflightInstant: HoodieInstant, isClustering: Boolean): Unit = {
+    val tsClient = client.getTableServiceClient
+    val table = HoodieSparkTable.create(client.getConfig, 
client.getEngineContext)
+
+    val getPendingRollbackInstantFunc: java.util.function.Function[String, 
Option[HoodiePendingRollbackInfo]] =
+      new java.util.function.Function[String, 
Option[HoodiePendingRollbackInfo]] {
+        override def apply(commitToRollback: String): 
Option[HoodiePendingRollbackInfo] = {
+          tsClient.getPendingRollbackInfo(table.getMetaClient, 
commitToRollback, false)
+        }
+      }
+
+    if (isClustering) {
+      table.rollbackInflightClustering(inflightInstant, 
getPendingRollbackInstantFunc)
+    } else {
+      
table.rollbackInflightCompaction(inflightInstant,getPendingRollbackInstantFunc)
+    }
+  }
+}
+
+object RunRollbackInflightTableServiceProcedure {
+  val NAME = "run_rollback_inflight_tableservice"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get(): ProcedureBuilder = new 
RunRollbackInflightTableServiceProcedure
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileStatusProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileStatusProcedure.scala
new file mode 100644
index 00000000000..1eaa810d7a2
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileStatusProcedure.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.HoodieCLIUtils
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, 
HoodieArchivedTimeline, HoodieDefaultTimeline, HoodieInstant, HoodieTimeline, 
TimelineMetadataUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.table.HoodieSparkTable
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.hudi.command.procedures
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
+
+import java.util.function.Supplier
+import scala.collection.JavaConverters._
+/**
+ * Used to view the status of a specified file, such as whether it has been 
deleted, which action deleted it, etc
+ */
+class ShowFileStatusProcedure extends BaseProcedure
+  with ProcedureBuilder
+  with PredicateHelper
+  with Logging {
+
+  private val DEFAULT_VALUE = ""
+
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
+    ProcedureParameter.optional(1, "partition", DataTypes.StringType),
+    ProcedureParameter.required(2, "file", DataTypes.StringType)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("status", DataTypes.StringType, nullable = false, 
Metadata.empty),
+    StructField("action", DataTypes.StringType, nullable = true, 
Metadata.empty),
+    StructField("instant", DataTypes.StringType, nullable = true, 
Metadata.empty),
+    StructField("timeline", DataTypes.StringType, nullable = true, 
Metadata.empty),
+    StructField("full_path", DataTypes.StringType, nullable = true, 
Metadata.empty)
+  ))
+
+  override def build: Procedure = new ShowFileStatusProcedure
+
+  override def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  override def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val partition = getArgValueOrDefault(args, PARAMETERS(1))
+    val fileName = getArgValueOrDefault(args, PARAMETERS(2))
+
+    val basePath: String = getBasePath(tableName, Option.empty)
+    val metaClient = createMetaClient(jsc, basePath)
+    val client = HoodieCLIUtils.createHoodieWriteClient(spark, basePath, 
Map.empty, tableName.asInstanceOf[Option[String]])
+    val table: HoodieSparkTable[String] = 
HoodieSparkTable.create(client.getConfig, client.getEngineContext)
+
+    if (partition.isEmpty && table.isPartitioned) {
+      throw new HoodieException(s"table $tableName is a partitioned table. 
Please specify the partition name where the current file is located.")
+    }
+
+    // step1 lookup clean/rollback/restore metadata in active & archive 
timeline
+    val fileStatus: Option[FileStatusInfo] = isDeleted(metaClient, 
partition.asInstanceOf[Option[String]], fileName.get.asInstanceOf[String])
+    if (fileStatus.isDefined) {
+      val res: FileStatusInfo = fileStatus.get
+      Seq(Row(res.status, res.action, res.instant, res.timeline, res.fullPath))
+    } else {
+      // step2 lookup write commit metadata
+      val fs = new 
Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
+      var path: Path = null
+      if (partition.nonEmpty) {
+        path = new Path(basePath, partition.get.asInstanceOf[String])
+      } else {
+        path = new Path(basePath)
+      }
+      fs.listStatus(path).toList
+        .find(f => f.getPath.getName.equals(fileName.get))
+        .map(f => Seq(Row(FileStatus.EXIST.toString, DEFAULT_VALUE, 
DEFAULT_VALUE, TimelineType.ACTIVE.toString, f.getPath.toUri.getPath)))
+        .getOrElse(Seq(Row(FileStatus.UNKNOWN.toString, DEFAULT_VALUE, 
DEFAULT_VALUE, DEFAULT_VALUE, DEFAULT_VALUE)))
+    }
+  }
+
+  private def isDeleted(metaClient: HoodieTableMetaClient, partition: 
Option[String], fileName: String): Option[FileStatusInfo] = {
+    // step1 check clean
+    checkCleanMetadata(metaClient, partition, fileName)
+      .orElse(
+        // step2 check rollback
+        checkRollbackMetadata(metaClient, partition, fileName)
+          .orElse(
+            // step3 check restore
+            checkRestoreMetadata(metaClient, partition, fileName)
+          )
+      )
+  }
+
+  private def checkRestoreMetadata(metaClient: HoodieTableMetaClient, 
partition: Option[String], fileName: String): Option[FileStatusInfo] = {
+    val restoreInstant = metaClient.reloadActiveTimeline
+      .getRestoreTimeline
+      .filterCompletedInstants
+      .getReverseOrderedInstants
+      .collect(java.util.stream.Collectors.toList[HoodieInstant])
+      .asScala
+
+    restoreInstant.find { instant =>
+      val hoodieRestoreMetadata =
+        TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
+          metaClient.getActiveTimeline.getInstantDetails(instant).get
+        )
+      val restoreMetadata = 
hoodieRestoreMetadata.getHoodieRestoreMetadata.values().asScala
+
+      restoreMetadata.exists { metadata =>
+        metadata.asScala.exists { rollbackMetadata =>
+          val partitionRollbackMetadata = rollbackMetadata.getPartitionMetadata
+          partition.flatMap(
+            p => Option.apply(partitionRollbackMetadata.get(p)).flatMap(
+              
_.getSuccessDeleteFiles.asScala.find(_.contains(fileName)))).isDefined ||
+            
partitionRollbackMetadata.values.iterator.asScala.exists(_.getSuccessDeleteFiles.asScala.exists(_.contains(fileName)))
+        }
+      }
+    }.map(restoreInstant =>
+      FileStatusInfo(
+        FileStatus.DELETED.toString,
+        HoodieTimeline.RESTORE_ACTION,
+        restoreInstant.getTimestamp,
+        TimelineType.ACTIVE.toString,
+        DEFAULT_VALUE
+      )
+    )
+  }
+
+  private def checkRollbackMetadata(metaClient: HoodieTableMetaClient, 
partition: Option[String], fileName: String): Option[FileStatusInfo] = {
+    checkRollbackMetadataInternal(metaClient.getActiveTimeline, partition, 
fileName)
+      .orElse(
+        checkRollbackMetadataInternal(metaClient.getArchivedTimeline, 
partition, fileName)
+      )
+  }
+
+  private def checkRollbackMetadataInternal(timeline: HoodieDefaultTimeline,
+                                            partition: Option[String], 
fileName: String): Option[FileStatusInfo] = {
+    val rollbackInstant = timeline.getRollbackTimeline
+      .filterCompletedInstants()
+      .getReverseOrderedInstants
+      .collect(java.util.stream.Collectors.toList[HoodieInstant])
+      .asScala
+
+    reloadTimelineIfNecessary(timeline)
+
+    rollbackInstant.find { instant =>
+      val rollbackMetadata =
+        
TimelineMetadataUtils.deserializeHoodieRollbackMetadata(timeline.getInstantDetails(instant).get())
+      val partitionRollbackMetadata = rollbackMetadata.getPartitionMetadata
+      partition.flatMap(
+        p => Option.apply(partitionRollbackMetadata.get(p)).flatMap(
+          
_.getSuccessDeleteFiles.asScala.find(_.contains(fileName)))).isDefined ||
+        
partitionRollbackMetadata.values.iterator.asScala.exists(_.getSuccessDeleteFiles.asScala.exists(_.contains(fileName)))
+    }.map(instant => getResult(timeline, HoodieTimeline.ROLLBACK_ACTION, 
instant.getTimestamp).get)
+  }
+
+
+  private def checkCleanMetadata(metaClient: HoodieTableMetaClient, partition: 
Option[String], fileName: String): Option[FileStatusInfo] = {
+    checkCleanMetadataInternal(metaClient.getActiveTimeline, partition, 
fileName)
+      .orElse(checkCleanMetadataInternal(metaClient.getArchivedTimeline, 
partition, fileName))
+  }
+
+  private def checkCleanMetadataInternal(timeline: HoodieDefaultTimeline, 
partition: Option[String], fileName: String): Option[FileStatusInfo] = {
+    val cleanedInstant = timeline.getCleanerTimeline
+      .filterCompletedInstants()
+      .getReverseOrderedInstants
+      .collect(java.util.stream.Collectors.toList[HoodieInstant])
+      .asScala
+    reloadTimelineIfNecessary(timeline)
+    cleanedInstant.find { instant =>
+      val cleanMetadata = TimelineMetadataUtils.deserializeHoodieCleanMetadata(
+        timeline.getInstantDetails(instant).get())
+      val partitionCleanMetadata = cleanMetadata.getPartitionMetadata
+      partition.flatMap(p => 
Option.apply(partitionCleanMetadata.get(p)).flatMap(_.getSuccessDeleteFiles.asScala.find(_.contains(fileName)))).isDefined
 ||
+        
partitionCleanMetadata.values.iterator.asScala.exists(_.getSuccessDeleteFiles.asScala.exists(_.contains(fileName)))
+    }.map(instant => getResult(timeline, HoodieTimeline.CLEAN_ACTION, 
instant.getTimestamp).get)
+  }
+
+  private def getResult(timeline: HoodieDefaultTimeline, action: String, 
timestamp: String): Option[FileStatusInfo] = {
+    timeline match {
+      case _: HoodieActiveTimeline =>
+        Option.apply(FileStatusInfo(FileStatus.DELETED.toString, action, 
timestamp, TimelineType.ACTIVE.toString, DEFAULT_VALUE))
+      case _: HoodieArchivedTimeline =>
+        Option.apply(FileStatusInfo(FileStatus.DELETED.toString, action, 
timestamp, TimelineType.ARCHIVED.toString, DEFAULT_VALUE))
+      case _ => throw new HoodieException("Unsupported timeline type: " + 
timeline.getClass);
+    }
+  }
+
+  private def reloadTimelineIfNecessary(timeline: HoodieDefaultTimeline): Unit 
= {
+    timeline match {
+      case _: HoodieArchivedTimeline =>
+        val archivalTimeline: HoodieArchivedTimeline = 
timeline.asInstanceOf[HoodieArchivedTimeline]
+        archivalTimeline.loadCompletedInstantDetailsInMemory()
+      case _ =>
+    }
+  }
+}
+
+object ShowFileStatusProcedure {
+  val NAME = "show_file_status"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new ShowFileStatusProcedure()
+  }
+}
+
+object FileStatus extends Enumeration {
+  type FileStatus = Value
+
+  val DELETED: procedures.FileStatus.Value = Value("deleted")
+  val EXIST: procedures.FileStatus.Value = Value("exist")
+  val UNKNOWN: procedures.FileStatus.Value = Value("unknown")
+}
+
+object TimelineType extends Enumeration {
+  type TimelineType = Value
+
+  val ACTIVE: procedures.TimelineType.Value = Value("active")
+  val ARCHIVED: procedures.TimelineType.Value = Value("archived")
+}
+
+case class FileStatusInfo(status: String, action: String, instant: String, 
timeline: String, fullPath: String)
+
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRunRollbackInflightTableServiceProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRunRollbackInflightTableServiceProcedure.scala
new file mode 100644
index 00000000000..b2bb7e00daf
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRunRollbackInflightTableServiceProcedure.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.junit.jupiter.api.Assertions
+
+class TestRunRollbackInflightTableServiceProcedure extends 
HoodieSparkProcedureTestBase {
+  test("Test Call run_rollback_inflight_tableservice Procedure for 
clustering") {
+    withTempDir {tmp => {
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | options (
+           |  primaryKey ='id',
+           |  type = 'cow',
+           |  preCombineField = 'ts'
+           | )
+           | partitioned by(ts)
+           | location '$basePath'
+     """.stripMargin)
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+
+      spark.sql(s"call run_clustering(table => '$tableName', op => 
'schedule')")
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'execute')")
+
+      // delete clustering commit file
+      val metaClient = HoodieTableMetaClient.builder().setBasePath(basePath)
+        
.setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration)).build()
+      val clusteringInstant = 
metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.get(0)
+      metaClient.getActiveTimeline.deleteInstantFileIfExists(clusteringInstant)
+
+      val clusteringInstantTime = clusteringInstant.getTimestamp
+
+      spark.sql(s"call run_rollback_inflight_tableservice(table => 
'$tableName', pending_instant => '$clusteringInstantTime')")
+      Assertions.assertTrue(!metaClient.reloadActiveTimeline().getInstants
+        .contains(new HoodieInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstantTime)))
+      Assertions.assertTrue(metaClient.reloadActiveTimeline().getInstants
+        .contains(new HoodieInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstantTime)))
+    }}
+  }
+
+  test("Test Call run_rollback_inflight_tableservice Procedure for 
compaction") {
+    withTempDir {
+      tmp => {
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | options (
+             |  primaryKey ='id',
+             |  type = 'mor',
+             |  preCombineField = 'ts'
+             | )
+             | partitioned by(ts)
+             | location '$basePath'
+         """.stripMargin)
+        spark.sql("set hoodie.parquet.max.file.size = 10000")
+        // disable automatic inline compaction
+        spark.sql("set hoodie.compact.inline=false")
+        spark.sql("set hoodie.compact.schedule.inline=false")
+
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
+        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)")
+        spark.sql(s"update $tableName set price = 11 where id = 1")
+
+        spark.sql(s"call run_compaction(op => 'schedule', table => 
'$tableName')")
+        spark.sql(s"call run_compaction(op => 'run', table => '$tableName')")
+
+        // delete compaction commit file
+        val metaClient = HoodieTableMetaClient.builder
+          
.setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration)).setBasePath(basePath).build
+        val compactionInstant: HoodieInstant = 
metaClient.getActiveTimeline.getReverseOrderedInstants.findFirst().get()
+
+        
metaClient.getActiveTimeline.deleteInstantFileIfExists(compactionInstant)
+        val compactionInstantTime = compactionInstant.getTimestamp
+
+        spark.sql(s"call run_rollback_inflight_tableservice(table => 
'$tableName', pending_instant => '$compactionInstantTime', 
delete_request_instant_file => true)")
+        Assertions.assertTrue(!metaClient.reloadActiveTimeline().getInstants
+          .contains(new HoodieInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.COMPACTION_ACTION, compactionInstantTime)))
+        Assertions.assertTrue(!metaClient.reloadActiveTimeline().getInstants
+          .contains(new HoodieInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.COMPACTION_ACTION, compactionInstantTime)))
+      }
+    }
+  }
+
+
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowFileStatusProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowFileStatusProcedure.scala
new file mode 100644
index 00000000000..27196082c34
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowFileStatusProcedure.scala
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.hudi.common.util.{Option => HOption}
+import org.apache.hudi.{HoodieCLIUtils, HoodieDataSourceHelpers}
+import org.apache.spark.sql.hudi.command.procedures.{FileStatus, TimelineType}
+
+import scala.collection.JavaConverters._
+
+class TestShowFileStatusProcedure extends HoodieSparkProcedureTestBase {
+
+  private val DEFAULT_VALUE = ""
+
+  test("Test Call show_file_status Procedure By COW / MOR Partitioned Table") {
+    withTempDir { tmp =>
+      Seq("mor", "cow").foreach { tableType =>
+
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+        // specify clean conf & archive conf & compaction conf
+        spark.sql("set hoodie.clean.commits.retained = 2")
+        spark.sql("set hoodie.keep.min.commits = 3")
+        spark.sql("set hoodie.keep.max.commits = 4")
+        spark.sql("set hoodie.compact.inline=false")
+        spark.sql("set hoodie.compact.schedule.inline=false")
+
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long,
+             |  partition long
+             |) using hudi
+             | options (
+             |  primaryKey ='id',
+             |  type = '$tableType',
+             |  preCombineField = 'ts'
+             | )
+             | partitioned by(partition)
+             | location '$basePath'
+        """.stripMargin)
+
+        val partition: String = "partition=1000"
+        var before: List[String] = null
+        var after: List[String] = null
+        var cleanedDataFile: Option[String] = Option.empty
+
+        val fs = new 
Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
+        val client = HoodieCLIUtils.createHoodieWriteClient(spark, basePath, 
Map.empty, Option(tableName))
+        val metaClient: HoodieTableMetaClient = 
client.getInternalSchemaAndMetaClient.getRight
+
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 1000)")
+        val firstCleanedDataFile = getAllDataFile(fs, basePath, 
Option.apply(partition)).toStream.filter(f => !f.startsWith(".")).head
+        cleanedDataFile = Option.apply(firstCleanedDataFile)
+        checkAnswer(s"call show_file_status(table => '$tableName', partition 
=> '$partition', file => '${cleanedDataFile.get}')")(
+          Seq(FileStatus.EXIST.toString, DEFAULT_VALUE, DEFAULT_VALUE, 
TimelineType.ACTIVE.toString, new Path(basePath, new Path(partition, 
cleanedDataFile.get)).toString))
+
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001, 1000)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1000)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1000)")
+
+        checkAnswer(s"call show_file_status(table => '$tableName', partition 
=> '$partition', file => '$firstCleanedDataFile')")(
+          Seq(FileStatus.DELETED.toString, HoodieTimeline.CLEAN_ACTION, 
metaClient.reloadActiveTimeline().getCleanerTimeline.lastInstant().get.getTimestamp,
 TimelineType.ACTIVE.toString, DEFAULT_VALUE)
+        )
+
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1001)")
+        // clustering / compaction
+        val newInstant = client.createNewInstantTime()
+        if (tableType.equals("cow")) {
+          client.scheduleClusteringAtInstant(newInstant, HOption.empty())
+          client.cluster(newInstant)
+        } else {
+          client.scheduleCompactionAtInstant(newInstant, HOption.empty())
+          client.compact(newInstant)
+        }
+
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1000)")
+        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1002, 1000)")
+
+        // savepoint
+        val savepointTime: String = getSpecifyActionLatestTime(fs, basePath, 
newInstant, 1).get
+        spark.sql(s"call create_savepoint(table => '$tableName', commit_time 
=> '$savepointTime')")
+        spark.sql(s"insert into $tableName values(2, 'a2', 11, 1002, 1000)")
+        spark.sql(s"insert into $tableName values(6, 'a6', 10, 1000, 1001)")
+
+        // restore
+        before = getAllDataFile(fs, basePath, Option.apply(partition))
+        spark.sql(s"call rollback_to_savepoint(table => '$tableName', 
instant_time => '$savepointTime')")
+        after = getAllDataFile(fs, basePath, Option.apply(partition))
+        cleanedDataFile = getAnyOneDataFile(before, after)
+
+        checkAnswer(s"call show_file_status(table => '$tableName', partition 
=> '$partition', file => '${cleanedDataFile.get}')")(
+          Seq(FileStatus.DELETED.toString, HoodieTimeline.RESTORE_ACTION, 
metaClient.reloadActiveTimeline().getRestoreTimeline.lastInstant().get.getTimestamp,
 TimelineType.ACTIVE.toString, DEFAULT_VALUE))
+
+        val latestTime = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+        spark.sql(s"insert into $tableName values(7, 'a7', 15, 1000, 1000)")
+        spark.sql(s"insert into $tableName values(8, 'a8', 12, 1000, 1000)")
+
+        before = getAllDataFile(fs, basePath, Option.apply(partition))
+        // rollback
+        val rollbackTime = getSpecifyActionLatestTime(fs, basePath, 
latestTime, 5).get
+        spark.sql(s"call rollback_to_instant(table => '$tableName', 
instant_time => '$rollbackTime')")
+        spark.sql(s"insert into $tableName values(9, 'a9', 16, 1000, 1000)")
+        after = getAllDataFile(fs, basePath, Option.apply(partition))
+        cleanedDataFile = getAnyOneDataFile(before, after)
+        checkAnswer(s"call show_file_status(table => '$tableName', partition 
=> '$partition', file => '${cleanedDataFile.get}')")(
+          Seq(FileStatus.DELETED.toString, HoodieTimeline.ROLLBACK_ACTION, 
metaClient.reloadActiveTimeline().getRollbackTimeline.lastInstant().get.getTimestamp,
 TimelineType.ACTIVE.toString, DEFAULT_VALUE))
+
+        // unknown
+        checkAnswer(s"call show_file_status(table => '$tableName', partition 
=> '$partition', file => 'unknown')")(
+          Seq(FileStatus.UNKNOWN.toString, DEFAULT_VALUE, DEFAULT_VALUE, 
DEFAULT_VALUE, DEFAULT_VALUE))
+      }
+    }
+  }
+
+  test("Test Call show_file_status Procedure By COW / MOR Non_Partitioned 
Table") {
+    withTempDir { tmp =>
+      Seq("mor", "cow").foreach { tableType =>
+
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+        // specify clean conf & archive conf & compaction conf
+        spark.sql("set hoodie.clean.commits.retained = 2")
+        spark.sql("set hoodie.keep.min.commits = 3")
+        spark.sql("set hoodie.keep.max.commits = 4")
+        spark.sql("set hoodie.compact.inline=false")
+        spark.sql("set hoodie.compact.schedule.inline=false")
+
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long,
+             |  partition long
+             |) using hudi
+             | options (
+             |  primaryKey ='id',
+             |  type = '$tableType',
+             |  preCombineField = 'ts'
+             | )
+             | location '$basePath'
+        """.stripMargin)
+
+        val partition: Option[String] = Option.empty
+        var before: List[String] = null
+        var after: List[String] = null
+        var cleanedDataFile: Option[String] = Option.empty
+
+        val fs = new 
Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
+        val client = HoodieCLIUtils.createHoodieWriteClient(spark, basePath, 
Map.empty, Option(tableName))
+        val metaClient: HoodieTableMetaClient = 
client.getInternalSchemaAndMetaClient.getRight
+
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 1000)")
+        val firstCleanedDataFile = getAllDataFile(fs, basePath, 
partition).toStream.filter(f => !f.startsWith(".")).head
+        cleanedDataFile = Option.apply(firstCleanedDataFile)
+        checkAnswer(s"call show_file_status(table => '$tableName', file => 
'${cleanedDataFile.get}')")(
+          Seq(FileStatus.EXIST.toString, DEFAULT_VALUE, DEFAULT_VALUE, 
TimelineType.ACTIVE.toString, new Path(basePath, new 
Path(cleanedDataFile.get)).toString))
+
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001, 1000)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1000)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1000)")
+
+        checkAnswer(s"call show_file_status(table => '$tableName', file => 
'$firstCleanedDataFile')")(
+          Seq(FileStatus.DELETED.toString, HoodieTimeline.CLEAN_ACTION, 
metaClient.reloadActiveTimeline().getCleanerTimeline.lastInstant().get.getTimestamp,
 TimelineType.ACTIVE.toString, DEFAULT_VALUE)
+        )
+
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1000)")
+        // clustering / compaction
+        val newInstant = client.createNewInstantTime()
+        if (tableType.equals("cow")) {
+          client.scheduleClusteringAtInstant(newInstant, HOption.empty())
+          client.cluster(newInstant)
+        } else {
+          client.scheduleCompactionAtInstant(newInstant, HOption.empty())
+          client.compact(newInstant)
+        }
+
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1000)")
+        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1002, 1000)")
+
+        // savepoint
+        val savepointTime: String = getSpecifyActionLatestTime(fs, basePath, 
newInstant, 1).get
+        spark.sql(s"call create_savepoint(table => '$tableName', commit_time 
=> '$savepointTime')")
+        spark.sql(s"insert into $tableName values(2, 'a2', 11, 1002, 1000)")
+        spark.sql(s"insert into $tableName values(6, 'a6', 10, 1000, 1000)")
+
+        // restore
+        before = getAllDataFile(fs, basePath, partition)
+        spark.sql(s"call rollback_to_savepoint(table => '$tableName', 
instant_time => '$savepointTime')")
+        after = getAllDataFile(fs, basePath, partition)
+        cleanedDataFile = getAnyOneDataFile(before, after)
+        checkAnswer(s"call show_file_status(table => '$tableName', file => 
'${cleanedDataFile.get}')")(
+          Seq(FileStatus.DELETED.toString, HoodieTimeline.RESTORE_ACTION,
+            
metaClient.reloadActiveTimeline().getRestoreTimeline.lastInstant().get.getTimestamp,
 TimelineType.ACTIVE.toString, DEFAULT_VALUE))
+
+        val latestTime = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+        spark.sql(s"insert into $tableName values(7, 'a7', 15, 1000, 1000)")
+        spark.sql(s"insert into $tableName values(8, 'a8', 12, 1000, 1000)")
+
+        before = getAllDataFile(fs, basePath, partition)
+        // rollback
+        val rollbackTime = getSpecifyActionLatestTime(fs, basePath, 
latestTime, 5).get
+        spark.sql(s"call rollback_to_instant(table => '$tableName', 
instant_time => '$rollbackTime')")
+        spark.sql(s"insert into $tableName values(9, 'a9', 16, 1000, 1000)")
+        after = getAllDataFile(fs, basePath, partition)
+        cleanedDataFile = getAnyOneDataFile(before, after)
+        checkAnswer(s"call show_file_status(table => '$tableName', file => 
'${cleanedDataFile.get}')")(
+          Seq(FileStatus.DELETED.toString, HoodieTimeline.ROLLBACK_ACTION, 
metaClient.reloadActiveTimeline().getRollbackTimeline.lastInstant().get.getTimestamp,
 TimelineType.ACTIVE.toString, DEFAULT_VALUE))
+
+        // unknown
+        checkAnswer(s"call show_file_status(table => '$tableName', file => 
'unknown')")(
+          Seq(FileStatus.UNKNOWN.toString, DEFAULT_VALUE, DEFAULT_VALUE, 
DEFAULT_VALUE, DEFAULT_VALUE))
+      }
+    }
+  }
+
+
+  test("Test Call show_file_status Procedure By COW / MOR For Archive") {
+    withTempDir { tmp =>
+      Seq("mor", "cow").foreach { tableType =>
+
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+        // specify clean conf & archive conf & compaction conf
+        spark.sql("set hoodie.clean.commits.retained = 2")
+        spark.sql("set hoodie.keep.min.commits = 3")
+        spark.sql("set hoodie.keep.max.commits = 4")
+
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long,
+             |  partition long
+             |) using hudi
+             | options (
+             |  primaryKey ='id',
+             |  type = '$tableType',
+             |  preCombineField = 'ts'
+             | )
+             | partitioned by(partition)
+             | location '$basePath'
+        """.stripMargin)
+
+        val partition: String = "partition=1000"
+        var cleanedDataFile: String = null
+
+        val fs = new 
Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
+
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 1000)")
+        cleanedDataFile = getAllDataFile(fs, basePath, 
Option.apply(partition)).toStream.filter(f => !f.startsWith(".")).head
+        val firstCleanedDataFile = cleanedDataFile
+
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001, 1000)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1000)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1000)")
+
+        val firstCleanedDataTime =
+          spark.sql(s"call show_file_status(table => '$tableName', partition 
=> '$partition', file => '$firstCleanedDataFile')")
+            .collect().toList.head.get(2)
+
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1000)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1000)")
+        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1002, 1000)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 11, 1002, 1000)")
+        spark.sql(s"insert into $tableName values(6, 'a6', 10, 1000, 1000)")
+        spark.sql(s"insert into $tableName values(7, 'a7', 15, 1000, 1000)")
+        spark.sql(s"insert into $tableName values(8, 'a8', 12, 1000, 1000)")
+        spark.sql(s"insert into $tableName values(9, 'a9', 16, 1000, 1000)")
+
+        checkAnswer(s"call show_file_status(table => '$tableName',  partition 
=> '$partition', file => '$firstCleanedDataFile')")(
+          Seq(FileStatus.DELETED.toString, HoodieTimeline.CLEAN_ACTION, 
firstCleanedDataTime, TimelineType.ARCHIVED.toString, DEFAULT_VALUE))
+      }
+    }
+  }
+
+  private def getAnyOneDataFile(before: List[String], after: List[String]): 
Option[String] = {
+    val sortedBefore = before.sorted
+    val sortedAfter = after.sorted
+    sortedBefore.diff(sortedAfter).headOption
+  }
+
+  private def getAllDataFile(fs: FileSystem, basePath: String, partition: 
Option[String]): List[String] = {
+    if (partition.isDefined) {
+      fs.listStatus(new Path(basePath, partition.get)).filter(f => f.isFile && 
!f.getPath.getName.startsWith(".hoodie_partition_metadata")).map(f => 
f.getPath.getName).toList
+    } else {
+      fs.listStatus(new Path(basePath)).filter(f => f.isFile && 
!f.getPath.getName.startsWith(".hoodie_partition_metadata")).map(f => 
f.getPath.getName).toList
+    }
+  }
+
+
+  private def getSpecifyActionLatestTime(fs: FileSystem, basePath: String, 
specifyInstant: String, after: Int): Option[String] = {
+    val commitsSince = HoodieDataSourceHelpers.listCommitsSince(fs, basePath, 
specifyInstant).asScala.toList
+    if (commitsSince.length > after) {
+      Some(commitsSince(after))
+    } else {
+      commitsSince.lastOption
+    }
+  }
+}
+


Reply via email to