This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3a1fd22841 [HUDI-4311] Fix Flink lose data on some rollback scene (#5950)
3a1fd22841 is described below
commit 3a1fd22841ab40b98589f5dc17717548500bde4b
Author: 吴祥平 <[email protected]>
AuthorDate: Mon Jun 27 16:09:44 2022 +0800
[HUDI-4311] Fix Flink lose data on some rollback scene (#5950)
---
.../main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java | 2 +-
.../hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java | 2 --
2 files changed, 1 insertion(+), 3 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 44553820be..6aa4c0b1f8 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -271,7 +271,7 @@ public class StreamWriteOperatorCoordinator
@Override
public void notifyCheckpointAborted(long checkpointId) {
- if (checkpointId == this.checkpointId) {
+ if (checkpointId == this.checkpointId && !WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) {
executor.execute(() -> {
this.ckpMetadata.abortInstant(this.instant);
}, "abort instant %s", this.instant);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
index 45a4e04bab..f059c7050c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
@@ -97,8 +97,6 @@ public class CkpMetadata implements Serializable {
public void bootstrap(HoodieTableMetaClient metaClient) throws IOException {
fs.delete(path, true);
fs.mkdirs(path);
- metaClient.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction()
- .lastInstant().ifPresent(instant -> startInstant(instant.getTimestamp()));
}
public void startInstant(String instant) {