This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 0178b12b5b5 [HUDI-6304] Handle cases when inflight compaction files are removed and job failed leaving pending rollback files (#12644)
0178b12b5b5 is described below

commit 0178b12b5b55b835af5119369298de08794f3b5c
Author: fhan <aaron.han.1...@gmail.com>
AuthorDate: Sat Jan 18 10:42:10 2025 +0800

    [HUDI-6304] Handle cases when inflight compaction files are removed and job failed leaving pending rollback files (#12644)
---
 .../java/org/apache/hudi/sink/compact/CompactionPlanOperator.java | 7 ++++++-
 .../src/main/java/org/apache/hudi/util/CompactionUtil.java | 4 ++--
 .../src/test/java/org/apache/hudi/utils/TestCompactionUtil.java | 2 +-
 3 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
index 75f5779cb9e..17732f3aca7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.sink.compact;

 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -29,6 +30,7 @@ import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.FlinkTables;
+import org.apache.hudi.util.FlinkWriteClients;

 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
@@ -70,6 +72,8 @@ public class
CompactionPlanOperator extends AbstractStreamOperator<CompactionPla

   private transient FlinkCompactionMetrics compactionMetrics;

+  private transient HoodieFlinkWriteClient writeClient;
+
   public CompactionPlanOperator(Configuration conf) {
     this.conf = conf;
   }
@@ -79,10 +83,11 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
     super.open();
     registerMetrics();
     this.table = FlinkTables.createTable(conf, getRuntimeContext());
+    this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext());
     // when starting up, rolls back all the inflight compaction instants if there exists,
     // these instants are in priority for scheduling task because the compaction instants are
     // scheduled from earliest(FIFO sequence).
-    CompactionUtil.rollbackCompaction(table);
+    CompactionUtil.rollbackCompaction(table, this.writeClient);
   }

   @Override
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index 657bbdbea60..f0059ff34af 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -149,14 +149,14 @@ public class CompactionUtil {
    *
    * @param table The hoodie table
    */
-  public static void rollbackCompaction(HoodieFlinkTable<?> table) {
+  public static void rollbackCompaction(HoodieFlinkTable<?> table, HoodieFlinkWriteClient writeClient) {
     HoodieTimeline inflightCompactionTimeline = table.getActiveTimeline()
         .filterPendingCompactionTimeline()
         .filter(instant ->
             instant.getState() == HoodieInstant.State.INFLIGHT);
     inflightCompactionTimeline.getInstants().forEach(inflightInstant -> {
       LOG.info("Rollback the inflight compaction instant: " + inflightInstant + " for failover");
-      table.rollbackInflightCompaction(inflightInstant);
+
table.rollbackInflightCompaction(inflightInstant, commitToRollback -> writeClient.getTableServiceClient().getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
       table.getMetaClient().reloadActiveTimeline();
     });
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
index 111680bdcdb..93e9f7e9ab6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
@@ -98,7 +98,7 @@ public class TestCompactionUtil {
         .filter(instant -> instant.getState() == HoodieInstant.State.INFLIGHT)
         .getInstants();
     assertThat("all the instants should be in pending state", instants.size(), is(3));
-    CompactionUtil.rollbackCompaction(table);
+    CompactionUtil.rollbackCompaction(table, FlinkWriteClients.createWriteClient(conf));
     boolean allRolledBack = metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstantsAsStream()
         .allMatch(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
     assertTrue(allRolledBack, "all the instants should be rolled back");