Hi all,

    I would like to resume the discussion on supporting checkpoints after 
tasks have finished :) Based on the previous discussion, we have now 
implemented a PoC [1] to try out the idea. During the PoC we also ran into 
some possible issues:

    1. To take EndOfPartition into account for barrier alignment on the TM 
side, we now tend to decouple the EndOfPartition logic from the normal 
alignment behavior, to avoid complex interference between the two (which seems 
hard to track). We could do so by inserting suitable barriers for input 
channels that have received, but not yet processed, EndOfPartition. For 
example, if a task with four inputs has received barrier 2 from two input 
channels, but the other two inputs did not receive barrier 2 before 
EndOfPartition because their upstream tasks had finished, we could insert 
barrier 2 for the last two channels so that checkpoint 2 can still complete.
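
A minimal sketch of the barrier-insertion idea, assuming only one checkpoint 
is aligned at a time. EndOfPartitionAwareAligner and its methods are 
hypothetical names, not Flink's CheckpointBarrierHandler API. A channel that 
has delivered EndOfPartition is counted as if barrier N had been inserted for 
it, so the four-input example above completes checkpoint 2:

```java
import java.util.BitSet;

/**
 * Hypothetical sketch, not Flink code: aligns one pending checkpoint and
 * treats channels that delivered EndOfPartition as having received the barrier.
 */
class EndOfPartitionAwareAligner {
    private final int numChannels;
    private final BitSet barrierReceived = new BitSet(); // channels that sent the pending barrier
    private final BitSet finished = new BitSet();        // channels that sent EndOfPartition
    private long pendingCheckpointId = -1;               // -1: no alignment in progress
    private long lastCompletedCheckpointId = -1;

    EndOfPartitionAwareAligner(int numChannels) {
        this.numChannels = numChannels;
    }

    void onBarrier(int channel, long checkpointId) {
        if (checkpointId <= lastCompletedCheckpointId) {
            return; // stale barrier of an already completed checkpoint
        }
        if (checkpointId > pendingCheckpointId) {
            // A newer checkpoint starts; this sketch simply abandons the older one.
            pendingCheckpointId = checkpointId;
            barrierReceived.clear();
        }
        if (checkpointId == pendingCheckpointId) {
            barrierReceived.set(channel);
            maybeComplete();
        }
    }

    void onEndOfPartition(int channel) {
        finished.set(channel);
        if (pendingCheckpointId >= 0) {
            maybeComplete(); // acts like inserting the pending barrier for this channel
        }
    }

    private void maybeComplete() {
        BitSet covered = (BitSet) barrierReceived.clone();
        covered.or(finished); // finished channels count as "barrier inserted"
        if (covered.cardinality() == numChannels) {
            lastCompletedCheckpointId = pendingCheckpointId;
            pendingCheckpointId = -1;
            barrierReceived.clear();
        }
    }

    long lastCompletedCheckpointId() {
        return lastCompletedCheckpointId;
    }
}
```

Keeping the finished channels in a separate set, rather than mutating the 
normal alignment state, mirrors the goal of decoupling EndOfPartition handling 
from regular alignment.
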
    2. As we have discussed, if a task finishes while we are triggering the 
tasks, the checkpoint would fail and we should re-trigger its descendants. If 
possible, however, we would like to skip this issue in the first version to 
reduce implementation complexity, since it should not affect correctness. We 
could consider supporting it in later versions.
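
A minimal sketch of how the set of tasks to trigger could be recomputed, 
assuming the JobManager triggers the running tasks whose upstream tasks have 
all finished. TriggerPlanner and the task names are hypothetical. If "source" 
finishes while being triggered, recomputing the set yields its descendant 
"map", which is the re-trigger step the first version would skip:

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch, not Flink code: picks the tasks to trigger for a checkpoint. */
class TriggerPlanner {
    private final Map<String, List<String>> upstream; // task -> its direct upstream tasks

    TriggerPlanner(Map<String, List<String>> upstream) {
        this.upstream = upstream;
    }

    /** Running tasks that have no upstream tasks, or whose upstream tasks all finished. */
    Set<String> tasksToTrigger(Set<String> finished) {
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, List<String>> e : upstream.entrySet()) {
            String task = e.getKey();
            if (finished.contains(task)) {
                continue; // finished tasks are never triggered
            }
            if (finished.containsAll(e.getValue())) {
                result.add(task);
            }
        }
        return result;
    }
}
```
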

    3. We would have to add an isFinished field to OperatorState so that we 
do not re-run finished sources after failover. However, this requires a new 
version of the checkpoint metadata. Currently Flink has an abstract 
MetaV2V3SerializerBase, which V2 and V3 extend to share implementation. To add 
V4, which differs from V3 in only one field, the current PoC introduces a new 
MetaV3V4SerializerBase that extends MetaV2V3SerializerBase to share 
implementation between V3 and V4. This might look a little complex, and we 
might need a general mechanism for extending the checkpoint metadata format.
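
A minimal sketch of one possible alternative to an extra MetaV3V4SerializerBase 
level: a single base serializer with overridable hooks for version-specific 
fields, so V4 only adds the isFinished field. All classes and the byte layout 
here are hypothetical and far simpler than Flink's real checkpoint metadata:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical, heavily simplified stand-in for OperatorState. */
final class SimpleOperatorState {
    final String operatorId;
    final int parallelism;
    final boolean isFinished;

    SimpleOperatorState(String operatorId, int parallelism, boolean isFinished) {
        this.operatorId = operatorId;
        this.parallelism = parallelism;
        this.isFinished = isFinished;
    }
}

/** Hypothetical base: shared layout plus hooks for fields added by later versions. */
abstract class MetadataSerializerSketch {
    abstract int version();

    // Defaults describe the V3 layout, which has no extra fields.
    void writeExtraFields(SimpleOperatorState s, DataOutputStream out) throws IOException {}

    boolean readIsFinished(DataInputStream in) throws IOException {
        return false; // formats without the field never mark an operator as finished
    }

    final byte[] serialize(SimpleOperatorState s) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(version());
            out.writeUTF(s.operatorId);
            out.writeInt(s.parallelism);
            writeExtraFields(s, out);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    final SimpleOperatorState deserialize(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int v = in.readInt();
            if (v != version()) {
                throw new IOException("unexpected metadata version " + v);
            }
            String id = in.readUTF();
            int parallelism = in.readInt();
            return new SimpleOperatorState(id, parallelism, readIsFinished(in));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

final class V3SerializerSketch extends MetadataSerializerSketch {
    @Override
    int version() { return 3; }
}

final class V4SerializerSketch extends MetadataSerializerSketch {
    @Override
    int version() { return 4; }

    @Override
    void writeExtraFields(SimpleOperatorState s, DataOutputStream out) throws IOException {
        out.writeBoolean(s.isFinished); // the only difference from V3
    }

    @Override
    boolean readIsFinished(DataInputStream in) throws IOException {
        return in.readBoolean();
    }
}
```

With this shape, a future V5 would override only the hooks it changes instead 
of introducing another shared base class.
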

    4. With this change, StreamTask would have two kinds of subclasses, 
depending on how they implement triggerCheckpoint: source tasks, which perform 
checkpoints immediately, and non-source tasks, which notify the 
CheckpointBarrierHandler in some way. However, since we have multiple source 
tasks (legacy and new source) and multiple non-source tasks (one-input, 
two-input, multiple-input), several subclasses would share the same 
implementation, causing code duplication. Currently the PoC introduces a new 
level of abstraction, namely SourceStreamTasks and NonSourceStreamTasks, but 
what makes it more complicated is that StreamingIterationHead extends 
OneInputStreamTask yet needs to perform checkpoints like a source task.
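
A minimal sketch of the two triggerCheckpoint behaviors described above. 
Passing the behavior to the task as an object, instead of adding a level of 
subclasses, is an assumption for illustration and not what the PoC does; all 
class names are hypothetical stand-ins. With this shape, an iteration-head-like 
task can extend the one-input task and still checkpoint like a source:

```java
/** Hypothetical: how a task reacts when a checkpoint is triggered. */
interface CheckpointTriggerBehavior {
    String trigger(long checkpointId);
}

/** Source-like tasks perform the checkpoint immediately. */
final class PerformImmediately implements CheckpointTriggerBehavior {
    public String trigger(long checkpointId) {
        return "snapshot " + checkpointId;
    }
}

/** Non-source tasks hand the trigger over to their barrier handler. */
final class NotifyBarrierHandler implements CheckpointTriggerBehavior {
    public String trigger(long checkpointId) {
        return "notify barrier handler " + checkpointId;
    }
}

/** Simplified stand-in for StreamTask that delegates instead of overriding. */
class SketchStreamTask {
    private final CheckpointTriggerBehavior behavior;

    SketchStreamTask(CheckpointTriggerBehavior behavior) {
        this.behavior = behavior;
    }

    final String triggerCheckpoint(long checkpointId) {
        return behavior.trigger(checkpointId);
    }
}

/** Stand-in for OneInputStreamTask: non-source behavior by default. */
class SketchOneInputStreamTask extends SketchStreamTask {
    SketchOneInputStreamTask() {
        this(new NotifyBarrierHandler());
    }

    protected SketchOneInputStreamTask(CheckpointTriggerBehavior behavior) {
        super(behavior);
    }
}

/** Stand-in for StreamingIterationHead: a one-input task that checkpoints like a source. */
final class SketchIterationHead extends SketchOneInputStreamTask {
    SketchIterationHead() {
        super(new PerformImmediately());
    }
}
```

The trade-off is that the trigger behavior is chosen at construction time 
rather than following from the class hierarchy.
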

We would be glad to hear your opinions!

Best,
 Yun

[1] https://github.com/gaoyunhaii/flink/commits/try_checkpoint_6 , starting from 
commit f8005be1ab5e5124e981e56db7bdf2908f4a969a.
