zhijiangW commented on a change in pull request #12460:
URL: https://github.com/apache/flink/pull/12460#discussion_r435782846
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -409,5 +416,15 @@ synchronized long getCurrentCheckpointId() {
boolean isCheckpointPending() {
return numBarriersReceived > 0;
}
+
+ private void resetReceivedBarriers() {
+ Arrays.fill(storeNewBuffers, false);
+ numBarriersReceived = 0;
+ }
+
+ private void notifyAbort(CheckpointException exception) throws IOException {
+ long currentCheckpointId = currentReceivedCheckpointId;
+ handler.executeInTaskThread(() -> handler.notifyAbort(currentCheckpointId, exception), "notifyAbort");
+ }
Review comment:
Yes, we already have this guard in the `#notifyCheckpoint` method. So are
there any concerns about the `#notifyAbort` method?