This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a9a75f43a1c [HUDI-7941] add show_file_status procedure &
run_rollback_inflight_tableservice procedure (#11538)
a9a75f43a1c is described below
commit a9a75f43a1cad05fc363acfb436f57adefce5676
Author: empcl <[email protected]>
AuthorDate: Tue Jul 2 17:21:36 2024 +0800
[HUDI-7941] add show_file_status procedure &
run_rollback_inflight_tableservice procedure (#11538)
Co-authored-by: chenlei677 <[email protected]>
---
.../hudi/command/procedures/HoodieProcedures.scala | 2 +
.../RunRollbackInflightTableServiceProcedure.scala | 143 +++++++++
.../procedures/ShowFileStatusProcedure.scala | 244 +++++++++++++++
...tRunRollbackInflightTableServiceProcedure.scala | 122 ++++++++
.../procedure/TestShowFileStatusProcedure.scala | 330 +++++++++++++++++++++
5 files changed, 841 insertions(+)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index e12aad789d7..87c6971a791 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -53,6 +53,7 @@ object HoodieProcedures {
,(DeleteMarkerProcedure.NAME, DeleteMarkerProcedure.builder)
,(ShowRollbacksProcedure.NAME, ShowRollbacksProcedure.builder)
,(ShowRollbackDetailProcedure.NAME, ShowRollbackDetailProcedure.builder)
+ ,(RunRollbackInflightTableServiceProcedure.NAME,
RunRollbackInflightTableServiceProcedure.builder)
,(ExportInstantsProcedure.NAME, ExportInstantsProcedure.builder)
,(ShowAllFileSystemViewProcedure.NAME,
ShowAllFileSystemViewProcedure.builder)
,(ShowLatestFileSystemViewProcedure.NAME,
ShowLatestFileSystemViewProcedure.builder)
@@ -76,6 +77,7 @@ object HoodieProcedures {
,(ShowMetadataTableStatsProcedure.NAME,
ShowMetadataTableStatsProcedure.builder)
,(ValidateMetadataTableFilesProcedure.NAME,
ValidateMetadataTableFilesProcedure.builder)
,(ShowFsPathDetailProcedure.NAME, ShowFsPathDetailProcedure.builder)
+ ,(ShowFileStatusProcedure.NAME, ShowFileStatusProcedure.builder)
,(CopyToTableProcedure.NAME, CopyToTableProcedure.builder)
,(RepairAddpartitionmetaProcedure.NAME,
RepairAddpartitionmetaProcedure.builder)
,(RepairCorruptedCleanFilesProcedure.NAME,
RepairCorruptedCleanFilesProcedure.builder)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunRollbackInflightTableServiceProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunRollbackInflightTableServiceProcedure.scala
new file mode 100644
index 00000000000..28e0d03b85d
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunRollbackInflightTableServiceProcedure.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.HoodieCLIUtils
+import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.common.HoodiePendingRollbackInfo
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.util.Option
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.table.HoodieSparkTable
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.hudi.command.procedures
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField,
StructType}
+
+import java.util.function.Supplier
+
/**
 * Rolls back an inflight table-service instant (clustering or compaction).
 *
 * At least one of the parameters `table` and `path` must be specified; when both
 * are given, `table` takes effect. `pending_instant` is the timestamp of the
 * inflight table-service instant, which may belong to either a clustering
 * (replacecommit) or a compaction. `delete_request_instant_file` marks whether
 * the corresponding requested instant file should also be removed.
 */
class RunRollbackInflightTableServiceProcedure extends BaseProcedure
  with ProcedureBuilder
  with PredicateHelper
  with Logging {
  override def build: Procedure = new RunRollbackInflightTableServiceProcedure

  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.optional(0, "table", DataTypes.StringType, None),
    ProcedureParameter.optional(1, "path", DataTypes.StringType, None),
    ProcedureParameter.required(2, "pending_instant", DataTypes.StringType),
    ProcedureParameter.optional(3, "delete_request_instant_file", DataTypes.BooleanType, false)
  )

  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("instant", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("time_cost", DataTypes.StringType, nullable = true, Metadata.empty)
  ))

  override def parameters: Array[ProcedureParameter] = PARAMETERS

  override def outputType: StructType = OUTPUT_TYPE

  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
    val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
    val pendingInstant = getArgValueOrDefault(args, PARAMETERS(2)).get.toString
    val deleteRequestInstantFile = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[Boolean]

    val basePath: String = getBasePath(tableName, tablePath)
    val metaClient = HoodieTableMetaClient.builder
      .setConf(HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration)).setBasePath(basePath).build

    // The pending instant must be present on the write timeline as either an
    // inflight compaction or an inflight clustering (replacecommit).
    val compactionCandidate = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, pendingInstant)
    val clusteringCandidate = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, pendingInstant)
    val timeline = metaClient.getActiveTimeline.getWriteTimeline
    val (instant, isClustering) =
      if (timeline.containsInstant(compactionCandidate)) {
        logInfo(s"compaction instant to rollback : ${compactionCandidate}")
        (compactionCandidate, false)
      } else if (timeline.containsInstant(clusteringCandidate)) {
        logInfo(s"clustering instant to rollback : ${clusteringCandidate}")
        (clusteringCandidate, true)
      } else {
        throw new RuntimeException(s"there is no pending instant : [$clusteringCandidate | $compactionCandidate]")
      }

    var client: SparkRDDWriteClient[_] = null
    try {
      client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, Map.empty,
        tableName.asInstanceOf[scala.Option[String]])

      val startMs = System.currentTimeMillis()
      doRollbackOnInflightInstant(client, instant, isClustering)
      // Optionally remove the matching requested instant file as well.
      if (deleteRequestInstantFile) {
        val requestedInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, instant.getAction, instant.getTimestamp)
        metaClient.getActiveTimeline.deleteInstantFileIfExists(requestedInstant)
      }
      val elapsedMs = System.currentTimeMillis() - startMs
      logInfo(s"Finish rollback pending instant: $pendingInstant, time cost: $elapsedMs ms.")

      Seq(Row(instant.getTimestamp, elapsedMs.toString))
    } finally {
      if (client != null) {
        client.close()
      }
    }
  }

  /** Dispatches the rollback to the clustering or compaction path on the table. */
  private def doRollbackOnInflightInstant(client: SparkRDDWriteClient[_], inflightInstant: HoodieInstant, isClustering: Boolean): Unit = {
    val tableServiceClient = client.getTableServiceClient
    val table = HoodieSparkTable.create(client.getConfig, client.getEngineContext)

    // Resolves any rollback already pending for the commit being rolled back.
    val pendingRollbackLookup: java.util.function.Function[String, Option[HoodiePendingRollbackInfo]] =
      (commitToRollback: String) => tableServiceClient.getPendingRollbackInfo(table.getMetaClient, commitToRollback, false)

    if (isClustering) {
      table.rollbackInflightClustering(inflightInstant, pendingRollbackLookup)
    } else {
      table.rollbackInflightCompaction(inflightInstant, pendingRollbackLookup)
    }
  }
}
+
object RunRollbackInflightTableServiceProcedure {
  // Procedure name registered in HoodieProcedures.
  val NAME = "run_rollback_inflight_tableservice"

  // SAM-converted Supplier: yields a fresh builder instance per call.
  def builder: Supplier[ProcedureBuilder] = () => new RunRollbackInflightTableServiceProcedure
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileStatusProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileStatusProcedure.scala
new file mode 100644
index 00000000000..1eaa810d7a2
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileStatusProcedure.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.HoodieCLIUtils
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline,
HoodieArchivedTimeline, HoodieDefaultTimeline, HoodieInstant, HoodieTimeline,
TimelineMetadataUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.table.HoodieSparkTable
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.hudi.command.procedures
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField,
StructType}
+
+import java.util.function.Supplier
+import scala.collection.JavaConverters._
/**
 * Shows the status of a specified file: whether it still exists, whether it was
 * deleted, and if it was deleted, which action (clean / rollback / restore)
 * removed it, at which instant, and on which timeline (active or archived).
 */
class ShowFileStatusProcedure extends BaseProcedure
  with ProcedureBuilder
  with PredicateHelper
  with Logging {

  // Placeholder for output columns that have no meaningful value for a row.
  private val DEFAULT_VALUE = ""

  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType),
    ProcedureParameter.optional(1, "partition", DataTypes.StringType),
    ProcedureParameter.required(2, "file", DataTypes.StringType)
  )

  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("status", DataTypes.StringType, nullable = false, Metadata.empty),
    StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("instant", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("timeline", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("full_path", DataTypes.StringType, nullable = true, Metadata.empty)
  ))

  override def build: Procedure = new ShowFileStatusProcedure

  override def parameters: Array[ProcedureParameter] = PARAMETERS

  override def outputType: StructType = OUTPUT_TYPE

  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
    val partition = getArgValueOrDefault(args, PARAMETERS(1))
    val fileName = getArgValueOrDefault(args, PARAMETERS(2))

    val basePath: String = getBasePath(tableName, Option.empty)
    val metaClient = createMetaClient(jsc, basePath)
    val client = HoodieCLIUtils.createHoodieWriteClient(spark, basePath, Map.empty, tableName.asInstanceOf[Option[String]])
    // FIX: the write client was previously never closed; release it even when the
    // lookup below throws (the sibling rollback procedure closes its client the same way).
    try {
      val table: HoodieSparkTable[String] = HoodieSparkTable.create(client.getConfig, client.getEngineContext)

      if (partition.isEmpty && table.isPartitioned) {
        throw new HoodieException(s"table $tableName is a partitioned table. Please specify the partition name where the current file is located.")
      }

      val file = fileName.get.asInstanceOf[String]
      // step1 lookup clean/rollback/restore metadata in active & archived timeline
      val fileStatus: Option[FileStatusInfo] = isDeleted(metaClient, partition.asInstanceOf[Option[String]], file)
      fileStatus match {
        case Some(res) =>
          Seq(Row(res.status, res.action, res.instant, res.timeline, res.fullPath))
        case None =>
          // step2 no deletion record found; probe the file system directly
          val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
          val path = partition
            .map(p => new Path(basePath, p.asInstanceOf[String]))
            .getOrElse(new Path(basePath))
          fs.listStatus(path).toList
            .find(f => f.getPath.getName.equals(file))
            .map(f => Seq(Row(FileStatus.EXIST.toString, DEFAULT_VALUE, DEFAULT_VALUE, TimelineType.ACTIVE.toString, f.getPath.toUri.getPath)))
            .getOrElse(Seq(Row(FileStatus.UNKNOWN.toString, DEFAULT_VALUE, DEFAULT_VALUE, DEFAULT_VALUE, DEFAULT_VALUE)))
      }
    } finally {
      client.close()
    }
  }

  /**
   * Checks clean, then rollback, then restore metadata for a record of the file
   * having been deleted; returns the first match. `orElse` is by-name, so later
   * checks only run when earlier ones find nothing.
   */
  private def isDeleted(metaClient: HoodieTableMetaClient, partition: Option[String], fileName: String): Option[FileStatusInfo] = {
    checkCleanMetadata(metaClient, partition, fileName)
      .orElse(checkRollbackMetadata(metaClient, partition, fileName))
      .orElse(checkRestoreMetadata(metaClient, partition, fileName))
  }

  /**
   * Scans completed restore instants (newest first) on the active timeline for a
   * successful delete of the file. When `partition` is given, that partition is
   * checked first; all partitions are scanned as a fallback.
   */
  private def checkRestoreMetadata(metaClient: HoodieTableMetaClient, partition: Option[String], fileName: String): Option[FileStatusInfo] = {
    val restoreInstants = metaClient.reloadActiveTimeline
      .getRestoreTimeline
      .filterCompletedInstants
      .getReverseOrderedInstants
      .collect(java.util.stream.Collectors.toList[HoodieInstant])
      .asScala

    restoreInstants.find { instant =>
      val hoodieRestoreMetadata =
        TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
          metaClient.getActiveTimeline.getInstantDetails(instant).get
        )
      // A restore is recorded as a collection of per-instant rollback metadata.
      val restoreMetadata = hoodieRestoreMetadata.getHoodieRestoreMetadata.values().asScala
      restoreMetadata.exists { metadata =>
        metadata.asScala.exists { rollbackMetadata =>
          val partitionRollbackMetadata = rollbackMetadata.getPartitionMetadata
          partition.flatMap(
            p => Option.apply(partitionRollbackMetadata.get(p)).flatMap(
              _.getSuccessDeleteFiles.asScala.find(_.contains(fileName)))).isDefined ||
            partitionRollbackMetadata.values.iterator.asScala.exists(_.getSuccessDeleteFiles.asScala.exists(_.contains(fileName)))
        }
      }
    }.map(instant =>
      FileStatusInfo(
        FileStatus.DELETED.toString,
        HoodieTimeline.RESTORE_ACTION,
        instant.getTimestamp,
        TimelineType.ACTIVE.toString,
        DEFAULT_VALUE
      )
    )
  }

  /** Looks for the file among successful deletes of completed rollbacks, active timeline first. */
  private def checkRollbackMetadata(metaClient: HoodieTableMetaClient, partition: Option[String], fileName: String): Option[FileStatusInfo] = {
    checkRollbackMetadataInternal(metaClient.getActiveTimeline, partition, fileName)
      .orElse(checkRollbackMetadataInternal(metaClient.getArchivedTimeline, partition, fileName))
  }

  private def checkRollbackMetadataInternal(timeline: HoodieDefaultTimeline,
                                            partition: Option[String],
                                            fileName: String): Option[FileStatusInfo] = {
    val rollbackInstants = timeline.getRollbackTimeline
      .filterCompletedInstants()
      .getReverseOrderedInstants
      .collect(java.util.stream.Collectors.toList[HoodieInstant])
      .asScala

    // Archived timelines must load instant details into memory before reads.
    reloadTimelineIfNecessary(timeline)

    rollbackInstants.find { instant =>
      val rollbackMetadata =
        TimelineMetadataUtils.deserializeHoodieRollbackMetadata(timeline.getInstantDetails(instant).get())
      val partitionRollbackMetadata = rollbackMetadata.getPartitionMetadata
      partition.flatMap(
        p => Option.apply(partitionRollbackMetadata.get(p)).flatMap(
          _.getSuccessDeleteFiles.asScala.find(_.contains(fileName)))).isDefined ||
        partitionRollbackMetadata.values.iterator.asScala.exists(_.getSuccessDeleteFiles.asScala.exists(_.contains(fileName)))
    }.map(instant => getResult(timeline, HoodieTimeline.ROLLBACK_ACTION, instant.getTimestamp).get)
  }

  /** Looks for the file among successful deletes of completed cleans, active timeline first. */
  private def checkCleanMetadata(metaClient: HoodieTableMetaClient, partition: Option[String], fileName: String): Option[FileStatusInfo] = {
    checkCleanMetadataInternal(metaClient.getActiveTimeline, partition, fileName)
      .orElse(checkCleanMetadataInternal(metaClient.getArchivedTimeline, partition, fileName))
  }

  private def checkCleanMetadataInternal(timeline: HoodieDefaultTimeline, partition: Option[String], fileName: String): Option[FileStatusInfo] = {
    val cleanInstants = timeline.getCleanerTimeline
      .filterCompletedInstants()
      .getReverseOrderedInstants
      .collect(java.util.stream.Collectors.toList[HoodieInstant])
      .asScala

    // Archived timelines must load instant details into memory before reads.
    reloadTimelineIfNecessary(timeline)

    cleanInstants.find { instant =>
      val cleanMetadata = TimelineMetadataUtils.deserializeHoodieCleanMetadata(
        timeline.getInstantDetails(instant).get())
      val partitionCleanMetadata = cleanMetadata.getPartitionMetadata
      partition.flatMap(p => Option.apply(partitionCleanMetadata.get(p)).flatMap(_.getSuccessDeleteFiles.asScala.find(_.contains(fileName)))).isDefined ||
        partitionCleanMetadata.values.iterator.asScala.exists(_.getSuccessDeleteFiles.asScala.exists(_.contains(fileName)))
    }.map(instant => getResult(timeline, HoodieTimeline.CLEAN_ACTION, instant.getTimestamp).get)
  }

  /** Builds the deleted-file result row, tagging it with the timeline type it was found on. */
  private def getResult(timeline: HoodieDefaultTimeline, action: String, timestamp: String): Option[FileStatusInfo] = {
    timeline match {
      case _: HoodieActiveTimeline =>
        Option.apply(FileStatusInfo(FileStatus.DELETED.toString, action, timestamp, TimelineType.ACTIVE.toString, DEFAULT_VALUE))
      case _: HoodieArchivedTimeline =>
        Option.apply(FileStatusInfo(FileStatus.DELETED.toString, action, timestamp, TimelineType.ARCHIVED.toString, DEFAULT_VALUE))
      case _ => throw new HoodieException("Unsupported timeline type: " + timeline.getClass);
    }
  }

  /** For archived timelines, loads completed instant details into memory; no-op otherwise. */
  private def reloadTimelineIfNecessary(timeline: HoodieDefaultTimeline): Unit = {
    timeline match {
      case archived: HoodieArchivedTimeline =>
        archived.loadCompletedInstantDetailsInMemory()
      case _ =>
    }
  }
}
+
object ShowFileStatusProcedure {
  // Procedure name registered in HoodieProcedures.
  val NAME = "show_file_status"

  // SAM-converted Supplier: yields a fresh builder instance per call.
  def builder: Supplier[ProcedureBuilder] = () => new ShowFileStatusProcedure()
}
+
/** File states reported by show_file_status: deleted, still existing, or unknown. */
object FileStatus extends Enumeration {
  type FileStatus = Value

  val DELETED: Value = Value("deleted")
  val EXIST: Value = Value("exist")
  val UNKNOWN: Value = Value("unknown")
}
+
/** Which Hudi timeline a deletion record was found on. */
object TimelineType extends Enumeration {
  type TimelineType = Value

  val ACTIVE: Value = Value("active")
  val ARCHIVED: Value = Value("archived")
}
+
// Result row for show_file_status: the file's status (deleted/exist/unknown), the
// action and instant that deleted it (empty when not deleted), the timeline the
// record was found on, and the file's full path (empty unless the file exists).
case class FileStatusInfo(status: String, action: String, instant: String, timeline: String, fullPath: String)
+
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRunRollbackInflightTableServiceProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRunRollbackInflightTableServiceProcedure.scala
new file mode 100644
index 00000000000..b2bb7e00daf
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRunRollbackInflightTableServiceProcedure.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.junit.jupiter.api.Assertions
+
class TestRunRollbackInflightTableServiceProcedure extends HoodieSparkProcedureTestBase {
  // Rolling back an inflight clustering (replacecommit) must remove the inflight
  // instant while keeping the requested instant file, since
  // delete_request_instant_file defaults to false.
  test("Test Call run_rollback_inflight_tableservice Procedure for clustering") {
    withTempDir {tmp => {
      val tableName = generateTableName
      val basePath = s"${tmp.getCanonicalPath}/$tableName"
      spark.sql(
        s"""
           |create table $tableName (
           | id int,
           | name string,
           | price double,
           | ts long
           |) using hudi
           | options (
           |  primaryKey ='id',
           |  type = 'cow',
           |  preCombineField = 'ts'
           | )
           | partitioned by(ts)
           | location '$basePath'
       """.stripMargin)
      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")

      // schedule and execute clustering so a completed replacecommit exists
      spark.sql(s"call run_clustering(table => '$tableName', op => 'schedule')")
      spark.sql(s"call run_clustering(table => '$tableName', op => 'execute')")

      // delete clustering commit file, making the instant appear inflight again
      val metaClient = HoodieTableMetaClient.builder().setBasePath(basePath)
        .setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration)).build()
      val clusteringInstant = metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants.get(0)
      metaClient.getActiveTimeline.deleteInstantFileIfExists(clusteringInstant)

      val clusteringInstantTime = clusteringInstant.getTimestamp

      spark.sql(s"call run_rollback_inflight_tableservice(table => '$tableName', pending_instant => '$clusteringInstantTime')")
      // the inflight replacecommit must be gone after the rollback...
      Assertions.assertTrue(!metaClient.reloadActiveTimeline().getInstants
        .contains(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstantTime)))
      // ...while the requested instant file is preserved
      Assertions.assertTrue(metaClient.reloadActiveTimeline().getInstants
        .contains(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstantTime)))
    }}
  }

  // Rolling back an inflight compaction with delete_request_instant_file => true
  // must remove both the inflight and the requested compaction instants.
  test("Test Call run_rollback_inflight_tableservice Procedure for compaction") {
    withTempDir {
      tmp => {
        val tableName = generateTableName
        val basePath = s"${tmp.getCanonicalPath}/$tableName"
        spark.sql(
          s"""
             |create table $tableName (
             | id int,
             | name string,
             | price double,
             | ts long
             |) using hudi
             | options (
             |  primaryKey ='id',
             |  type = 'mor',
             |  preCombineField = 'ts'
             | )
             | partitioned by(ts)
             | location '$basePath'
         """.stripMargin)
        // keep base files small so updates produce log files worth compacting
        spark.sql("set hoodie.parquet.max.file.size = 10000")
        // disable automatic inline compaction
        spark.sql("set hoodie.compact.inline=false")
        spark.sql("set hoodie.compact.schedule.inline=false")

        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)")
        spark.sql(s"update $tableName set price = 11 where id = 1")

        // schedule and run compaction so a completed compaction instant exists
        spark.sql(s"call run_compaction(op => 'schedule', table => '$tableName')")
        spark.sql(s"call run_compaction(op => 'run', table => '$tableName')")

        // delete compaction commit file, making the instant appear inflight again
        val metaClient = HoodieTableMetaClient.builder
          .setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration)).setBasePath(basePath).build
        val compactionInstant: HoodieInstant = metaClient.getActiveTimeline.getReverseOrderedInstants.findFirst().get()

        metaClient.getActiveTimeline.deleteInstantFileIfExists(compactionInstant)
        val compactionInstantTime = compactionInstant.getTimestamp

        spark.sql(s"call run_rollback_inflight_tableservice(table => '$tableName', pending_instant => '$compactionInstantTime', delete_request_instant_file => true)")
        // both the inflight and the requested compaction instants must be gone
        Assertions.assertTrue(!metaClient.reloadActiveTimeline().getInstants
          .contains(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime)))
        Assertions.assertTrue(!metaClient.reloadActiveTimeline().getInstants
          .contains(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime)))
      }
    }
  }


}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowFileStatusProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowFileStatusProcedure.scala
new file mode 100644
index 00000000000..27196082c34
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowFileStatusProcedure.scala
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.hudi.common.util.{Option => HOption}
+import org.apache.hudi.{HoodieCLIUtils, HoodieDataSourceHelpers}
+import org.apache.spark.sql.hudi.command.procedures.{FileStatus, TimelineType}
+
+import scala.collection.JavaConverters._
+
+class TestShowFileStatusProcedure extends HoodieSparkProcedureTestBase {
+
+ private val DEFAULT_VALUE = ""
+
  /**
   * End-to-end check of the `show_file_status` procedure on a partitioned
   * table, for both MOR and COW. A data file is driven through every
   * lifecycle stage and the procedure's five output columns
   * (status, deleting action, deleting instant, timeline type, full path)
   * are asserted at each step: EXIST -> deleted by CLEAN -> deleted by
   * RESTORE (savepoint rollback) -> deleted by ROLLBACK -> UNKNOWN name.
   */
  test("Test Call show_file_status Procedure By COW / MOR Partitioned Table") {
    withTempDir { tmp =>
      Seq("mor", "cow").foreach { tableType =>

        val tableName = generateTableName
        val basePath = s"${tmp.getCanonicalPath}/$tableName"

        // specify clean conf & archive conf & compaction conf
        spark.sql("set hoodie.clean.commits.retained = 2")
        spark.sql("set hoodie.keep.min.commits = 3")
        spark.sql("set hoodie.keep.max.commits = 4")
        spark.sql("set hoodie.compact.inline=false")
        spark.sql("set hoodie.compact.schedule.inline=false")

        spark.sql(
          s"""
             |create table $tableName (
             | id int,
             | name string,
             | price double,
             | ts long,
             | partition long
             |) using hudi
             | options (
             |  primaryKey ='id',
             |  type = '$tableType',
             |  preCombineField = 'ts'
             | )
             | partitioned by(partition)
             | location '$basePath'
       """.stripMargin)

        // All assertions below target this single Hive-style partition folder.
        val partition: String = "partition=1000"
        var before: List[String] = null
        var after: List[String] = null
        var cleanedDataFile: Option[String] = Option.empty

        val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
        val client = HoodieCLIUtils.createHoodieWriteClient(spark, basePath, Map.empty, Option(tableName))
        val metaClient: HoodieTableMetaClient = client.getInternalSchemaAndMetaClient.getRight

        // Step 1: a freshly written file is reported EXIST on the ACTIVE
        // timeline together with its absolute path.
        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 1000)")
        val firstCleanedDataFile = getAllDataFile(fs, basePath, Option.apply(partition)).toStream.filter(f => !f.startsWith(".")).head
        cleanedDataFile = Option.apply(firstCleanedDataFile)
        checkAnswer(s"call show_file_status(table => '$tableName', partition => '$partition', file => '${cleanedDataFile.get}')")(
          Seq(FileStatus.EXIST.toString, DEFAULT_VALUE, DEFAULT_VALUE, TimelineType.ACTIVE.toString, new Path(basePath, new Path(partition, cleanedDataFile.get)).toString))

        // Step 2: three further commits should trip the cleaner
        // (hoodie.clean.commits.retained = 2 above), deleting the first file.
        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001, 1000)")
        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1000)")
        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1000)")

        checkAnswer(s"call show_file_status(table => '$tableName', partition => '$partition', file => '$firstCleanedDataFile')")(
          Seq(FileStatus.DELETED.toString, HoodieTimeline.CLEAN_ACTION, metaClient.reloadActiveTimeline().getCleanerTimeline.lastInstant().get.getTimestamp, TimelineType.ACTIVE.toString, DEFAULT_VALUE)
        )

        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1001)")
        // clustering / compaction (table-service commit used as the savepoint anchor)
        val newInstant = client.createNewInstantTime()
        if (tableType.equals("cow")) {
          client.scheduleClusteringAtInstant(newInstant, HOption.empty())
          client.cluster(newInstant)
        } else {
          client.scheduleCompactionAtInstant(newInstant, HOption.empty())
          client.compact(newInstant)
        }

        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1000)")
        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1002, 1000)")

        // savepoint
        val savepointTime: String = getSpecifyActionLatestTime(fs, basePath, newInstant, 1).get
        spark.sql(s"call create_savepoint(table => '$tableName', commit_time => '$savepointTime')")
        spark.sql(s"insert into $tableName values(2, 'a2', 11, 1002, 1000)")
        spark.sql(s"insert into $tableName values(6, 'a6', 10, 1000, 1001)")

        // restore: diff the partition listing around rollback_to_savepoint to
        // find a file removed by the RESTORE action.
        before = getAllDataFile(fs, basePath, Option.apply(partition))
        spark.sql(s"call rollback_to_savepoint(table => '$tableName', instant_time => '$savepointTime')")
        after = getAllDataFile(fs, basePath, Option.apply(partition))
        cleanedDataFile = getAnyOneDataFile(before, after)

        checkAnswer(s"call show_file_status(table => '$tableName', partition => '$partition', file => '${cleanedDataFile.get}')")(
          Seq(FileStatus.DELETED.toString, HoodieTimeline.RESTORE_ACTION, metaClient.reloadActiveTimeline().getRestoreTimeline.lastInstant().get.getTimestamp, TimelineType.ACTIVE.toString, DEFAULT_VALUE))

        val latestTime = HoodieDataSourceHelpers.latestCommit(fs, basePath)
        spark.sql(s"insert into $tableName values(7, 'a7', 15, 1000, 1000)")
        spark.sql(s"insert into $tableName values(8, 'a8', 12, 1000, 1000)")

        before = getAllDataFile(fs, basePath, Option.apply(partition))
        // rollback: same before/after diff technique for the ROLLBACK action.
        val rollbackTime = getSpecifyActionLatestTime(fs, basePath, latestTime, 5).get
        spark.sql(s"call rollback_to_instant(table => '$tableName', instant_time => '$rollbackTime')")
        spark.sql(s"insert into $tableName values(9, 'a9', 16, 1000, 1000)")
        after = getAllDataFile(fs, basePath, Option.apply(partition))
        cleanedDataFile = getAnyOneDataFile(before, after)
        checkAnswer(s"call show_file_status(table => '$tableName', partition => '$partition', file => '${cleanedDataFile.get}')")(
          Seq(FileStatus.DELETED.toString, HoodieTimeline.ROLLBACK_ACTION, metaClient.reloadActiveTimeline().getRollbackTimeline.lastInstant().get.getTimestamp, TimelineType.ACTIVE.toString, DEFAULT_VALUE))

        // unknown: a name never written to the table yields all-default columns.
        checkAnswer(s"call show_file_status(table => '$tableName', partition => '$partition', file => 'unknown')")(
          Seq(FileStatus.UNKNOWN.toString, DEFAULT_VALUE, DEFAULT_VALUE, DEFAULT_VALUE, DEFAULT_VALUE))
      }
    }
  }
+
  /**
   * Same lifecycle walk as the partitioned-table test, but on a
   * non-partitioned table: data files live directly under the base path and
   * `show_file_status` is called without the `partition` argument.
   * Stages: EXIST -> deleted by CLEAN -> deleted by RESTORE -> deleted by
   * ROLLBACK -> UNKNOWN file name.
   */
  test("Test Call show_file_status Procedure By COW / MOR Non_Partitioned Table") {
    withTempDir { tmp =>
      Seq("mor", "cow").foreach { tableType =>

        val tableName = generateTableName
        val basePath = s"${tmp.getCanonicalPath}/$tableName"

        // specify clean conf & archive conf & compaction conf
        spark.sql("set hoodie.clean.commits.retained = 2")
        spark.sql("set hoodie.keep.min.commits = 3")
        spark.sql("set hoodie.keep.max.commits = 4")
        spark.sql("set hoodie.compact.inline=false")
        spark.sql("set hoodie.compact.schedule.inline=false")

        spark.sql(
          s"""
             |create table $tableName (
             | id int,
             | name string,
             | price double,
             | ts long,
             | partition long
             |) using hudi
             | options (
             |  primaryKey ='id',
             |  type = '$tableType',
             |  preCombineField = 'ts'
             | )
             | location '$basePath'
       """.stripMargin)

        // No partition folder: listing helpers receive Option.empty.
        val partition: Option[String] = Option.empty
        var before: List[String] = null
        var after: List[String] = null
        var cleanedDataFile: Option[String] = Option.empty

        val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())
        val client = HoodieCLIUtils.createHoodieWriteClient(spark, basePath, Map.empty, Option(tableName))
        val metaClient: HoodieTableMetaClient = client.getInternalSchemaAndMetaClient.getRight

        // Step 1: freshly written file is EXIST on the ACTIVE timeline.
        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 1000)")
        val firstCleanedDataFile = getAllDataFile(fs, basePath, partition).toStream.filter(f => !f.startsWith(".")).head
        cleanedDataFile = Option.apply(firstCleanedDataFile)
        checkAnswer(s"call show_file_status(table => '$tableName', file => '${cleanedDataFile.get}')")(
          Seq(FileStatus.EXIST.toString, DEFAULT_VALUE, DEFAULT_VALUE, TimelineType.ACTIVE.toString, new Path(basePath, new Path(cleanedDataFile.get)).toString))

        // Step 2: further commits should trip the cleaner (retained = 2 above).
        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001, 1000)")
        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1000)")
        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1000)")

        checkAnswer(s"call show_file_status(table => '$tableName', file => '$firstCleanedDataFile')")(
          Seq(FileStatus.DELETED.toString, HoodieTimeline.CLEAN_ACTION, metaClient.reloadActiveTimeline().getCleanerTimeline.lastInstant().get.getTimestamp, TimelineType.ACTIVE.toString, DEFAULT_VALUE)
        )

        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1000)")
        // clustering / compaction (table-service commit used as the savepoint anchor)
        val newInstant = client.createNewInstantTime()
        if (tableType.equals("cow")) {
          client.scheduleClusteringAtInstant(newInstant, HOption.empty())
          client.cluster(newInstant)
        } else {
          client.scheduleCompactionAtInstant(newInstant, HOption.empty())
          client.compact(newInstant)
        }

        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1000)")
        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1002, 1000)")

        // savepoint
        val savepointTime: String = getSpecifyActionLatestTime(fs, basePath, newInstant, 1).get
        spark.sql(s"call create_savepoint(table => '$tableName', commit_time => '$savepointTime')")
        spark.sql(s"insert into $tableName values(2, 'a2', 11, 1002, 1000)")
        spark.sql(s"insert into $tableName values(6, 'a6', 10, 1000, 1000)")

        // restore: diff the listing around rollback_to_savepoint to find a
        // file removed by the RESTORE action.
        before = getAllDataFile(fs, basePath, partition)
        spark.sql(s"call rollback_to_savepoint(table => '$tableName', instant_time => '$savepointTime')")
        after = getAllDataFile(fs, basePath, partition)
        cleanedDataFile = getAnyOneDataFile(before, after)
        checkAnswer(s"call show_file_status(table => '$tableName', file => '${cleanedDataFile.get}')")(
          Seq(FileStatus.DELETED.toString, HoodieTimeline.RESTORE_ACTION,
            metaClient.reloadActiveTimeline().getRestoreTimeline.lastInstant().get.getTimestamp, TimelineType.ACTIVE.toString, DEFAULT_VALUE))

        val latestTime = HoodieDataSourceHelpers.latestCommit(fs, basePath)
        spark.sql(s"insert into $tableName values(7, 'a7', 15, 1000, 1000)")
        spark.sql(s"insert into $tableName values(8, 'a8', 12, 1000, 1000)")

        before = getAllDataFile(fs, basePath, partition)
        // rollback: same before/after diff technique for the ROLLBACK action.
        val rollbackTime = getSpecifyActionLatestTime(fs, basePath, latestTime, 5).get
        spark.sql(s"call rollback_to_instant(table => '$tableName', instant_time => '$rollbackTime')")
        spark.sql(s"insert into $tableName values(9, 'a9', 16, 1000, 1000)")
        after = getAllDataFile(fs, basePath, partition)
        cleanedDataFile = getAnyOneDataFile(before, after)
        checkAnswer(s"call show_file_status(table => '$tableName', file => '${cleanedDataFile.get}')")(
          Seq(FileStatus.DELETED.toString, HoodieTimeline.ROLLBACK_ACTION, metaClient.reloadActiveTimeline().getRollbackTimeline.lastInstant().get.getTimestamp, TimelineType.ACTIVE.toString, DEFAULT_VALUE))

        // unknown: a name never written to the table yields all-default columns.
        checkAnswer(s"call show_file_status(table => '$tableName', file => 'unknown')")(
          Seq(FileStatus.UNKNOWN.toString, DEFAULT_VALUE, DEFAULT_VALUE, DEFAULT_VALUE, DEFAULT_VALUE))
      }
    }
  }
+
+
  /**
   * Verifies that `show_file_status` finds the deleting CLEAN instant on the
   * ARCHIVED timeline. The deleting instant's timestamp is captured while it
   * is still active (column index 2 of the procedure output), then enough
   * commits are issued to push that clean action past the archive thresholds
   * (keep.min.commits = 3, keep.max.commits = 4), and the same file is
   * expected to resolve via the ARCHIVED timeline with the same timestamp.
   */
  test("Test Call show_file_status Procedure By COW / MOR For Archive") {
    withTempDir { tmp =>
      Seq("mor", "cow").foreach { tableType =>

        val tableName = generateTableName
        val basePath = s"${tmp.getCanonicalPath}/$tableName"

        // specify clean conf & archive conf & compaction conf
        spark.sql("set hoodie.clean.commits.retained = 2")
        spark.sql("set hoodie.keep.min.commits = 3")
        spark.sql("set hoodie.keep.max.commits = 4")

        spark.sql(
          s"""
             |create table $tableName (
             | id int,
             | name string,
             | price double,
             | ts long,
             | partition long
             |) using hudi
             | options (
             |  primaryKey ='id',
             |  type = '$tableType',
             |  preCombineField = 'ts'
             | )
             | partitioned by(partition)
             | location '$basePath'
       """.stripMargin)

        val partition: String = "partition=1000"
        var cleanedDataFile: String = null

        val fs = new Path(basePath).getFileSystem(spark.sessionState.newHadoopConf())

        // First commit: remember the first visible data file; subsequent
        // commits will make the cleaner delete it.
        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 1000)")
        cleanedDataFile = getAllDataFile(fs, basePath, Option.apply(partition)).toStream.filter(f => !f.startsWith(".")).head
        val firstCleanedDataFile = cleanedDataFile

        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001, 1000)")
        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1000)")
        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1000)")

        // Capture the deleting instant time while the clean is still on the
        // ACTIVE timeline (3rd output column of show_file_status).
        val firstCleanedDataTime =
          spark.sql(s"call show_file_status(table => '$tableName', partition => '$partition', file => '$firstCleanedDataFile')")
            .collect().toList.head.get(2)

        // Enough additional commits to archive the clean action above.
        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1000)")
        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1000)")
        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1002, 1000)")
        spark.sql(s"insert into $tableName values(2, 'a2', 11, 1002, 1000)")
        spark.sql(s"insert into $tableName values(6, 'a6', 10, 1000, 1000)")
        spark.sql(s"insert into $tableName values(7, 'a7', 15, 1000, 1000)")
        spark.sql(s"insert into $tableName values(8, 'a8', 12, 1000, 1000)")
        spark.sql(s"insert into $tableName values(9, 'a9', 16, 1000, 1000)")

        // Same file, same clean timestamp — but now found on the ARCHIVED timeline.
        checkAnswer(s"call show_file_status(table => '$tableName', partition => '$partition', file => '$firstCleanedDataFile')")(
          Seq(FileStatus.DELETED.toString, HoodieTimeline.CLEAN_ACTION, firstCleanedDataTime, TimelineType.ARCHIVED.toString, DEFAULT_VALUE))
      }
    }
  }
+
+ private def getAnyOneDataFile(before: List[String], after: List[String]):
Option[String] = {
+ val sortedBefore = before.sorted
+ val sortedAfter = after.sorted
+ sortedBefore.diff(sortedAfter).headOption
+ }
+
+ private def getAllDataFile(fs: FileSystem, basePath: String, partition:
Option[String]): List[String] = {
+ if (partition.isDefined) {
+ fs.listStatus(new Path(basePath, partition.get)).filter(f => f.isFile &&
!f.getPath.getName.startsWith(".hoodie_partition_metadata")).map(f =>
f.getPath.getName).toList
+ } else {
+ fs.listStatus(new Path(basePath)).filter(f => f.isFile &&
!f.getPath.getName.startsWith(".hoodie_partition_metadata")).map(f =>
f.getPath.getName).toList
+ }
+ }
+
+
+ private def getSpecifyActionLatestTime(fs: FileSystem, basePath: String,
specifyInstant: String, after: Int): Option[String] = {
+ val commitsSince = HoodieDataSourceHelpers.listCommitsSince(fs, basePath,
specifyInstant).asScala.toList
+ if (commitsSince.length > after) {
+ Some(commitsSince(after))
+ } else {
+ commitsSince.lastOption
+ }
+ }
+}
+