danny0405 commented on code in PR #13530:
URL: https://github.com/apache/hudi/pull/13530#discussion_r2194782527
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java:
##########
@@ -207,31 +207,33 @@ private void initCheckpointId(int attemptId, long restoredCheckpointId) throws E
this.checkpointId = restoredCheckpointId;
}
- private void sendBootstrapEvent(int attemptId, boolean isRestored) throws Exception {
- if (attemptId <= 0) {
- if (isRestored) {
- HoodieTimeline pendingTimeline = this.metaClient.getActiveTimeline().filterPendingExcludingCompaction();
- // if the task is initially started, resend the pending event.
- for (WriteMetadataEvent event : this.writeMetadataState.get()) {
- // Must filter out the completed instants in case it is a partial failover,
- // the write status should not be accumulated in such case.
- if (pendingTimeline.containsInstant(event.getInstantTime())) {
- // Reset taskID for event
- event.setTaskID(taskID);
- // The checkpoint succeed but the meta does not commit,
- // re-commit the inflight instant
- this.eventGateway.sendEventToCoordinator(event);
- LOG.info("Send uncommitted write metadata event to coordinator, task[{}].", taskID);
- }
- }
- }
- } else {
- // otherwise sends an empty bootstrap event instead.
+ private void sendBootstrapEvent(boolean isRestored) throws Exception {
+ if (!isRestored || !sendPendingCommitEvents()) {
this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.emptyBootstrap(taskID, checkpointId));
LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
}
}
+ private boolean sendPendingCommitEvents() throws Exception {
+ boolean eventSent = false;
Review Comment:
> Also It seems there’s no harm in resending it once :)
Resending could produce a duplicate metadata event, which would then show up
as duplicates in incremental reads.
Also, the fix in this patch does not really solve the issue you raised here: we
need to add to the metadata events state instead of overriding it if the
existing instant has not been committed yet.
My idea is that in the request-instant-time RPC, we return all the pending
instants together with the new instant time, then check the state and remove
only those that are already committed.
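
A rough sketch of that idea, to make the intended state handling concrete. Everything here is hypothetical: `PendingEventBuffer`, `add`, and `retainPending` are illustrative names, a `String` stands in for `WriteMetadataEvent`, and the set of pending instants is assumed to arrive with the RPC response. It also assumes that any buffered instant missing from the pending set has been committed, which the real code would need to verify.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the writer-side metadata events state; not Hudi's actual classes.
final class PendingEventBuffer {
  // instant time -> buffered event (a String stands in for WriteMetadataEvent)
  private final Map<String, String> eventsByInstant = new LinkedHashMap<>();

  // Adds the event for an instant instead of overriding the whole state,
  // so events of earlier uncommitted instants are kept.
  void add(String instantTime, String event) {
    eventsByInstant.put(instantTime, event);
  }

  // Applies the assumed RPC response: keeps only the events whose instants
  // are still pending and drops the rest (assumed to be committed).
  void retainPending(Set<String> pendingInstants) {
    eventsByInstant.keySet().retainAll(pendingInstants);
  }

  // Instants that still have a buffered event, in insertion order.
  List<String> instants() {
    return new ArrayList<>(eventsByInstant.keySet());
  }
}
```

With this shape, a restored task would resend only the events that survive `retainPending`, so an instant that already committed is never sent twice.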