This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d0c84705ca3 [HUDI-9570] Using coordinator state to persist and 
recommit write metadata events (#13543)
0d0c84705ca3 is described below

commit 0d0c84705ca31aa8f11d9cce97b83898e4ff233a
Author: Shuo Cheng <[email protected]>
AuthorDate: Thu Jul 17 09:09:08 2025 +0800

    [HUDI-9570] Using coordinator state to persist and recommit write metadata 
events (#13543)
---
 hudi-examples/hudi-examples-flink/pom.xml          |  6 +++
 .../hudi/sink/StreamWriteOperatorCoordinator.java  | 54 ++++++++++++----------
 .../org/apache/hudi/sink/utils/EventBuffers.java   | 31 +++++++++++--
 .../org/apache/hudi/sink/TestWriteMergeOnRead.java | 32 +++++++++++++
 .../sink/utils/StreamWriteFunctionWrapper.java     | 23 ++++++++-
 .../test/java/org/apache/hudi/utils/TestData.java  |  8 ++++
 6 files changed, 125 insertions(+), 29 deletions(-)

diff --git a/hudi-examples/hudi-examples-flink/pom.xml 
b/hudi-examples/hudi-examples-flink/pom.xml
index 4e7a7f16d72b..08666562cf48 100644
--- a/hudi-examples/hudi-examples-flink/pom.xml
+++ b/hudi-examples/hudi-examples-flink/pom.xml
@@ -117,6 +117,12 @@
             <scope>compile</scope>
         </dependency>
 
+        <!-- Kryo -->
+        <dependency>
+            <groupId>com.esotericsoftware</groupId>
+            <artifactId>kryo-shaded</artifactId>
+        </dependency>
+
         <!-- Flink -->
         <dependency>
             <groupId>org.apache.flink</groupId>
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 4368ca3b3654..e6c37acb9d24 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.SerializationUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -38,7 +39,6 @@ import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
 import org.apache.hudi.sink.event.Correspondent;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
-import org.apache.hudi.sink.utils.CommitGuard;
 import org.apache.hudi.sink.utils.CoordinationResponseSerDe;
 import org.apache.hudi.sink.utils.EventBuffers;
 import org.apache.hudi.sink.utils.ExplicitClassloaderThreadFactory;
@@ -195,11 +195,6 @@ public class StreamWriteOperatorCoordinator
    */
   private ClientIds clientIds;
 
-  /**
-   * The commit guard for blocking instant time generation.
-   */
-  private Option<CommitGuard> commitGuardOpt;
-
   /**
    * Constructs a StreamingSinkOperatorCoordinator.
    *
@@ -220,10 +215,8 @@ public class StreamWriteOperatorCoordinator
     // setup classloader for APIs that use reflection without taking 
ClassLoader param
     // reference: 
https://stackoverflow.com/questions/1771679/difference-between-threads-context-class-loader-and-normal-classloader
     Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+    initEventBufferIfNecessary();
     this.tableState = TableState.create(conf);
-    initCommitGuard(this.conf);
-    // initialize event buffer
-    this.eventBuffers = EventBuffers.getInstance(this.commitGuardOpt);
     this.gateways = new SubtaskGateway[this.parallelism];
     try {
       // init table, create if not exists.
@@ -249,6 +242,7 @@ public class StreamWriteOperatorCoordinator
       if (OptionsResolver.isMultiWriter(conf)) {
         initClientIds(conf);
       }
+      restoreEvents();
     } catch (Throwable throwable) {
       LOG.error("Failed to start operator coordinator.", throwable);
       context.failJob(throwable);
@@ -283,7 +277,8 @@ public class StreamWriteOperatorCoordinator
     executor.execute(
         () -> {
           try {
-            result.complete(new byte[0]);
+            byte[] eventBytes = 
SerializationUtils.serialize(this.eventBuffers.getAllCompletedEvents());
+            result.complete(eventBytes);
           } catch (Throwable throwable) {
             // when a checkpoint fails, throws directly.
             result.completeExceptionally(
@@ -316,7 +311,10 @@ public class StreamWriteOperatorCoordinator
 
   @Override
   public void resetToCheckpoint(long checkpointID, byte[] checkpointData) {
-    // no operation
+    if (checkpointData != null) {
+      initEventBufferIfNecessary();
+      
this.eventBuffers.addEventsToBuffer(SerializationUtils.deserialize(checkpointData));
+    }
   }
 
   @Override
@@ -367,7 +365,7 @@ public class StreamWriteOperatorCoordinator
       final String instantTime;
       if (instantTimeAndEventBuffer == null) {
         // wait until previous instants are committed.
-        awaitAllInstantsToCompleteIfNecessary();
+        eventBuffers.awaitAllInstantsToCompleteIfNecessary();
         instantTime = startInstant();
         this.eventBuffers.initNewEventBuffer(checkpointId, instantTime, 
this.parallelism);
       } else {
@@ -382,9 +380,12 @@ public class StreamWriteOperatorCoordinator
   //  Utilities
   // -------------------------------------------------------------------------
 
-  private void awaitAllInstantsToCompleteIfNecessary() {
-    if (this.commitGuardOpt.isPresent() && this.eventBuffers.nonEmpty()) {
-      
this.commitGuardOpt.get().blockFor(this.eventBuffers.getPendingInstants());
+  private void restoreEvents() {
+    if (this.eventBuffers.nonEmpty()) {
+      final HoodieTimeline completedTimeline = 
this.metaClient.getActiveTimeline().filterCompletedInstants();
+      this.eventBuffers.getEventBufferStream()
+          .forEach(entry -> recommitInstant(completedTimeline, entry.getKey(), 
entry.getValue().getLeft(), entry.getValue().getRight()));
+      this.metaClient.reloadActiveTimeline();
     }
   }
 
@@ -441,12 +442,12 @@ public class StreamWriteOperatorCoordinator
     this.clientIds.start();
   }
 
-  private void initCommitGuard(Configuration conf) {
-    if (tableState.isBlockingInstantGeneration) {
-      this.commitGuardOpt = 
Option.of(CommitGuard.create(conf.get(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT)));
-    } else {
-      this.commitGuardOpt = Option.empty();
+  private void initEventBufferIfNecessary() {
+    if (this.eventBuffers != null) {
+      return;
     }
+    // initialize event buffer
+    this.eventBuffers = EventBuffers.getInstance(conf);
   }
 
   private String startInstant() {
@@ -470,6 +471,14 @@ public class StreamWriteOperatorCoordinator
    */
   private void recommitInstant(long checkpointId, String instant, 
WriteMetadataEvent[] bootstrapBuffer) {
     HoodieTimeline completedTimeline = 
this.metaClient.getActiveTimeline().filterCompletedInstants();
+    recommitInstant(completedTimeline, checkpointId, instant, bootstrapBuffer);
+  }
+
+  /**
+   * Recommits the last inflight instant if the write metadata was checkpointed
+   * successfully but was not committed due to some rare cases.
+   */
+  private void recommitInstant(HoodieTimeline completedTimeline, long 
checkpointId, String instant, WriteMetadataEvent[] bootstrapBuffer) {
     if (!completedTimeline.containsInstant(instant)) {
       LOG.info("Recommit instant {}", instant);
       // Recommit should start heartbeat for lazy failed writes clean policy 
to avoid aborting for heartbeat expired;
@@ -686,10 +695,6 @@ public class StreamWriteOperatorCoordinator
     final boolean syncHive;
     final boolean syncMetadata;
     final boolean isDeltaTimeCompaction;
-    /**
-     * Whether the writer for the table applies blocking instant generation.
-     */
-    final boolean isBlockingInstantGeneration;
 
     private TableState(Configuration conf) {
       this.operationType = 
WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
@@ -701,7 +706,6 @@ public class StreamWriteOperatorCoordinator
       this.syncHive = conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED);
       this.syncMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED);
       this.isDeltaTimeCompaction = OptionsResolver.isDeltaTimeCompaction(conf);
-      this.isBlockingInstantGeneration = 
OptionsResolver.isBlockingInstantGeneration(conf);
     }
 
     public static TableState create(Configuration conf) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
index fa5595f93528..d4eb6a393883 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
@@ -20,13 +20,17 @@ package org.apache.hudi.sink.utils;
 
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 
+import org.apache.flink.configuration.Configuration;
+
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -45,8 +49,10 @@ public class EventBuffers implements Serializable {
     this.commitGuardOption = commitGuardOption;
   }
 
-  public static EventBuffers getInstance(Option<CommitGuard> 
commitGuardOption) {
-    return new EventBuffers(new ConcurrentHashMap<>(), commitGuardOption);
+  public static EventBuffers getInstance(Configuration conf) {
+    final Option<CommitGuard> commitGuardOpt = 
OptionsResolver.isBlockingInstantGeneration(conf)
+        ? 
Option.of(CommitGuard.create(conf.get(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT))) 
: Option.empty();
+    return new EventBuffers(new ConcurrentSkipListMap<>(), commitGuardOpt);
   }
 
   /**
@@ -71,6 +77,10 @@ public class EventBuffers implements Serializable {
     return eventBuffer;
   }
 
+  public void addEventsToBuffer(Map<Long, Pair<String, WriteMetadataEvent[]>> 
events) {
+    this.eventBuffers.putAll(events);
+  }
+
   /**
    * Returns existing bootstrap buffer or creates a new one.
    */
@@ -118,6 +128,12 @@ public class EventBuffers implements Serializable {
     this.eventBuffers.put(checkpointId, Pair.of(instantTime, new 
WriteMetadataEvent[parallelism]));
   }
 
+  public void awaitAllInstantsToCompleteIfNecessary() {
+    if (this.commitGuardOption.isPresent() && nonEmpty()) {
+      this.commitGuardOption.get().blockFor(getPendingInstants());
+    }
+  }
+
   public void reset(long checkpointId) {
     this.eventBuffers.remove(checkpointId);
     this.commitGuardOption.ifPresent(CommitGuard::unblock);
@@ -132,4 +148,13 @@ public class EventBuffers implements Serializable {
   public String getPendingInstants() {
     return 
this.eventBuffers.values().stream().map(Pair::getKey).collect(Collectors.joining(","));
   }
+
+  /**
+   * Returns the write metadata events for which no event was sent by eager
+   * flushing from writers.
+   */
+  public Map<Long, Pair<String, WriteMetadataEvent[]>> getAllCompletedEvents() 
{
+    return this.eventBuffers.entrySet().stream()
+        .filter(entry -> 
Arrays.stream(entry.getValue().getRight()).allMatch(event -> event == null || 
event.isLastBatch()))
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index ce0581f3300b..e75e669b703b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -221,6 +221,38 @@ public class TestWriteMergeOnRead extends 
TestWriteCopyOnWrite {
         .end();
   }
 
+  @Test
+  public void testRecommitAfterCoordinatorRestart() throws Exception {
+    Map<String, String> expected = new HashMap<>();
+    expected.put("par1", "[id1,par1,id1,Danny,23,1,par1]");
+    expected.put("par2", "[id3,par2,id3,Julian,53,3,par2, 
id4,par2,id4,Fabian,31,4,par2]");
+    preparePipeline(conf)
+        .consume(TestData.DATA_SET_PART1)
+        .emptyEventBuffer()
+        .checkpoint(1)
+        .assertNextEvent(1, "par1")
+        .consume(TestData.DATA_SET_PART3)
+        .checkpoint(2)
+        // neither ckp-1 nor ckp-2 is committed yet
+        .assertNextEvent(1, "par2")
+        // then simulating restarting job manually, coordinator will reset to 
ckp-2
+        // and recommit write metadata for ckp-1
+        .restartCoordinator()
+        .subTaskFails(0, 0)
+        // subtask will resend the write metadata event during state initialization
+        // and coordinator will recommit data for ckp-2
+        .assertNextEvent()
+        // insert another batch of data.
+        .consume(TestData.DATA_SET_PART4)
+        .checkpoint(3)
+        .assertNextEvent(1, "par2")
+        // write metadata will be committed for ckp-3
+        .checkpointComplete(3)
+        // there should be 3 rows and 2 partitions
+        .checkWrittenData(expected, 2)
+        .end();
+  }
+
   @Override
   protected Map<String, String> getExpectedBeforeCheckpointComplete() {
     return EXPECTED1;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 9efd7fb2d302..64e701dba43c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -59,6 +59,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -76,6 +77,7 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
   private final MockOperatorCoordinatorContext coordinatorContext;
   private StreamWriteOperatorCoordinator coordinator;
   private final MockStateInitializationContext stateInitializationContext;
+  private final TreeMap<Long, byte[]> coordinatorStateStore;
 
   /**
    * Function that converts row data to HoodieRecord.
@@ -126,6 +128,7 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     this.coordinator = new StreamWriteOperatorCoordinator(conf, 
this.coordinatorContext);
     this.bucketAssignFunctionContext = new MockBucketAssignFunctionContext();
     this.stateInitializationContext = new MockStateInitializationContext();
+    this.coordinatorStateStore = new TreeMap<>();
     this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
     this.streamConfig = new StreamConfig(conf);
     streamConfig.setOperatorID(new OperatorID());
@@ -137,6 +140,7 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
   }
 
   public void openFunction() throws Exception {
+    resetCoordinatorToCheckpoint();
     this.coordinator.start();
     this.coordinator.setExecutor(new 
MockCoordinatorExecutor(coordinatorContext));
     toHoodieFunction = new RowDataToHoodieFunction<>(rowType, conf);
@@ -199,7 +203,7 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
 
   public void checkpointFunction(long checkpointId) throws Exception {
     // checkpoint the coordinator first
-    this.coordinator.checkpointCoordinator(checkpointId, new 
CompletableFuture<>());
+    checkpointCoordinator(checkpointId);
     if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
       bootstrapOperator.snapshotState(null);
     }
@@ -209,6 +213,21 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     stateInitializationContext.checkpointBegin(checkpointId);
   }
 
+  private void checkpointCoordinator(long checkpointId) throws Exception {
+    CompletableFuture<byte[]> completableFuture = new CompletableFuture<>();
+    // checkpoint the coordinator first
+    this.coordinator.checkpointCoordinator(checkpointId, completableFuture);
+    this.coordinatorStateStore.put(checkpointId, completableFuture.get());
+  }
+
+  private void resetCoordinatorToCheckpoint() {
+    if (coordinatorStateStore.isEmpty()) {
+      return;
+    }
+    Map.Entry<Long, byte[]> latestState = 
this.coordinatorStateStore.lastEntry();
+    this.coordinator.resetToCheckpoint(latestState.getKey(), 
latestState.getValue());
+  }
+
   public void endInput() {
     writeFunction.endInput();
   }
@@ -244,6 +263,7 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
 
   public void coordinatorFails() throws Exception {
     this.coordinator.close();
+    resetCoordinatorToCheckpoint();
     this.coordinator.start();
     this.coordinator.setExecutor(new 
MockCoordinatorExecutor(coordinatorContext));
   }
@@ -251,6 +271,7 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
   public void restartCoordinator() throws Exception {
     this.coordinator.close();
     this.coordinator = new StreamWriteOperatorCoordinator(conf, 
this.coordinatorContext);
+    resetCoordinatorToCheckpoint();
     this.coordinator.start();
     this.coordinator.setExecutor(new 
MockCoordinatorExecutor(coordinatorContext));
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index d344134e3aa2..f005ecfb9d33 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -367,6 +367,14 @@ public class TestData {
       insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 
23,
           TimestampData.fromEpochMillis(1), StringData.fromString("par2")));
 
+  public static List<RowData> DATA_SET_PART3 = Collections.singletonList(
+      insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 
53,
+          TimestampData.fromEpochMillis(3), StringData.fromString("par2")));
+
+  public static List<RowData> DATA_SET_PART4 = Collections.singletonList(
+      insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"), 
31,
+          TimestampData.fromEpochMillis(4), StringData.fromString("par2")));
+
   public static List<RowData> DATA_SET_SINGLE_DELETE = 
Collections.singletonList(
       deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 
23,
           TimestampData.fromEpochMillis(5), StringData.fromString("par1")));

Reply via email to