This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d4f0326b4b [HUDI-4275] Refactor rollback inflight instant for clustering/compaction to reuse some code (#5894)
d4f0326b4b is described below
commit d4f0326b4bbbabefc5c75617b2b5d6b8bf55fe11
Author: huberylee <[email protected]>
AuthorDate: Mon Jun 20 14:29:21 2022 +0800
[HUDI-4275] Refactor rollback inflight instant for clustering/compaction to reuse some code (#5894)
---
.../apache/hudi/client/BaseHoodieWriteClient.java | 11 +-----
.../apache/hudi/client/CompactionAdminClient.java | 7 ++--
.../java/org/apache/hudi/table/HoodieTable.java | 41 ++++++++++++++++++----
.../apache/hudi/client/SparkRDDWriteClient.java | 2 +-
.../table/timeline/HoodieActiveTimeline.java | 36 +++++--------------
.../table/timeline/TestHoodieActiveTimeline.java | 2 +-
.../sink/clustering/HoodieFlinkClusteringJob.java | 3 +-
7 files changed, 51 insertions(+), 51 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 455cb644c7..961965353b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -999,7 +999,6 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
return scheduleTableService(instantTime, extraMetadata,
TableServiceType.COMPACT).isPresent();
}
-
/**
* Schedules INDEX action.
*
@@ -1094,7 +1093,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
return getPendingRollbackInfo(metaClient, commitToRollback, true);
}
- protected Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback, boolean ignoreCompactionAndClusteringInstants) {
+ public Option<HoodiePendingRollbackInfo> getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback, boolean ignoreCompactionAndClusteringInstants) {
return getPendingRollbackInfos(metaClient,
ignoreCompactionAndClusteringInstants).getOrDefault(commitToRollback,
Option.empty());
}
@@ -1375,14 +1374,6 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
return scheduleClustering(extraMetadata);
}
- public void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable table) {
- Option<HoodiePendingRollbackInfo> pendingRollbackInstantInfo = getPendingRollbackInfo(table.getMetaClient(), inflightInstant.getTimestamp(), false);
- String commitTime = pendingRollbackInstantInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
- table.scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
- table.rollback(context, commitTime, inflightInstant, false, false);
- table.getActiveTimeline().revertReplaceCommitInflightToRequested(inflightInstant);
- }
-
/**
* Finalize Write operation.
*
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
index d006b52b33..a394c6d905 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
@@ -18,8 +18,6 @@
package org.apache.hudi.client;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -44,6 +42,9 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.action.compact.OperationResult;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -172,7 +173,7 @@ public class CompactionAdminClient extends BaseHoodieClient {
Path inflightPath = new Path(metaClient.getMetaPath(),
inflight.getFileName());
if (metaClient.getFs().exists(inflightPath)) {
// revert if in inflight state
- metaClient.getActiveTimeline().revertCompactionInflightToRequested(inflight);
+ metaClient.getActiveTimeline().revertInstantFromInflightToRequested(inflight);
}
// Overwrite compaction plan with updated info
metaClient.getActiveTimeline().saveToCompactionRequested(
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 56526d23db..b6541ac66b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -18,11 +18,6 @@
package org.apache.hudi.table;
-import org.apache.avro.Schema;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -65,6 +60,7 @@ import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
@@ -82,6 +78,12 @@ import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.storage.HoodieLayoutFactory;
import org.apache.hudi.table.storage.HoodieStorageLayout;
+
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -545,12 +547,37 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
*
* @param inflightInstant Inflight Compaction Instant
*/
- public void rollbackInflightCompaction(HoodieInstant inflightInstant, Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
+ public void rollbackInflightCompaction(HoodieInstant inflightInstant,
+ Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
+ ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
+ rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc);
+ }
+
+ /**
+ * Rollback inflight clustering instant to requested clustering instant
+ *
+ * @param inflightInstant Inflight clustering instant
+ * @param getPendingRollbackInstantFunc Function to get rollback instant
+ */
+ public void rollbackInflightClustering(HoodieInstant inflightInstant,
+ Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
+ ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION));
+ rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc);
+ }
+
+ /**
+ * Rollback inflight instant to requested instant
+ *
+ * @param inflightInstant Inflight instant
+ * @param getPendingRollbackInstantFunc Function to get rollback instant
+ */
+ private void rollbackInflightInstant(HoodieInstant inflightInstant,
+ Function<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
final String commitTime =
getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry
->
entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
scheduleRollback(context, commitTime, inflightInstant, false,
config.shouldRollbackUsingMarkers());
rollback(context, commitTime, inflightInstant, false, false);
- getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
+ getActiveTimeline().revertInstantFromInflightToRequested(inflightInstant);
}
/**
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index fe6ea975e3..bdf478a8f6 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -355,7 +355,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
HoodieTimeline pendingClusteringTimeline =
table.getActiveTimeline().filterPendingReplaceTimeline();
HoodieInstant inflightInstant =
HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
if (pendingClusteringTimeline.containsInstant(inflightInstant)) {
- rollbackInflightClustering(inflightInstant, table);
+ table.rollbackInflightClustering(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
table.getMetaClient().reloadActiveTimeline();
}
clusteringTimer = metrics.getClusteringCtx();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index c069e41ade..6e7f6a2430 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -18,10 +18,6 @@
package org.apache.hudi.common.table.timeline;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -32,6 +28,11 @@ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -347,16 +348,15 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
}
/**
- * Revert compaction State from inflight to requested.
+ * Revert instant state from inflight to requested.
*
* @param inflightInstant Inflight Instant
* @return requested instant
*/
- public HoodieInstant revertCompactionInflightToRequested(HoodieInstant inflightInstant) {
- ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
+ public HoodieInstant revertInstantFromInflightToRequested(HoodieInstant inflightInstant) {
ValidationUtils.checkArgument(inflightInstant.isInflight());
HoodieInstant requestedInstant =
- new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, inflightInstant.getTimestamp());
+ new HoodieInstant(State.REQUESTED, inflightInstant.getAction(), inflightInstant.getTimestamp());
if (metaClient.getTimelineLayoutVersion().isNullVersion()) {
// Pass empty data since it is read from the corresponding .aux/.compaction instant file
transitionState(inflightInstant, requestedInstant, Option.empty());
@@ -514,26 +514,6 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
return commitInstant;
}
- /**
- * Revert replace requested State from inflight to requested.
- *
- * @param inflightInstant Inflight Instant
- * @return requested instant
- */
- public HoodieInstant revertReplaceCommitInflightToRequested(HoodieInstant inflightInstant) {
- ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION));
- ValidationUtils.checkArgument(inflightInstant.isInflight());
- HoodieInstant requestedInstant =
- new HoodieInstant(State.REQUESTED, REPLACE_COMMIT_ACTION, inflightInstant.getTimestamp());
- if (metaClient.getTimelineLayoutVersion().isNullVersion()) {
- // Pass empty data since it is read from the corresponding .aux/.compaction instant file
- transitionState(inflightInstant, requestedInstant, Option.empty());
- } else {
- deleteInflight(inflightInstant);
- }
- return requestedInstant;
- }
-
private void transitionState(HoodieInstant fromInstant, HoodieInstant
toInstant, Option<byte[]> data) {
transitionState(fromInstant, toInstant, data, false);
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
index 9ff17cdbd2..55806bf1e0 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
@@ -336,7 +336,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
timeline = timeline.reload();
assertFalse(timeline.containsInstant(compaction));
assertTrue(timeline.containsInstant(inflight));
- compaction = timeline.revertCompactionInflightToRequested(inflight);
+ compaction = timeline.revertInstantFromInflightToRequested(inflight);
timeline = timeline.reload();
assertTrue(timeline.containsInstant(compaction));
assertFalse(timeline.containsInstant(inflight));
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
index f7c361533a..b8ba8e4389 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
@@ -114,7 +114,8 @@ public class HoodieFlinkClusteringJob {
HoodieInstant inflightInstant =
HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant.getTimestamp());
if (timeline.containsInstant(inflightInstant)) {
LOG.info("Rollback inflight clustering instant: [" + clusteringInstant +
"]");
- writeClient.rollbackInflightClustering(inflightInstant, table);
+ table.rollbackInflightClustering(inflightInstant,
+ commitToRollback -> writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
table.getMetaClient().reloadActiveTimeline();
}