This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 380518e  [HUDI-2038] Support rollback inflight compaction instances for CompactionPlanOperator (#3105)
380518e is described below

commit 380518e232b883bb12579b3e98283659b464285a
Author: yuzhaojing <[email protected]>
AuthorDate: Wed Jun 23 20:58:52 2021 +0800

    [HUDI-2038] Support rollback inflight compaction instances for CompactionPlanOperator (#3105)
    
    Co-authored-by: 喻兆靖 <[email protected]>
---
 .../hudi/sink/compact/CompactionPlanOperator.java       | 11 ++++++-----
 .../apache/hudi/sink/compact/HoodieFlinkCompactor.java  | 16 +++-------------
 .../main/java/org/apache/hudi/util/CompactionUtil.java  | 17 +++++++++++++++++
 3 files changed, 26 insertions(+), 18 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
index ad3dd57..e271c84 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
@@ -80,18 +81,18 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
   }
 
   @Override
-  public void notifyCheckpointComplete(long checkpointId) throws IOException {
+  public void notifyCheckpointComplete(long checkpointId) {
     try {
-      scheduleCompaction(checkpointId);
+      HoodieFlinkTable hoodieTable = writeClient.getHoodieTable();
+      CompactionUtil.rollbackCompaction(hoodieTable, conf);
+      scheduleCompaction(hoodieTable, checkpointId);
     } catch (Throwable throwable) {
       // make it fail safe
       LOG.error("Error while scheduling compaction at instant: " + 
compactionInstantTime, throwable);
     }
   }
 
-  private void scheduleCompaction(long checkpointId) throws IOException {
-    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
-
+  private void scheduleCompaction(HoodieFlinkTable<?> table, long 
checkpointId) throws IOException {
     // the last instant takes the highest priority.
     Option<HoodieInstant> lastRequested = 
table.getActiveTimeline().filterPendingCompactionTimeline()
         .filter(instant -> instant.getState() == 
HoodieInstant.State.REQUESTED).lastInstant();
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index f1102e8..f95f3e3 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -21,7 +21,6 @@ package org.apache.hudi.sink.compact;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
@@ -46,7 +45,6 @@ public class HoodieFlinkCompactor {
 
   protected static final Logger LOG = 
LoggerFactory.getLogger(HoodieFlinkCompactor.class);
 
-  @SuppressWarnings("unchecked, rawtypes")
   public static void main(String[] args) throws Exception {
     StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
@@ -73,17 +71,7 @@ public class HoodieFlinkCompactor {
 
     // rolls back inflight compaction first
     // condition: the schedule compaction is in INFLIGHT state for max delta 
seconds.
-    String curInstantTime = HoodieActiveTimeline.createNewInstantTime();
-    int deltaSeconds = conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS);
-    HoodieTimeline inflightCompactionTimeline = metaClient.getActiveTimeline()
-        .filterPendingCompactionTimeline()
-        .filter(instant ->
-            instant.getState() == HoodieInstant.State.INFLIGHT
-                && StreamerUtil.instantTimeDiff(curInstantTime, 
instant.getTimestamp()) >= deltaSeconds);
-    inflightCompactionTimeline.getInstants().forEach(inflightInstant -> {
-      writeClient.rollbackInflightCompaction(inflightInstant, table);
-      table.getMetaClient().reloadActiveTimeline();
-    });
+    CompactionUtil.rollbackCompaction(table, conf);
 
     // judge whether have operation
     // to compute the compaction instant time and do compaction.
@@ -94,6 +82,8 @@ public class HoodieFlinkCompactor {
       LOG.info("No compaction plan for this job ");
       return;
     }
+
+    table.getMetaClient().reloadActiveTimeline();
     // generate compaction plan
     // should support configurable commit metadata
     HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index 46d7272..3d98ce9 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieIOException;
@@ -29,6 +30,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.avro.Schema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.table.HoodieFlinkTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -96,4 +98,19 @@ public class CompactionUtil {
       throw new HoodieIOException("Could not remove requested commit " + 
commitFilePath, e);
     }
   }
+
+  public static void rollbackCompaction(HoodieFlinkTable<?> table, 
Configuration conf) {
+    String curInstantTime = HoodieActiveTimeline.createNewInstantTime();
+    int deltaSeconds = conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS);
+    HoodieTimeline inflightCompactionTimeline = table.getActiveTimeline()
+        .filterPendingCompactionTimeline()
+        .filter(instant ->
+            instant.getState() == HoodieInstant.State.INFLIGHT
+                && StreamerUtil.instantTimeDiff(curInstantTime, 
instant.getTimestamp()) >= deltaSeconds);
+    inflightCompactionTimeline.getInstants().forEach(inflightInstant -> {
+      LOG.info("Rollback the pending compaction instant: " + inflightInstant);
+      table.rollback(table.getContext(), 
HoodieActiveTimeline.createNewInstantTime(), inflightInstant, true);
+      table.getMetaClient().reloadActiveTimeline();
+    });
+  }
 }

Reply via email to