zhijiangW commented on a change in pull request #12460:
URL: https://github.com/apache/flink/pull/12460#discussion_r434711700



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -387,12 +378,28 @@ synchronized void resetReceivedBarriers(long checkpointId) {
                        return allBarriersReceivedFuture;
                }
 
-               synchronized void onChannelClosed() {
+               synchronized boolean onChannelClosed() throws IOException {
                        numOpenChannels--;
+
+                       if (numBarriersReceived > 0) {
+                               resetReceivedBarriers();
+                               notifyAbort(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
+                               return true;
+                       }
+                       return false;
                }
 
-               synchronized void setCurrentReceivedCheckpointId(long currentReceivedCheckpointId) {
-                       this.currentReceivedCheckpointId = Math.max(currentReceivedCheckpointId, this.currentReceivedCheckpointId);
+               synchronized boolean setCancelledCheckpointId(long canceledCheckpointId) {
+                       boolean shouldAbort = false;
+                       if (canceledCheckpointId > currentReceivedCheckpointId) {
+                               currentReceivedCheckpointId = canceledCheckpointId;
+                               shouldAbort = true;
+
+                       } else if (canceledCheckpointId == currentReceivedCheckpointId && isCheckpointPending()) {
+                               resetReceivedBarriers();
+                               shouldAbort = true;
+                       }
+                       return shouldAbort;

Review comment:
   In the checkpoint cancellation case this is still valid, because the
`currentReceivedCheckpointId` returned by
`threadSafeUnaligner.getCurrentCheckpointId()` is updated via
`#setCancelledCheckpointId()`.
   
   In the end-of-partition case, we do not couple the abort with a specific
checkpoint id, as explained in the comment above. We only abort the currently
pending checkpoint, if there is one, and allow subsequent checkpoints to
proceed afterwards.
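
   To make the two cases concrete, here is a minimal sketch of the state
transitions described above. It is not the actual `ThreadSafeUnaligner`: the
class `UnalignerStateSketch` and its `onBarrier` method are hypothetical, the
notification calls are left out, and the reset in the first cancellation
branch is added only so the sketch stays self-consistent (the quoted diff does
not reset there).

```java
// Hypothetical, simplified model of the state discussed in this comment.
// NOT Flink's ThreadSafeUnaligner; field names mirror the diff for readability only.
final class UnalignerStateSketch {
    private long currentReceivedCheckpointId = -1L;
    private int numBarriersReceived;
    private int numOpenChannels;

    UnalignerStateSketch(int numOpenChannels) {
        this.numOpenChannels = numOpenChannels;
    }

    // Hypothetical helper: records a barrier for the given checkpoint.
    synchronized void onBarrier(long checkpointId) {
        if (checkpointId > currentReceivedCheckpointId) {
            // A newer checkpoint starts; it becomes the pending one.
            currentReceivedCheckpointId = checkpointId;
            numBarriersReceived = 1;
        } else if (checkpointId == currentReceivedCheckpointId && isCheckpointPending()) {
            numBarriersReceived++;
        }
        // Barriers for older or already aborted checkpoints are ignored.
    }

    synchronized boolean isCheckpointPending() {
        return numBarriersReceived > 0;
    }

    synchronized long getCurrentCheckpointId() {
        return currentReceivedCheckpointId;
    }

    // Cancellation case: the id is advanced here, so later comparisons
    // against getCurrentCheckpointId() remain valid.
    synchronized boolean setCancelledCheckpointId(long cancelledCheckpointId) {
        if (cancelledCheckpointId > currentReceivedCheckpointId) {
            currentReceivedCheckpointId = cancelledCheckpointId;
            numBarriersReceived = 0; // sketch-only reset, see lead-in
            return true;
        }
        if (cancelledCheckpointId == currentReceivedCheckpointId && isCheckpointPending()) {
            numBarriersReceived = 0;
            return true;
        }
        return false;
    }

    // End-of-partition case: no checkpoint id involved; only a pending
    // checkpoint is aborted, and the id is left untouched.
    synchronized boolean onChannelClosed() {
        numOpenChannels--;
        if (isCheckpointPending()) {
            numBarriersReceived = 0;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        UnalignerStateSketch state = new UnalignerStateSketch(3);
        state.onBarrier(4);
        System.out.println("abort on channel close: " + state.onChannelClosed());
        state.onBarrier(5);
        System.out.println("checkpoint 5 pending: " + state.isCheckpointPending());
    }
}
```

   In this sketch a channel closing while checkpoint 4 is pending aborts it
without changing the current id, so a barrier for checkpoint 5 can still start
a new checkpoint afterwards.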




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

