gaoyunhaii commented on pull request #15055:
URL: https://github.com/apache/flink/pull/15055#issuecomment-868255975


   Hi @dawidwys, many thanks for the helpful review! The current modification 
in the `CheckpointBarrierHandler` is indeed only for tracking the pending and 
received barriers for each channel; it does not modify the logic of the 
alignment time metric. The current alignment time is indeed not reasonable for 
`CheckpointBarrierTracker`, and I agree that we could deal with it 
separately.
   
   Regarding the processing of `EndOfPartitionEvent` in the 
`CheckpointBarrierHandler`, I also agree that it would be safer to 
keep strict state for blocked and priority channels, and that we might split the 
implementation between `SingleCheckpointBarrierHandler` and 
`CheckpointBarrierTracker`. The only remaining concern is the complexity of 
directly implementing the processing of `EndOfPartitionEvent`. With the direct 
implementation we would need to carefully check consistency with barrier 
processing. For example, for aligned checkpoints with timeout, perhaps we 
would also need to check the timeout when processing `EndOfPartitionEvent`? 
   
   However, viewing `EndOfPartitionEvent` as `Barriers for each input channel 
followed by an end notification` does indeed conflict with point 1, since 
when processing the virtually added last checkpoint barrier, the expected logic 
for blocking/unblocking and prioritizing channels differs from that for normal 
barriers. I am also not inclined to add an additional flag to 
`BarrierHandlerState#barrierReceived()`, since it would make the logic fragile. 
Thus, overall, I agree that the commit in 
https://github.com/dawidwys/flink/commit/c1a4867cd090997bd504f8e201324214bdebece3
 would be preferable. I'll update the PR with this approach, perhaps with 
some implementation modifications:
   
   1. Consider the timeout when processing `EndOfPartitionEvent` for 
alternating checkpoints.
   2. For `CheckpointBarrierTracker`, we might need to find the largest 
checkpoint that is aligned when `EndOfPartitionEvent` is received. 
   3. We might also need to maintain the set of aligned channels for 
`SingleCheckpointBarrierHandler`; otherwise there might be a problem if we 
receive both a barrier and an `EndOfPartitionEvent` from one channel during 
a checkpoint. For example, suppose we have 3 channels and the following happens:
   
   ```
   a. Received barrier@#1, barrierReceived = 1, openChannels = 3
   b. Received barrier@#2, barrierReceived = 2, openChannels = 3
   c. Received EndOfPartitionEvent@#1, barrierReceived = 2, openChannels = 2, 
triggered wrongly. 
   ```
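
To make the scenario above concrete, here is a minimal sketch in plain Java. It is not Flink code: `AlignmentSketch`, `CountingTracker`, and `SetTracker` are hypothetical names made up for illustration, and the rule that an `EndOfPartitionEvent` removes the channel from both sets is an assumption of the sketch, not a statement about the final implementation. It only shows that comparing two counters triggers after step (c), while tracking the set of aligned channels does not.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical illustration only; none of these classes exist in Flink. */
public class AlignmentSketch {

    /** Counter-based tracking: reproduces the wrong trigger from steps a-c. */
    static class CountingTracker {
        private int barrierReceived = 0;
        private int openChannels;

        CountingTracker(int numChannels) {
            this.openChannels = numChannels;
        }

        /** Returns true if the checkpoint would be triggered. */
        boolean onBarrier(int channel) {
            barrierReceived++;
            return barrierReceived == openChannels;
        }

        /** Returns true if the checkpoint would be triggered. */
        boolean onEndOfPartition(int channel) {
            // The channel's earlier barrier is still counted, so the two
            // counters can match even though another channel is not aligned.
            openChannels--;
            return barrierReceived == openChannels;
        }
    }

    /** Set-based tracking: remembers which channels are aligned. */
    static class SetTracker {
        private final Set<Integer> alignedChannels = new HashSet<>();
        private final Set<Integer> openChannels = new HashSet<>();

        SetTracker(int numChannels) {
            for (int i = 1; i <= numChannels; i++) {
                openChannels.add(i);
            }
        }

        boolean onBarrier(int channel) {
            alignedChannels.add(channel);
            return isAligned();
        }

        boolean onEndOfPartition(int channel) {
            // Assumption of this sketch: a finished channel no longer takes
            // part in the alignment, so it is dropped from both sets.
            openChannels.remove(channel);
            alignedChannels.remove(channel);
            return isAligned();
        }

        private boolean isAligned() {
            // Assumption: with no open channels left, nothing is triggered here.
            return !openChannels.isEmpty() && alignedChannels.containsAll(openChannels);
        }
    }

    public static void main(String[] args) {
        CountingTracker counting = new CountingTracker(3);
        counting.onBarrier(1);
        counting.onBarrier(2);
        System.out.println("counting triggers: " + counting.onEndOfPartition(1));

        SetTracker sets = new SetTracker(3);
        sets.onBarrier(1);
        sets.onBarrier(2);
        System.out.println("set-based triggers: " + sets.onEndOfPartition(1));
    }
}
```

With the set-based variant, the checkpoint would only be considered aligned once channel 3 also delivers its barrier (or finishes), which is the behaviour I would expect here.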
   



