This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 380518e [HUDI-2038] Support rollback inflight compaction instances for CompactionPlanOperator (#3105)
380518e is described below
commit 380518e232b883bb12579b3e98283659b464285a
Author: yuzhaojing <[email protected]>
AuthorDate: Wed Jun 23 20:58:52 2021 +0800
[HUDI-2038] Support rollback inflight compaction instances for CompactionPlanOperator (#3105)
Co-authored-by: 喻兆靖 <[email protected]>
---
.../hudi/sink/compact/CompactionPlanOperator.java | 11 ++++++-----
.../apache/hudi/sink/compact/HoodieFlinkCompactor.java | 16 +++-------------
.../main/java/org/apache/hudi/util/CompactionUtil.java | 17 +++++++++++++++++
3 files changed, 26 insertions(+), 18 deletions(-)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
index ad3dd57..e271c84 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
@@ -80,18 +81,18 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
}
@Override
- public void notifyCheckpointComplete(long checkpointId) throws IOException {
+ public void notifyCheckpointComplete(long checkpointId) {
try {
- scheduleCompaction(checkpointId);
+ HoodieFlinkTable hoodieTable = writeClient.getHoodieTable();
+ CompactionUtil.rollbackCompaction(hoodieTable, conf);
+ scheduleCompaction(hoodieTable, checkpointId);
} catch (Throwable throwable) {
// make it fail safe
LOG.error("Error while scheduling compaction at instant: " + compactionInstantTime, throwable);
}
}
- private void scheduleCompaction(long checkpointId) throws IOException {
- HoodieFlinkTable<?> table = writeClient.getHoodieTable();
-
+ private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) throws IOException {
// the last instant takes the highest priority.
Option<HoodieInstant> lastRequested = table.getActiveTimeline().filterPendingCompactionTimeline()
.filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).lastInstant();
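
For context on the hunk above: once stale inflight compactions are rolled back, scheduleCompaction picks the newest REQUESTED compaction instant, since the last instant takes the highest priority. A minimal compilable sketch of that lookup, assuming an initialized HoodieFlinkTable (the helper name lastRequestedCompaction is hypothetical; the timeline calls are the ones shown in the hunk):

    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.table.HoodieFlinkTable;

    final class CompactionPlanning {
      // Among all pending compactions, select the newest instant still in
      // REQUESTED state; an absent Option means there is nothing to execute yet.
      static Option<HoodieInstant> lastRequestedCompaction(HoodieFlinkTable<?> table) {
        return table.getActiveTimeline()
            .filterPendingCompactionTimeline()
            .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED)
            .lastInstant();
      }
    }
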
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index f1102e8..f95f3e3 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -21,7 +21,6 @@ package org.apache.hudi.sink.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
@@ -46,7 +45,6 @@ public class HoodieFlinkCompactor {
protected static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkCompactor.class);
- @SuppressWarnings("unchecked, rawtypes")
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -73,17 +71,7 @@ public class HoodieFlinkCompactor {
// rolls back inflight compaction first
// condition: the schedule compaction is in INFLIGHT state for max delta seconds.
- String curInstantTime = HoodieActiveTimeline.createNewInstantTime();
- int deltaSeconds = conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS);
- HoodieTimeline inflightCompactionTimeline = metaClient.getActiveTimeline()
- .filterPendingCompactionTimeline()
- .filter(instant ->
- instant.getState() == HoodieInstant.State.INFLIGHT
- && StreamerUtil.instantTimeDiff(curInstantTime, instant.getTimestamp()) >= deltaSeconds);
- inflightCompactionTimeline.getInstants().forEach(inflightInstant -> {
- writeClient.rollbackInflightCompaction(inflightInstant, table);
- table.getMetaClient().reloadActiveTimeline();
- });
+ CompactionUtil.rollbackCompaction(table, conf);
// judge whether have operation
// to compute the compaction instant time and do compaction.
@@ -94,6 +82,8 @@ public class HoodieFlinkCompactor {
LOG.info("No compaction plan for this job ");
return;
}
+
+ table.getMetaClient().reloadActiveTimeline();
// generate compaction plan
// should support configurable commit metadata
HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
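
The standalone compactor now delegates the rollback loop it previously inlined to the shared CompactionUtil.rollbackCompaction helper, then reloads the active timeline before generating the plan so planning sees the rolled-back state. A minimal usage sketch under the assumption of an already configured HoodieFlinkWriteClient and Flink Configuration (the wrapper name cleanStaleCompactions is hypothetical; the individual calls come from the hunks above):

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.client.HoodieFlinkWriteClient;
    import org.apache.hudi.table.HoodieFlinkTable;
    import org.apache.hudi.util.CompactionUtil;

    final class CompactorBootstrap {
      // Roll back compactions stuck in INFLIGHT first, then refresh the
      // active timeline so subsequent plan generation is consistent.
      static void cleanStaleCompactions(HoodieFlinkWriteClient<?> writeClient, Configuration conf) {
        HoodieFlinkTable<?> table = writeClient.getHoodieTable();
        CompactionUtil.rollbackCompaction(table, conf);
        table.getMetaClient().reloadActiveTimeline();
      }
    }
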
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index 46d7272..3d98ce9 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieIOException;
@@ -29,6 +30,7 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.avro.Schema;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hudi.table.HoodieFlinkTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,4 +98,19 @@ public class CompactionUtil {
throw new HoodieIOException("Could not remove requested commit " + commitFilePath, e);
}
}
+
+ public static void rollbackCompaction(HoodieFlinkTable<?> table, Configuration conf) {
+ String curInstantTime = HoodieActiveTimeline.createNewInstantTime();
+ int deltaSeconds = conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS);
+ HoodieTimeline inflightCompactionTimeline = table.getActiveTimeline()
+ .filterPendingCompactionTimeline()
+ .filter(instant ->
+ instant.getState() == HoodieInstant.State.INFLIGHT
+ && StreamerUtil.instantTimeDiff(curInstantTime, instant.getTimestamp()) >= deltaSeconds);
+ inflightCompactionTimeline.getInstants().forEach(inflightInstant -> {
+ LOG.info("Rollback the pending compaction instant: " + inflightInstant);
+ table.rollback(table.getContext(), HoodieActiveTimeline.createNewInstantTime(), inflightInstant, true);
+ table.getMetaClient().reloadActiveTimeline();
+ });
+ }
}
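
The staleness test in rollbackCompaction compares a freshly created instant time against each INFLIGHT compaction's timestamp and rolls back only those at least FlinkOptions.COMPACTION_DELTA_SECONDS old. A self-contained sketch of that age check, assuming the second-granularity yyyyMMddHHmmss instant-time format Hudi uses here (isStale below is a hypothetical stand-in for the StreamerUtil.instantTimeDiff comparison, not the commit's code):

    import java.time.Duration;
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;

    final class InstantAgeCheck {
      // Assumed format of Hudi instant times (see lead-in): yyyyMMddHHmmss.
      private static final DateTimeFormatter FMT =
          DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

      // True when the instant is at least deltaSeconds older than "now".
      static boolean isStale(String curInstantTime, String instantTime, int deltaSeconds) {
        LocalDateTime now = LocalDateTime.parse(curInstantTime, FMT);
        LocalDateTime created = LocalDateTime.parse(instantTime, FMT);
        return Duration.between(created, now).getSeconds() >= deltaSeconds;
      }

      public static void main(String[] args) {
        // 21:00:00 minus 20:58:00 is 120s, which exceeds a 60s threshold.
        System.out.println(isStale("20210623210000", "20210623205800", 60)); // true
      }
    }
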