Hi Till,

Thanks a lot for the feedback!

> 1) When restarting all tasks independent of the status at checkpoint time
> (finished, running, scheduled), we might allocate more resources than we
> actually need to run the remaining job. From a scheduling perspective it
> would be easier if we already know that certain subtasks don't need to be
> rescheduled. I believe this can be an optimization, though.
>
> 2) In the section Compatibility, Deprecation and Migration Plan you mentioned
> that you want to record operators in the CompletedCheckpoint which are fully
> finished. How will this information be used for constructing a recovered
> ExecutionGraph? Why wouldn't the same principle work for the task level?

I think the first two issues are related. The main reason for recording the information per operator is that with externalized checkpoints, a checkpoint taken from one job might be used to start another job, and we do not have a unique ID to match tasks across jobs. Furthermore, users may change the parallelism of a JobVertex, or even modify the graph structure by adding/removing operators or by changing the chaining between operators. On the other hand, Flink already provides custom UIDs for operators, which makes the operator a stable unit for recovery. The current checkpoints are also organized per operator in order to support rescaling and job upgrades.

When restarting from a checkpoint with finished operators, we could start only the tasks that contain operators which are not fully finished (namely, operators for which some subtasks were still running when the checkpoint was taken). Then, during the execution of a single task, we would only initialize/open/run/close the operators that are not fully finished. The Scheduler should be able to compute whether a task contains not-fully-finished operators from the current JobGraph and the operator finish states restored from the checkpoint, roughly as sketched below.
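To make this a bit more concrete, here is a minimal sketch of the scheduler-side decision. All names in it (RestartDecisionSketch, VertexSketch, needsRescheduling, the string UIDs) are made up for illustration and are not part of the proposed implementation; the real decision would of course operate on the JobGraph and OperatorIDs rather than plain strings, but the predicate would look similar:

import java.util.List;
import java.util.Set;

/**
 * A simplified sketch (not the proposed implementation) of the scheduler-side
 * decision: a task only has to be rescheduled if its chain still contains at
 * least one operator that is not recorded as fully finished in the restored
 * checkpoint. Operator UIDs are modeled as plain strings for illustration.
 */
public class RestartDecisionSketch {

    /** Hypothetical stand-in for a JobVertex: a name plus its chained operator UIDs. */
    record VertexSketch(String name, List<String> operatorUids) {}

    static boolean needsRescheduling(VertexSketch vertex, Set<String> fullyFinishedOperators) {
        return vertex.operatorUids().stream()
                .anyMatch(uid -> !fullyFinishedOperators.contains(uid));
    }

    public static void main(String[] args) {
        // Hypothetical job source -> map -> sink, where the source operator has
        // already fully finished at checkpoint time.
        Set<String> fullyFinished = Set.of("source-uid");

        List<VertexSketch> vertices = List.of(
                new VertexSketch("source", List.of("source-uid")),
                new VertexSketch("map -> sink", List.of("map-uid", "sink-uid")));

        for (VertexSketch vertex : vertices) {
            System.out.printf("%s needs rescheduling: %b%n",
                    vertex.name(), needsRescheduling(vertex, fullyFinished));
        }
    }
}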
> 3) How will checkpointing work together with fully bounded jobs and FLIP-1
> (fine grained recovery)?

I think it should be compatible with fully bounded jobs and FLIP-1, since it can be viewed as completing the current checkpoint mechanism. Concretely:

1. Batch jobs (with blocking execution mode) should not be affected, since checkpoints are not enabled in this case.
2. With this modification, bounded jobs running in pipelined mode would also be able to take checkpoints while they are finishing. As discussed in the FLIP, it should not change the current behavior after restore for almost all jobs.
3. Region failover and more fine-grained recovery should also not be affected: as before, after a failover the failover policy (full/region/fine-grained) decides which tasks to restart, and the checkpoint only decides what state is restored for these tasks. The only difference with this modification is that these tasks might now be restored from a checkpoint taken after some tasks have finished. Since the previously finished tasks would always be skipped, either by not being started or by running an empty execution, and the behavior of the previously running tasks stays unchanged, the overall behavior should not be affected (a sketch of the task-side skipping follows below).
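As a complement to point 3 above, here is a similarly hedged sketch of the task-side behavior after restore; again, the names (OperatorSketch, restoreChain) are invented for this example and do not correspond to actual Flink classes:

import java.util.List;
import java.util.Set;

/**
 * A simplified sketch (not the proposed API) of the task-side behavior after
 * restore: only the operators that are not fully finished are initialized and
 * opened; a chain that is entirely finished degenerates to an empty execution.
 */
public class FinishedOperatorSkippingSketch {

    /** Hypothetical stand-in for a chained operator. */
    interface OperatorSketch {
        String uid();
        void initializeState();
        void open();
    }

    /** Returns the operators that still need to run and prepares them. */
    static List<OperatorSketch> restoreChain(
            List<OperatorSketch> chainedOperators, Set<String> fullyFinishedOperators) {

        List<OperatorSketch> remaining = chainedOperators.stream()
                .filter(op -> !fullyFinishedOperators.contains(op.uid()))
                .toList();

        for (OperatorSketch op : remaining) {
            op.initializeState();
            op.open();
        }

        // If 'remaining' is empty, the task simply runs an empty execution (or is
        // not started at all if the scheduler already filtered it out).
        return remaining;
    }
}

The important property is that the failover policy itself stays unchanged: it still only decides which tasks to restart, while the recorded finish information only affects what those tasks do when they come back up.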
Best,
Yun

------------------------------------------------------------------
From: Till Rohrmann <trohrm...@apache.org>
Send Time: 2020 Oct. 13 (Tue.) 17:25
To: Yun Gao <yungao...@aliyun.com>
Cc: Arvid Heise <ar...@ververica.com>; Flink Dev <d...@flink.apache.org>; User-Flink <user@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Thanks for starting this discussion Yun Gao,

I have three comments/questions:

1) When restarting all tasks independent of the status at checkpoint time
(finished, running, scheduled), we might allocate more resources than we
actually need to run the remaining job. From a scheduling perspective it
would be easier if we already know that certain subtasks don't need to be
rescheduled. I believe this can be an optimization, though.

2) In the section Compatibility, Deprecation and Migration Plan you mentioned
that you want to record operators in the CompletedCheckpoint which are fully
finished. How will this information be used for constructing a recovered
ExecutionGraph? Why wouldn't the same principle work for the task level?

3) How will checkpointing work together with fully bounded jobs and FLIP-1
(fine grained recovery)?

Cheers,
Till

On Tue, Oct 13, 2020 at 9:30 AM Yun Gao <yungao...@aliyun.com> wrote:

Hi Arvid,

Thanks a lot for the comments!

>>> 4) Yes, the interaction is not trivial and also I have not completely
>>> thought it through. But in general, I'm currently at the point where I
>>> think that we also need non-checkpoint related events in unaligned
>>> checkpoints. So just keep that in mind, that we might converge anyhow at
>>> this point.

I also agree that it would be better to keep the unaligned checkpoint behavior on EndOfPartition; I will double-check this issue again.

>>> In general, what is helping in this case is to remember that there no
>>> unaligned checkpoint barrier ever going to overtake EndOfPartition. So, we
>>> can completely ignore the problem on how to store and restore output
>>> buffers of a completed task (also important for the next point).

Exactly, we should not need to persist the output buffers of completed tasks, and that would simplify the implementation a lot.

>>> 5) I think we are on the same page and I completely agree that for the
>>> MVP/first version, it's completely fine to start and immediately stop. A
>>> tad better would be even to not even start the procession loop.

I also agree with this part. We will keep optimizing the implementation after the first version.

Best,
Yun