Hi all,

I would like to resume the discussion on supporting checkpoints after tasks have finished :) Based on the previous discussion, we have implemented a PoC [1] to try out the idea. While working on the PoC we ran into some possible issues:

1. To take EndOfPartition into account for barrier alignment on the TM side, we now tend to decouple the EndOfPartition logic from the normal alignment behavior, to avoid complex interference between the two (which seems hard to keep track of). We could do so by inserting suitable barriers for input channels that have received but not yet processed EndOfPartition. For example, if a task with four inputs has received barrier 2 from two input channels, but the other two inputs do not receive barrier 2 before EndOfPartition because the upstream tasks have finished, we could insert barrier 2 for the last two channels so that checkpoint 2 can still complete.

2. As we have discussed, if a task finishes while we are triggering the tasks, the checkpoint would fail and we should re-trigger its descendants. But if possible, we think we might skip this issue in the first version to reduce the implementation complexity, since it should not affect correctness. We could consider supporting it in later versions.

3. We would have to add a field isFinished to OperatorState so that we do not re-run finished sources after failover. However, this would require a new version of the checkpoint meta. Currently Flink has an abstract MetaV2V3SerializerBase, and V2 and V3 extend it to share some implementation. To add V4, which differs from V3 in only one field, the current PoC introduces a new MetaV3V4SerializerBase that extends MetaV2V3SerializerBase to share implementation between V3 and V4. This looks a little complex, and we might need a general mechanism for extending the checkpoint meta format.

4. With the change, StreamTask would have two types of subclasses according to how they implement triggerCheckpoint: source tasks, which perform checkpoints immediately, and non-source tasks, which would notify CheckpointBarrierHandler in some way. However, since we have multiple source tasks (legacy and new source) and multiple non-source tasks (one-input, two-input, multiple-input), multiple subclasses would share the same implementation, which causes code repetition. Currently the PoC introduces a new level of abstraction, namely SourceStreamTasks and NonSourceStreamTasks, but what makes it more complicated is that StreamingIterationHead extends OneInputStreamTask yet needs to perform checkpoints the way source tasks do.

Glad to hear your opinions!

Best,
Yun

[1] https://github.com/gaoyunhaii/flink/commits/try_checkpoint_6 , starting from commit f8005be1ab5e5124e981e56db7bdf2908f4a969a.
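
P.S. To make point 1 more concrete, the barrier insertion could be sketched roughly as below. This is only an illustration and not the PoC code: the class EndOfPartitionAlignmentSketch and its methods onBarrier / onEndOfPartition are hypothetical names, and it assumes aligned checkpoints with a single pending checkpoint at a time. A channel that reaches EndOfPartition is treated as if it had delivered the barrier of the checkpoint currently being aligned, and of every later one.

```java
import java.util.BitSet;

/**
 * Hypothetical sketch (not Flink's CheckpointBarrierHandler): tracks alignment
 * of one pending checkpoint and treats EndOfPartition on a channel as an
 * implicitly inserted barrier.
 */
class EndOfPartitionAlignmentSketch {
    private final int numChannels;
    private final BitSet aligned = new BitSet();   // channels counted for the pending checkpoint
    private final BitSet finished = new BitSet();  // channels that reached EndOfPartition
    private long currentId = -1;                   // id of the latest checkpoint seen
    private boolean aligning = false;              // true while currentId is still pending

    EndOfPartitionAlignmentSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    /** A real barrier arrived on a channel; returns true once the checkpoint is fully aligned. */
    boolean onBarrier(int channel, long checkpointId) {
        if (checkpointId > currentId) {
            // Start aligning a newer checkpoint; finished channels count as aligned from the start.
            currentId = checkpointId;
            aligning = true;
            aligned.clear();
            aligned.or(finished);
        } else if (checkpointId < currentId || !aligning) {
            return false; // stale barrier, or this checkpoint has already been aligned
        }
        aligned.set(channel);
        return completeIfAligned();
    }

    /** The channel ended; "insert" the pending barrier for it if an alignment is in progress. */
    boolean onEndOfPartition(int channel) {
        finished.set(channel);
        if (!aligning) {
            return false;
        }
        aligned.set(channel);
        return completeIfAligned();
    }

    private boolean completeIfAligned() {
        if (aligned.cardinality() < numChannels) {
            return false;
        }
        aligning = false; // the checkpoint can now be taken
        return true;
    }
}
```

With the four-input example from point 1, barrier 2 on channels 0 and 1 followed by EndOfPartition on channels 2 and 3 completes the alignment of checkpoint 2. For a later checkpoint, only channels 0 and 1 still need to deliver barriers.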