This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0d0c84705ca3 [HUDI-9570] Using coordinator state to persist and
recommit write metadata events (#13543)
0d0c84705ca3 is described below
commit 0d0c84705ca31aa8f11d9cce97b83898e4ff233a
Author: Shuo Cheng <[email protected]>
AuthorDate: Thu Jul 17 09:09:08 2025 +0800
[HUDI-9570] Using coordinator state to persist and recommit write metadata
events (#13543)
---
hudi-examples/hudi-examples-flink/pom.xml | 6 +++
.../hudi/sink/StreamWriteOperatorCoordinator.java | 54 ++++++++++++----------
.../org/apache/hudi/sink/utils/EventBuffers.java | 31 +++++++++++--
.../org/apache/hudi/sink/TestWriteMergeOnRead.java | 32 +++++++++++++
.../sink/utils/StreamWriteFunctionWrapper.java | 23 ++++++++-
.../test/java/org/apache/hudi/utils/TestData.java | 8 ++++
6 files changed, 125 insertions(+), 29 deletions(-)
diff --git a/hudi-examples/hudi-examples-flink/pom.xml
b/hudi-examples/hudi-examples-flink/pom.xml
index 4e7a7f16d72b..08666562cf48 100644
--- a/hudi-examples/hudi-examples-flink/pom.xml
+++ b/hudi-examples/hudi-examples-flink/pom.xml
@@ -117,6 +117,12 @@
<scope>compile</scope>
</dependency>
+ <!-- Kryo -->
+ <dependency>
+ <groupId>com.esotericsoftware</groupId>
+ <artifactId>kryo-shaded</artifactId>
+ </dependency>
+
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 4368ca3b3654..e6c37acb9d24 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.SerializationUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
@@ -38,7 +39,6 @@ import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
import org.apache.hudi.sink.event.Correspondent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
-import org.apache.hudi.sink.utils.CommitGuard;
import org.apache.hudi.sink.utils.CoordinationResponseSerDe;
import org.apache.hudi.sink.utils.EventBuffers;
import org.apache.hudi.sink.utils.ExplicitClassloaderThreadFactory;
@@ -195,11 +195,6 @@ public class StreamWriteOperatorCoordinator
*/
private ClientIds clientIds;
- /**
- * The commit guard for blocking instant time generation.
- */
- private Option<CommitGuard> commitGuardOpt;
-
/**
* Constructs a StreamingSinkOperatorCoordinator.
*
@@ -220,10 +215,8 @@ public class StreamWriteOperatorCoordinator
// setup classloader for APIs that use reflection without taking
ClassLoader param
// reference:
https://stackoverflow.com/questions/1771679/difference-between-threads-context-class-loader-and-normal-classloader
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+ initEventBufferIfNecessary();
this.tableState = TableState.create(conf);
- initCommitGuard(this.conf);
- // initialize event buffer
- this.eventBuffers = EventBuffers.getInstance(this.commitGuardOpt);
this.gateways = new SubtaskGateway[this.parallelism];
try {
// init table, create if not exists.
@@ -249,6 +242,7 @@ public class StreamWriteOperatorCoordinator
if (OptionsResolver.isMultiWriter(conf)) {
initClientIds(conf);
}
+ restoreEvents();
} catch (Throwable throwable) {
LOG.error("Failed to start operator coordinator.", throwable);
context.failJob(throwable);
@@ -283,7 +277,8 @@ public class StreamWriteOperatorCoordinator
executor.execute(
() -> {
try {
- result.complete(new byte[0]);
+ byte[] eventBytes =
SerializationUtils.serialize(this.eventBuffers.getAllCompletedEvents());
+ result.complete(eventBytes);
} catch (Throwable throwable) {
// when a checkpoint fails, throws directly.
result.completeExceptionally(
@@ -316,7 +311,10 @@ public class StreamWriteOperatorCoordinator
@Override
public void resetToCheckpoint(long checkpointID, byte[] checkpointData) {
- // no operation
+ if (checkpointData != null) {
+ initEventBufferIfNecessary();
+
this.eventBuffers.addEventsToBuffer(SerializationUtils.deserialize(checkpointData));
+ }
}
@Override
@@ -367,7 +365,7 @@ public class StreamWriteOperatorCoordinator
final String instantTime;
if (instantTimeAndEventBuffer == null) {
// wait until previous instants are committed.
- awaitAllInstantsToCompleteIfNecessary();
+ eventBuffers.awaitAllInstantsToCompleteIfNecessary();
instantTime = startInstant();
this.eventBuffers.initNewEventBuffer(checkpointId, instantTime,
this.parallelism);
} else {
@@ -382,9 +380,12 @@ public class StreamWriteOperatorCoordinator
// Utilities
// -------------------------------------------------------------------------
- private void awaitAllInstantsToCompleteIfNecessary() {
- if (this.commitGuardOpt.isPresent() && this.eventBuffers.nonEmpty()) {
-
this.commitGuardOpt.get().blockFor(this.eventBuffers.getPendingInstants());
+ private void restoreEvents() {
+ if (this.eventBuffers.nonEmpty()) {
+ final HoodieTimeline completedTimeline =
this.metaClient.getActiveTimeline().filterCompletedInstants();
+ this.eventBuffers.getEventBufferStream()
+ .forEach(entry -> recommitInstant(completedTimeline, entry.getKey(),
entry.getValue().getLeft(), entry.getValue().getRight()));
+ this.metaClient.reloadActiveTimeline();
}
}
@@ -441,12 +442,12 @@ public class StreamWriteOperatorCoordinator
this.clientIds.start();
}
- private void initCommitGuard(Configuration conf) {
- if (tableState.isBlockingInstantGeneration) {
- this.commitGuardOpt =
Option.of(CommitGuard.create(conf.get(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT)));
- } else {
- this.commitGuardOpt = Option.empty();
+ private void initEventBufferIfNecessary() {
+ if (this.eventBuffers != null) {
+ return;
}
+ // initialize event buffer
+ this.eventBuffers = EventBuffers.getInstance(conf);
}
private String startInstant() {
@@ -470,6 +471,14 @@ public class StreamWriteOperatorCoordinator
*/
private void recommitInstant(long checkpointId, String instant,
WriteMetadataEvent[] bootstrapBuffer) {
HoodieTimeline completedTimeline =
this.metaClient.getActiveTimeline().filterCompletedInstants();
+ recommitInstant(completedTimeline, checkpointId, instant, bootstrapBuffer);
+ }
+
+ /**
+ * Recommits the last inflight instant if the write metadata was checkpointed
successfully
+ * but was not committed due to some rare cases.
+ */
+ private void recommitInstant(HoodieTimeline completedTimeline, long
checkpointId, String instant, WriteMetadataEvent[] bootstrapBuffer) {
if (!completedTimeline.containsInstant(instant)) {
LOG.info("Recommit instant {}", instant);
// Recommit should start heartbeat for lazy failed writes clean policy
to avoid aborting for heartbeat expired;
@@ -686,10 +695,6 @@ public class StreamWriteOperatorCoordinator
final boolean syncHive;
final boolean syncMetadata;
final boolean isDeltaTimeCompaction;
- /**
- * Whether the writer for the table applies blocking instant generation.
- */
- final boolean isBlockingInstantGeneration;
private TableState(Configuration conf) {
this.operationType =
WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
@@ -701,7 +706,6 @@ public class StreamWriteOperatorCoordinator
this.syncHive = conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED);
this.syncMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED);
this.isDeltaTimeCompaction = OptionsResolver.isDeltaTimeCompaction(conf);
- this.isBlockingInstantGeneration =
OptionsResolver.isBlockingInstantGeneration(conf);
}
public static TableState create(Configuration conf) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
index fa5595f93528..d4eb6a393883 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
@@ -20,13 +20,17 @@ package org.apache.hudi.sink.utils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.flink.configuration.Configuration;
+
import java.io.Serializable;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -45,8 +49,10 @@ public class EventBuffers implements Serializable {
this.commitGuardOption = commitGuardOption;
}
- public static EventBuffers getInstance(Option<CommitGuard>
commitGuardOption) {
- return new EventBuffers(new ConcurrentHashMap<>(), commitGuardOption);
+ public static EventBuffers getInstance(Configuration conf) {
+ final Option<CommitGuard> commitGuardOpt =
OptionsResolver.isBlockingInstantGeneration(conf)
+ ?
Option.of(CommitGuard.create(conf.get(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT)))
: Option.empty();
+ return new EventBuffers(new ConcurrentSkipListMap<>(), commitGuardOpt);
}
/**
@@ -71,6 +77,10 @@ public class EventBuffers implements Serializable {
return eventBuffer;
}
+ public void addEventsToBuffer(Map<Long, Pair<String, WriteMetadataEvent[]>>
events) {
+ this.eventBuffers.putAll(events);
+ }
+
/**
* Returns existing bootstrap buffer or creates a new one.
*/
@@ -118,6 +128,12 @@ public class EventBuffers implements Serializable {
this.eventBuffers.put(checkpointId, Pair.of(instantTime, new
WriteMetadataEvent[parallelism]));
}
+ public void awaitAllInstantsToCompleteIfNecessary() {
+ if (this.commitGuardOption.isPresent() && nonEmpty()) {
+ this.commitGuardOption.get().blockFor(getPendingInstants());
+ }
+ }
+
public void reset(long checkpointId) {
this.eventBuffers.remove(checkpointId);
this.commitGuardOption.ifPresent(CommitGuard::unblock);
@@ -132,4 +148,13 @@ public class EventBuffers implements Serializable {
public String getPendingInstants() {
return
this.eventBuffers.values().stream().map(Pair::getKey).collect(Collectors.joining(","));
}
+
+ /**
+ * Returns the write metadata events for buffers that contain no event sent
by eager flushing from writers.
+ */
+ public Map<Long, Pair<String, WriteMetadataEvent[]>> getAllCompletedEvents()
{
+ return this.eventBuffers.entrySet().stream()
+ .filter(entry ->
Arrays.stream(entry.getValue().getRight()).allMatch(event -> event == null ||
event.isLastBatch()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index ce0581f3300b..e75e669b703b 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -221,6 +221,38 @@ public class TestWriteMergeOnRead extends
TestWriteCopyOnWrite {
.end();
}
+ @Test
+ public void testRecommitAfterCoordinatorRestart() throws Exception {
+ Map<String, String> expected = new HashMap<>();
+ expected.put("par1", "[id1,par1,id1,Danny,23,1,par1]");
+ expected.put("par2", "[id3,par2,id3,Julian,53,3,par2,
id4,par2,id4,Fabian,31,4,par2]");
+ preparePipeline(conf)
+ .consume(TestData.DATA_SET_PART1)
+ .emptyEventBuffer()
+ .checkpoint(1)
+ .assertNextEvent(1, "par1")
+ .consume(TestData.DATA_SET_PART3)
+ .checkpoint(2)
+ // neither ckp-1 nor ckp-2 has been committed yet
+ .assertNextEvent(1, "par2")
+ // then simulate restarting the job manually; the coordinator will reset to
ckp-2
+ // and recommit write metadata for ckp-1
+ .restartCoordinator()
+ .subTaskFails(0, 0)
+ // subtask will resend the write metadata event during initialize state
+ // and coordinator will recommit data for ckp-2
+ .assertNextEvent()
+ // insert another batch of data.
+ .consume(TestData.DATA_SET_PART4)
+ .checkpoint(3)
+ .assertNextEvent(1, "par2")
+ // write metadata will be committed for ckp-3
+ .checkpointComplete(3)
+ // there should be 3 rows and 2 partitions
+ .checkWrittenData(expected, 2)
+ .end();
+ }
+
@Override
protected Map<String, String> getExpectedBeforeCheckpointComplete() {
return EXPECTED1;
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 9efd7fb2d302..64e701dba43c 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -59,6 +59,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
/**
@@ -76,6 +77,7 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
private final MockOperatorCoordinatorContext coordinatorContext;
private StreamWriteOperatorCoordinator coordinator;
private final MockStateInitializationContext stateInitializationContext;
+ private final TreeMap<Long, byte[]> coordinatorStateStore;
/**
* Function that converts row data to HoodieRecord.
@@ -126,6 +128,7 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
this.coordinator = new StreamWriteOperatorCoordinator(conf,
this.coordinatorContext);
this.bucketAssignFunctionContext = new MockBucketAssignFunctionContext();
this.stateInitializationContext = new MockStateInitializationContext();
+ this.coordinatorStateStore = new TreeMap<>();
this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
this.streamConfig = new StreamConfig(conf);
streamConfig.setOperatorID(new OperatorID());
@@ -137,6 +140,7 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
}
public void openFunction() throws Exception {
+ resetCoordinatorToCheckpoint();
this.coordinator.start();
this.coordinator.setExecutor(new
MockCoordinatorExecutor(coordinatorContext));
toHoodieFunction = new RowDataToHoodieFunction<>(rowType, conf);
@@ -199,7 +203,7 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
public void checkpointFunction(long checkpointId) throws Exception {
// checkpoint the coordinator first
- this.coordinator.checkpointCoordinator(checkpointId, new
CompletableFuture<>());
+ checkpointCoordinator(checkpointId);
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
bootstrapOperator.snapshotState(null);
}
@@ -209,6 +213,21 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
stateInitializationContext.checkpointBegin(checkpointId);
}
+ private void checkpointCoordinator(long checkpointId) throws Exception {
+ CompletableFuture<byte[]> completableFuture = new CompletableFuture<>();
+ // checkpoint the coordinator first
+ this.coordinator.checkpointCoordinator(checkpointId, completableFuture);
+ this.coordinatorStateStore.put(checkpointId, completableFuture.get());
+ }
+
+ private void resetCoordinatorToCheckpoint() {
+ if (coordinatorStateStore.isEmpty()) {
+ return;
+ }
+ Map.Entry<Long, byte[]> latestState =
this.coordinatorStateStore.lastEntry();
+ this.coordinator.resetToCheckpoint(latestState.getKey(),
latestState.getValue());
+ }
+
public void endInput() {
writeFunction.endInput();
}
@@ -244,6 +263,7 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
public void coordinatorFails() throws Exception {
this.coordinator.close();
+ resetCoordinatorToCheckpoint();
this.coordinator.start();
this.coordinator.setExecutor(new
MockCoordinatorExecutor(coordinatorContext));
}
@@ -251,6 +271,7 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
public void restartCoordinator() throws Exception {
this.coordinator.close();
this.coordinator = new StreamWriteOperatorCoordinator(conf,
this.coordinatorContext);
+ resetCoordinatorToCheckpoint();
this.coordinator.start();
this.coordinator.setExecutor(new
MockCoordinatorExecutor(coordinatorContext));
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index d344134e3aa2..f005ecfb9d33 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -367,6 +367,14 @@ public class TestData {
insertRow(StringData.fromString("id1"), StringData.fromString("Danny"),
23,
TimestampData.fromEpochMillis(1), StringData.fromString("par2")));
+ public static List<RowData> DATA_SET_PART3 = Collections.singletonList(
+ insertRow(StringData.fromString("id3"), StringData.fromString("Julian"),
53,
+ TimestampData.fromEpochMillis(3), StringData.fromString("par2")));
+
+ public static List<RowData> DATA_SET_PART4 = Collections.singletonList(
+ insertRow(StringData.fromString("id4"), StringData.fromString("Fabian"),
31,
+ TimestampData.fromEpochMillis(4), StringData.fromString("par2")));
+
public static List<RowData> DATA_SET_SINGLE_DELETE =
Collections.singletonList(
deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"),
23,
TimestampData.fromEpochMillis(5), StringData.fromString("par1")));