gaoyunhaii commented on pull request #15055: URL: https://github.com/apache/flink/pull/15055#issuecomment-868255975
Hi @dawidwys, thanks a lot for the thorough review!

The current modification in `CheckpointBarrierHandler` is indeed only for tracking the pending and received barriers for each channel; it does not modify the logic of the alignment time metric. The current alignment time is indeed not reasonable for `CheckpointBarrierTracker`, and I agree that we could deal with it separately.

Regarding the processing of `EndOfPartitionEvent` in `CheckpointBarrierHandler`, I also agree that it would be safer to keep strict state for blocked and priority channels, and that we might split the implementation between `SingleCheckpointBarrierHandler` and `CheckpointBarrierTracker`.

The only remaining concern is the complexity of directly implementing the processing of `EndOfPartitionEvent`. With the direct implementation we need to check carefully that it stays consistent with barrier processing. For example, for aligned checkpoints with timeout, perhaps we also need to check the timeout when processing `EndOfPartitionEvent`?

However, viewing `EndOfPartitionEvent` as `Barriers for each input channel followed by an end notification` does conflict with point 1, since when processing the virtually added last checkpoint barrier, the expected logic for blocking/unblocking and prioritizing channels differs from the logic for normal barriers. I am also not inclined to add an additional flag to `BarrierHandlerState#barrierReceived()`, since it would make the logic fragile.

Thus overall I agree that the commit in https://github.com/dawidwys/flink/commit/c1a4867cd090997bd504f8e201324214bdebece3 would be preferable. I'll update the PR with this approach, perhaps with some implementation modifications:

1. Consider the timeout for `EndOfPartitionEvent` for alternating checkpoints.
2. For `CheckpointBarrierTracker`, we might need to find the largest checkpoint that is aligned when receiving `EndOfPartitionEvent`.
3. We might also need to maintain the set of aligned channels for `SingleCheckpointBarrierHandler`, otherwise there might be a problem if we receive both a barrier and an `EndOfPartitionEvent` from the same channel during a checkpoint. For example, suppose we have 3 channels, if

   ```
   a. Received barrier@#1, barrierReceived = 1, openChannels = 3
   b. Received barrier@#2, barrierReceived = 2, openChannels = 3
   c. Received EndOfPartitionEvent@#1, barrierReceived = 2, openChannels = 2, triggered wrongly.
   ```
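To make the problem in point 3 concrete, here is a minimal sketch, assuming `@#N` in the example means "from channel N". The names `CountingTracker`, `SetTracker`, `onBarrier`, `onEndOfPartition` and `isAligned` are hypothetical and only for illustration; they are not the actual Flink classes or methods.

```java
import java.util.HashSet;
import java.util.Set;

public class AlignmentSketch {

    /** Hypothetical tracker that only keeps two counters, as in the example. */
    static class CountingTracker {
        private int barriersReceived = 0;
        private int openChannels;

        CountingTracker(int numChannels) {
            this.openChannels = numChannels;
        }

        void onBarrier(int channel) {
            barriersReceived++;
        }

        void onEndOfPartition(int channel) {
            // The counter cannot tell whether this channel already sent a barrier.
            openChannels--;
        }

        boolean isAligned() {
            return barriersReceived == openChannels;
        }
    }

    /** Hypothetical tracker that remembers which channels are aligned. */
    static class SetTracker {
        private final Set<Integer> alignedChannels = new HashSet<>();
        private final Set<Integer> openChannels = new HashSet<>();

        SetTracker(int numChannels) {
            for (int i = 1; i <= numChannels; i++) {
                openChannels.add(i);
            }
        }

        void onBarrier(int channel) {
            alignedChannels.add(channel);
        }

        void onEndOfPartition(int channel) {
            // A finished channel no longer counts as open or as aligned.
            openChannels.remove(channel);
            alignedChannels.remove(channel);
        }

        boolean isAligned() {
            // Aligned only if every still-open channel has delivered a barrier.
            return alignedChannels.containsAll(openChannels);
        }
    }

    public static void main(String[] args) {
        CountingTracker naive = new CountingTracker(3);
        naive.onBarrier(1);
        naive.onBarrier(2);
        naive.onEndOfPartition(1);
        System.out.println("counting tracker aligned: " + naive.isAligned());

        SetTracker tracked = new SetTracker(3);
        tracked.onBarrier(1);
        tracked.onBarrier(2);
        tracked.onEndOfPartition(1);
        System.out.println("set tracker aligned: " + tracked.isAligned());
    }
}
```

With the sequence a, b, c above, the counter-based version reports alignment even though channel 3 has not delivered a barrier, while the set-based version does not. How a finished channel should really be treated in the handler is still open; this only shows why the counters alone are not enough.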