This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d4f0326b4b [HUDI-4275] Refactor rollback inflight instant for 
clustering/compaction to reuse some code (#5894)
d4f0326b4b is described below

commit d4f0326b4bbbabefc5c75617b2b5d6b8bf55fe11
Author: huberylee <[email protected]>
AuthorDate: Mon Jun 20 14:29:21 2022 +0800

    [HUDI-4275] Refactor rollback inflight instant for clustering/compaction to 
reuse some code (#5894)
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  | 11 +-----
 .../apache/hudi/client/CompactionAdminClient.java  |  7 ++--
 .../java/org/apache/hudi/table/HoodieTable.java    | 41 ++++++++++++++++++----
 .../apache/hudi/client/SparkRDDWriteClient.java    |  2 +-
 .../table/timeline/HoodieActiveTimeline.java       | 36 +++++--------------
 .../table/timeline/TestHoodieActiveTimeline.java   |  2 +-
 .../sink/clustering/HoodieFlinkClusteringJob.java  |  3 +-
 7 files changed, 51 insertions(+), 51 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 455cb644c7..961965353b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -999,7 +999,6 @@ public abstract class BaseHoodieWriteClient<T extends 
HoodieRecordPayload, I, K,
     return scheduleTableService(instantTime, extraMetadata, 
TableServiceType.COMPACT).isPresent();
   }
 
-
   /**
    * Schedules INDEX action.
    *
@@ -1094,7 +1093,7 @@ public abstract class BaseHoodieWriteClient<T extends 
HoodieRecordPayload, I, K,
     return getPendingRollbackInfo(metaClient, commitToRollback, true);
   }
 
-  protected Option<HoodiePendingRollbackInfo> 
getPendingRollbackInfo(HoodieTableMetaClient metaClient, String 
commitToRollback, boolean ignoreCompactionAndClusteringInstants) {
+  public Option<HoodiePendingRollbackInfo> 
getPendingRollbackInfo(HoodieTableMetaClient metaClient, String 
commitToRollback, boolean ignoreCompactionAndClusteringInstants) {
     return getPendingRollbackInfos(metaClient, 
ignoreCompactionAndClusteringInstants).getOrDefault(commitToRollback, 
Option.empty());
   }
 
@@ -1375,14 +1374,6 @@ public abstract class BaseHoodieWriteClient<T extends 
HoodieRecordPayload, I, K,
     return scheduleClustering(extraMetadata);
   }
 
-  public void rollbackInflightClustering(HoodieInstant inflightInstant, 
HoodieTable table) {
-    Option<HoodiePendingRollbackInfo> pendingRollbackInstantInfo = 
getPendingRollbackInfo(table.getMetaClient(), inflightInstant.getTimestamp(), 
false);
-    String commitTime = pendingRollbackInstantInfo.map(entry -> 
entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
-    table.scheduleRollback(context, commitTime, inflightInstant, false, 
config.shouldRollbackUsingMarkers());
-    table.rollback(context, commitTime, inflightInstant, false, false);
-    
table.getActiveTimeline().revertReplaceCommitInflightToRequested(inflightInstant);
-  }
-
   /**
    * Finalize Write operation.
    *
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
index d006b52b33..a394c6d905 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.client;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -44,6 +42,9 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.action.compact.OperationResult;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -172,7 +173,7 @@ public class CompactionAdminClient extends BaseHoodieClient 
{
       Path inflightPath = new Path(metaClient.getMetaPath(), 
inflight.getFileName());
       if (metaClient.getFs().exists(inflightPath)) {
         // revert if in inflight state
-        
metaClient.getActiveTimeline().revertCompactionInflightToRequested(inflight);
+        
metaClient.getActiveTimeline().revertInstantFromInflightToRequested(inflight);
       }
       // Overwrite compaction plan with updated info
       metaClient.getActiveTimeline().saveToCompactionRequested(
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 56526d23db..b6541ac66b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -18,11 +18,6 @@
 
 package org.apache.hudi.table;
 
-import org.apache.avro.Schema;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -65,6 +60,7 @@ import 
org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -82,6 +78,12 @@ import org.apache.hudi.table.marker.WriteMarkers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.table.storage.HoodieLayoutFactory;
 import org.apache.hudi.table.storage.HoodieStorageLayout;
+
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -545,12 +547,37 @@ public abstract class HoodieTable<T extends 
HoodieRecordPayload, I, K, O> implem
    *
    * @param inflightInstant Inflight Compaction Instant
    */
-  public void rollbackInflightCompaction(HoodieInstant inflightInstant, 
Function<String, Option<HoodiePendingRollbackInfo>> 
getPendingRollbackInstantFunc) {
+  public void rollbackInflightCompaction(HoodieInstant inflightInstant,
+                                         Function<String, 
Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
+    
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
+    rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc);
+  }
+
+  /**
+   * Rollback inflight clustering instant to requested clustering instant
+   *
+   * @param inflightInstant               Inflight clustering instant
+   * @param getPendingRollbackInstantFunc Function to get rollback instant
+   */
+  public void rollbackInflightClustering(HoodieInstant inflightInstant,
+                                         Function<String, 
Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
+    
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION));
+    rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc);
+  }
+
+  /**
+   * Rollback inflight instant to requested instant
+   *
+   * @param inflightInstant               Inflight instant
+   * @param getPendingRollbackInstantFunc Function to get rollback instant
+   */
+  private void rollbackInflightInstant(HoodieInstant inflightInstant,
+                                       Function<String, 
Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
     final String commitTime = 
getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry
         -> 
entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
     scheduleRollback(context, commitTime, inflightInstant, false, 
config.shouldRollbackUsingMarkers());
     rollback(context, commitTime, inflightInstant, false, false);
-    getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
+    getActiveTimeline().revertInstantFromInflightToRequested(inflightInstant);
   }
 
   /**
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index fe6ea975e3..bdf478a8f6 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -355,7 +355,7 @@ public class SparkRDDWriteClient<T extends 
HoodieRecordPayload> extends
     HoodieTimeline pendingClusteringTimeline = 
table.getActiveTimeline().filterPendingReplaceTimeline();
     HoodieInstant inflightInstant = 
HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
     if (pendingClusteringTimeline.containsInstant(inflightInstant)) {
-      rollbackInflightClustering(inflightInstant, table);
+      table.rollbackInflightClustering(inflightInstant, commitToRollback -> 
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
       table.getMetaClient().reloadActiveTimeline();
     }
     clusteringTimer = metrics.getClusteringCtx();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index c069e41ade..6e7f6a2430 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -18,10 +18,6 @@
 
 package org.apache.hudi.common.table.timeline;
 
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -32,6 +28,11 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -347,16 +348,15 @@ public class HoodieActiveTimeline extends 
HoodieDefaultTimeline {
   }
 
   /**
-   * Revert compaction State from inflight to requested.
+   * Revert instant state from inflight to requested.
    *
    * @param inflightInstant Inflight Instant
    * @return requested instant
    */
-  public HoodieInstant revertCompactionInflightToRequested(HoodieInstant 
inflightInstant) {
-    
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
+  public HoodieInstant revertInstantFromInflightToRequested(HoodieInstant 
inflightInstant) {
     ValidationUtils.checkArgument(inflightInstant.isInflight());
     HoodieInstant requestedInstant =
-        new HoodieInstant(State.REQUESTED, COMPACTION_ACTION, 
inflightInstant.getTimestamp());
+        new HoodieInstant(State.REQUESTED, inflightInstant.getAction(), 
inflightInstant.getTimestamp());
     if (metaClient.getTimelineLayoutVersion().isNullVersion()) {
       // Pass empty data since it is read from the corresponding 
.aux/.compaction instant file
       transitionState(inflightInstant, requestedInstant, Option.empty());
@@ -514,26 +514,6 @@ public class HoodieActiveTimeline extends 
HoodieDefaultTimeline {
     return commitInstant;
   }
 
-  /**
-   * Revert replace requested State from inflight to requested.
-   *
-   * @param inflightInstant Inflight Instant
-   * @return requested instant
-   */
-  public HoodieInstant revertReplaceCommitInflightToRequested(HoodieInstant 
inflightInstant) {
-    
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION));
-    ValidationUtils.checkArgument(inflightInstant.isInflight());
-    HoodieInstant requestedInstant =
-        new HoodieInstant(State.REQUESTED, REPLACE_COMMIT_ACTION, 
inflightInstant.getTimestamp());
-    if (metaClient.getTimelineLayoutVersion().isNullVersion()) {
-      // Pass empty data since it is read from the corresponding 
.aux/.compaction instant file
-      transitionState(inflightInstant, requestedInstant, Option.empty());
-    } else {
-      deleteInflight(inflightInstant);
-    }
-    return requestedInstant;
-  }
-
   private void transitionState(HoodieInstant fromInstant, HoodieInstant 
toInstant, Option<byte[]> data) {
     transitionState(fromInstant, toInstant, data, false);
   }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
index 9ff17cdbd2..55806bf1e0 100755
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
@@ -336,7 +336,7 @@ public class TestHoodieActiveTimeline extends 
HoodieCommonTestHarness {
     timeline = timeline.reload();
     assertFalse(timeline.containsInstant(compaction));
     assertTrue(timeline.containsInstant(inflight));
-    compaction = timeline.revertCompactionInflightToRequested(inflight);
+    compaction = timeline.revertInstantFromInflightToRequested(inflight);
     timeline = timeline.reload();
     assertTrue(timeline.containsInstant(compaction));
     assertFalse(timeline.containsInstant(inflight));
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
index f7c361533a..b8ba8e4389 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
@@ -114,7 +114,8 @@ public class HoodieFlinkClusteringJob {
     HoodieInstant inflightInstant = 
HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant.getTimestamp());
     if (timeline.containsInstant(inflightInstant)) {
       LOG.info("Rollback inflight clustering instant: [" + clusteringInstant + 
"]");
-      writeClient.rollbackInflightClustering(inflightInstant, table);
+      table.rollbackInflightClustering(inflightInstant,
+          commitToRollback -> 
writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, 
false));
       table.getMetaClient().reloadActiveTimeline();
     }
 

Reply via email to