This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a1fd22841 [HUDI-4311] Fix Flink lose data on some rollback scene 
(#5950)
3a1fd22841 is described below

commit 3a1fd22841ab40b98589f5dc17717548500bde4b
Author: 吴祥平 <[email protected]>
AuthorDate: Mon Jun 27 16:09:44 2022 +0800

    [HUDI-4311] Fix Flink lose data on some rollback scene (#5950)
---
 .../main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java  | 2 +-
 .../hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java | 2 --
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 44553820be..6aa4c0b1f8 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -271,7 +271,7 @@ public class StreamWriteOperatorCoordinator
 
   @Override
   public void notifyCheckpointAborted(long checkpointId) {
-    if (checkpointId == this.checkpointId) {
+    if (checkpointId == this.checkpointId && 
!WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) {
       executor.execute(() -> {
         this.ckpMetadata.abortInstant(this.instant);
       }, "abort instant %s", this.instant);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
index 45a4e04bab..f059c7050c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
@@ -97,8 +97,6 @@ public class CkpMetadata implements Serializable {
   public void bootstrap(HoodieTableMetaClient metaClient) throws IOException {
     fs.delete(path, true);
     fs.mkdirs(path);
-    
metaClient.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction()
-        .lastInstant().ifPresent(instant -> 
startInstant(instant.getTimestamp()));
   }
 
   public void startInstant(String instant) {

Reply via email to