zhijiangW commented on a change in pull request #12460:
URL: https://github.com/apache/flink/pull/12460#discussion_r434708019



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -409,5 +416,15 @@ synchronized long getCurrentCheckpointId() {
                boolean isCheckpointPending() {
                        return numBarriersReceived > 0;
                }
+
+               private void resetReceivedBarriers() {
+                       Arrays.fill(storeNewBuffers, false);
+                       numBarriersReceived = 0;
+               }
+
+               private void notifyAbort(CheckpointException exception) throws IOException {
+                       long currentCheckpointId = currentReceivedCheckpointId;
+                       handler.executeInTaskThread(() -> handler.notifyAbort(currentCheckpointId, exception), "notifyAbort");
+               }

Review comment:
       Yeah, I get your point here. 
   
   `notifyAbort` is actually used in two places ATM. In the first case, the netty 
thread first receives ch1 from one channel and triggers it into the mailbox, 
then it receives ch2 from another input channel and should abort the 
previous ch1. But we do not know whether the previously triggered ch1 has 
started executing or not. If it has not started, it would be skipped by 
comparing against the current checkpoint id. Otherwise it would be aborted 
during execution.
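
   To make the "skipped via comparing the current checkpoint id" part concrete, 
here is a minimal single-threaded sketch. It is only a simplified model and not 
the actual Flink code: `StaleTriggerSketch`, `onBarrier`, `runCheckpoint` and 
`drainMailbox` are made-up names, and the real netty/task thread hand-over and 
the "aborted during execution" branch are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Simplified, single-threaded model of queued checkpoint triggers (hypothetical, not Flink code). */
class StaleTriggerSketch {
    // Latest checkpoint id announced by any barrier (set on the "netty thread" side).
    private long currentCheckpointId = -1;
    // Stand-in for the task mailbox: triggers wait here until the task thread runs them.
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    // Records what happened, so the outcome can be inspected.
    final List<String> log = new ArrayList<>();

    /** "Netty thread" side: the first barrier of a newer checkpoint enqueues a trigger. */
    void onBarrier(long checkpointId) {
        if (checkpointId <= currentCheckpointId) {
            return; // barrier of the pending or an older checkpoint, nothing new to trigger
        }
        currentCheckpointId = checkpointId;
        mailbox.add(() -> runCheckpoint(checkpointId));
    }

    /** "Task thread" side: a queued trigger is skipped if a newer id arrived in the meantime. */
    private void runCheckpoint(long checkpointId) {
        if (checkpointId < currentCheckpointId) {
            log.add("skip ch" + checkpointId);
            return;
        }
        log.add("run ch" + checkpointId);
    }

    /** Runs every queued trigger in arrival order. */
    void drainMailbox() {
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
    }
}
```

   Calling `onBarrier(1)`, then `onBarrier(2)`, then `drainMailbox()` leaves ch1 
skipped and only ch2 executed, because ch1 had not started before ch2 arrived.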
   
   The other case is processing end of partition (eof), as you said above. My 
previous thought was that during an async checkpoint the task thread can 
still process input (eof). If so, we cannot always assume that the previously 
triggered checkpoint has not started when processing eof.
   
   If my assumption is wrong, your suggestion would be an improvement in this 
case. But I have some concerns about coupling eof with the current checkpoint 
id. The current semantics of eof only abort the current pending checkpoint, 
which means a new checkpoint can still be triggered afterwards. So it might 
break the normal checkpoint/cancellation id comparison for the following 
checkpoints even though some channels have already ended.
   
   E.g. when eof is processed and the current checkpoint id is 3, we should 
abort the current ch3. But when we then receive ch4 from one channel, we should 
still trigger ch4 in the normal way from the perspective of 
`CheckpointBarrierHandler`. If we bump the current checkpoint id to 4 or a 
larger id when processing eof, that would break the handling of the newly 
received ids. Unless we assume that `CheckpointCoordinator` never triggers a 
new checkpoint after eof is processed, but we did not have such an assumption 
in the past from the `CheckpointBarrierHandler` view.
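
   The ch3/ch4 example can be sketched as below. This is a rough model of the id 
bookkeeping only, under the assumption that a barrier triggers a checkpoint 
only when its id is larger than the current one. `EofIdSketch` and the 
`bumpIdOnEof` switch are hypothetical and do not exist in Flink.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the checkpoint id comparison (hypothetical, not CheckpointBarrierHandler). */
class EofIdSketch {
    private long currentCheckpointId = 0;
    private boolean pending = false;
    // Hypothetical switch: whether eof also advances the current checkpoint id.
    private final boolean bumpIdOnEof;
    // Records what happened, so the two variants can be compared.
    final List<String> log = new ArrayList<>();

    EofIdSketch(boolean bumpIdOnEof) {
        this.bumpIdOnEof = bumpIdOnEof;
    }

    /** A barrier triggers a checkpoint only if its id is newer than the current one. */
    void onBarrier(long checkpointId) {
        if (checkpointId > currentCheckpointId) {
            currentCheckpointId = checkpointId;
            pending = true;
            log.add("trigger ch" + checkpointId);
        } else if (checkpointId == currentCheckpointId && pending) {
            // Further barrier of the already pending checkpoint: nothing to record here.
        } else {
            log.add("ignore ch" + checkpointId);
        }
    }

    /** eof aborts the pending checkpoint; optionally it also bumps the id. */
    void onEndOfPartition() {
        if (pending) {
            log.add("abort ch" + currentCheckpointId);
            pending = false;
        }
        if (bumpIdOnEof) {
            currentCheckpointId++;
        }
    }
}
```

   With `bumpIdOnEof = false`, the sequence `onBarrier(3)`, `onEndOfPartition()`, 
`onBarrier(4)` aborts ch3 and then triggers ch4 as usual. With 
`bumpIdOnEof = true`, the same sequence ends with ch4 being ignored, because 
its id is no longer larger than the current one.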





