This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0178b12b5b5 [HUDI-6304] Handle cases when inflight compaction files are removed and job failed leaving pending rollback files (#12644)
0178b12b5b5 is described below

commit 0178b12b5b55b835af5119369298de08794f3b5c
Author: fhan <aaron.han.1...@gmail.com>
AuthorDate: Sat Jan 18 10:42:10 2025 +0800

    [HUDI-6304] Handle cases when inflight compaction files are removed and job failed leaving pending rollback files (#12644)
---
 .../java/org/apache/hudi/sink/compact/CompactionPlanOperator.java  | 7 ++++++-
 .../src/main/java/org/apache/hudi/util/CompactionUtil.java         | 4 ++--
 .../src/test/java/org/apache/hudi/utils/TestCompactionUtil.java    | 2 +-
 3 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
index 75f5779cb9e..17732f3aca7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.sink.compact;
 
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -29,6 +30,7 @@ import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.FlinkTables;
+import org.apache.hudi.util.FlinkWriteClients;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
@@ -70,6 +72,8 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
 
   private transient FlinkCompactionMetrics compactionMetrics;
 
+  private transient HoodieFlinkWriteClient writeClient;
+
   public CompactionPlanOperator(Configuration conf) {
     this.conf = conf;
   }
@@ -79,10 +83,11 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
     super.open();
     registerMetrics();
     this.table = FlinkTables.createTable(conf, getRuntimeContext());
+    this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext());
     // when starting up, rolls back all the inflight compaction instants if there exists,
     // these instants are in priority for scheduling task because the compaction instants are
     // scheduled from earliest(FIFO sequence).
-    CompactionUtil.rollbackCompaction(table);
+    CompactionUtil.rollbackCompaction(table, this.writeClient);
   }
 
   @Override
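
For readability, the patched open() in CompactionPlanOperator reads roughly as below once the hunks above apply. This is a reassembly of the diff lines, not new code; the @Override annotation and the "throws Exception" signature are assumed from Flink's AbstractStreamOperator contract and are not part of this patch:

    @Override
    public void open() throws Exception {
      super.open();
      registerMetrics();
      this.table = FlinkTables.createTable(conf, getRuntimeContext());
      // new: a write client is created alongside the table so the rollback
      // helper can look up pending rollback plans
      this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext());
      // when starting up, rolls back all the inflight compaction instants if there exists,
      // these instants are in priority for scheduling task because the compaction instants are
      // scheduled from earliest(FIFO sequence).
      CompactionUtil.rollbackCompaction(table, this.writeClient);
    }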
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index 657bbdbea60..f0059ff34af 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -149,14 +149,14 @@ public class CompactionUtil {
    *
    * @param table The hoodie table
    */
-  public static void rollbackCompaction(HoodieFlinkTable<?> table) {
+  public static void rollbackCompaction(HoodieFlinkTable<?> table, HoodieFlinkWriteClient writeClient) {
     HoodieTimeline inflightCompactionTimeline = table.getActiveTimeline()
         .filterPendingCompactionTimeline()
         .filter(instant ->
             instant.getState() == HoodieInstant.State.INFLIGHT);
     inflightCompactionTimeline.getInstants().forEach(inflightInstant -> {
       LOG.info("Rollback the inflight compaction instant: " + inflightInstant 
+ " for failover");
-      table.rollbackInflightCompaction(inflightInstant);
+      table.rollbackInflightCompaction(inflightInstant, commitToRollback -> writeClient.getTableServiceClient().getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
       table.getMetaClient().reloadActiveTimeline();
     });
   }
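
Reassembled from the hunk above, with only the line breaks restored and one editorial comment added, the updated helper now looks like this; per the commit title, the intent is to pick up a rollback plan left pending by a failed job instead of always scheduling a fresh one:

    public static void rollbackCompaction(HoodieFlinkTable<?> table, HoodieFlinkWriteClient writeClient) {
      HoodieTimeline inflightCompactionTimeline = table.getActiveTimeline()
          .filterPendingCompactionTimeline()
          .filter(instant ->
              instant.getState() == HoodieInstant.State.INFLIGHT);
      inflightCompactionTimeline.getInstants().forEach(inflightInstant -> {
        LOG.info("Rollback the inflight compaction instant: " + inflightInstant + " for failover");
        // editorial note: the supplier hands back any pending rollback info for the commit,
        // so a rollback left behind by a failed job is reused rather than re-planned
        table.rollbackInflightCompaction(inflightInstant, commitToRollback ->
            writeClient.getTableServiceClient().getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
        table.getMetaClient().reloadActiveTimeline();
      });
    }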
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
index 111680bdcdb..93e9f7e9ab6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
@@ -98,7 +98,7 @@ public class TestCompactionUtil {
         .filter(instant -> instant.getState() == HoodieInstant.State.INFLIGHT)
         .getInstants();
     assertThat("all the instants should be in pending state", instants.size(), 
is(3));
-    CompactionUtil.rollbackCompaction(table);
+    CompactionUtil.rollbackCompaction(table, FlinkWriteClients.createWriteClient(conf));
     boolean allRolledBack = metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstantsAsStream()
         .allMatch(instant -> instant.getState() == HoodieInstant.State.REQUESTED);
     assertTrue(allRolledBack, "all the instants should be rolled back");
