Hi Yun,

Thank you for starting the discussion. This will solve one of the long-standing issues [1] that confuse users. I'm also a big fan of option 3. It is also a bit closer to Chandy-Lamport again.

A couple of comments:

1) You call the tasks that get the barriers injected leaf nodes, which would make the sinks the root nodes. That is very similar to how graphs in relational algebra are labeled. However, I have the feeling that in Flink we rather iterate from sources to sinks, making the sources the root nodes and the sinks the leaf nodes. I have no clue how it's done in similar cases, though, so please take that hint cautiously.

2) I'd make the algorithm that finds the subtasks iterative and let it react in CheckpointCoordinator. Let's assume that we inject the barrier at all root subtasks (initially all sources). In the iterative algorithm, whenever root A finishes, it looks at all connected subtasks B and checks whether they have any upstream task left. If not, B becomes a new root. That would only require touching a part of the job graph, but it would need some callback from JobManager to CheckpointCoordinator.

2b) We also need to be careful about out-of-sync updates: if the root is about to finish, we could send the barrier to it from CheckpointCoordinator, but by the time the barrier arrives, the subtask has already finished.

3) An implied change is that checkpoints are no longer aborted at EndOfPartition, which is good, but it might be worth stating explicitly.

4) The interaction between unaligned checkpoints and EndOfPartition is a bit ambiguous: what happens when an unaligned checkpoint is started and then one input channel contains the EndOfPartition event? From the written description, it sounds to me like we move back to an aligned checkpoint for the whole receiving task. However, that is neither easily possible nor necessary. Imho it would be enough to also store the EndOfPartition in the channel state.

5) I'd expand the recovery section a bit. It would be the first time that we recover an incomplete DAG. Afaik the subtasks are deployed before the state is recovered, so at some point the subtasks either need to be removed again, or maybe we could even avoid creating them in the first place.
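
To make 2) a bit more concrete, here is a minimal sketch in Java of how the root set could be maintained incrementally. It is not Flink code: `RootTracker`, `connect`, `onSubtaskFinished`, and `currentRoots` are hypothetical names, subtasks are identified by plain strings, and the callback from JobManager is assumed to arrive as a call to `onSubtaskFinished`. The sketch also assumes the graph is fully registered before any subtask finishes, and it does not address the race described in 2b).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Flink: tracks which subtasks currently
// have no running upstream subtask and therefore need barriers injected.
final class RootTracker {
    private final Map<String, List<String>> downstream = new HashMap<>();
    private final Map<String, Integer> runningUpstreams = new HashMap<>();
    private final Set<String> finished = new HashSet<>();
    private final Set<String> roots = new HashSet<>();

    // Registers an edge; a subtask stays a root until it gains an upstream.
    void connect(String from, String to) {
        register(from);
        register(to);
        downstream.get(from).add(to);
        runningUpstreams.merge(to, 1, Integer::sum);
        roots.remove(to);
    }

    // A newly seen subtask starts out as a root with zero running upstreams.
    private void register(String id) {
        if (downstream.putIfAbsent(id, new ArrayList<>()) == null) {
            runningUpstreams.put(id, 0);
            roots.add(id);
        }
    }

    // Assumed callback: invoked once it is known that a subtask has finished.
    void onSubtaskFinished(String id) {
        if (!downstream.containsKey(id) || !finished.add(id)) {
            return; // unknown subtask or duplicate notification
        }
        roots.remove(id);
        // Only the direct consumers of the finished subtask are inspected.
        for (String next : downstream.get(id)) {
            int left = runningUpstreams.merge(next, -1, Integer::sum);
            if (left == 0 && !finished.contains(next)) {
                roots.add(next); // no running upstream left: new root
            }
        }
    }

    // Returns a read-only snapshot of the subtasks that should receive barriers.
    Set<String> currentRoots() {
        return Collections.unmodifiableSet(new HashSet<>(roots));
    }
}
```

For example, with two sources A and C feeding a subtask B, B only becomes a root after both A and C have reported that they finished. Each notification touches only the direct consumers of the finished subtask, which is the "only touch a part of the job graph" property mentioned above.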

[1] https://issues.apache.org/jira/browse/FLINK-2491

On Fri, Oct 9, 2020 at 8:22 AM Yun Gao <yungao...@aliyun.com> wrote:

> Hi, devs & users
>
> Very sorry for the spoiled formats, I resent the discussion as follows.
>
> As discussed in FLIP-131 [1], Flink will make DataStream the unified API for
> processing bounded and unbounded data in both streaming and blocking modes.
> However, one long-standing problem for the streaming mode is that currently
> Flink does not support checkpoints after some tasks finished, which causes
> some problems for bounded or mixed jobs:
>
> 1. Flink exactly-once sinks rely on checkpoints to ensure data won’t be
> replayed before committed to external systems in streaming mode. If sources
> are bounded and checkpoints are disabled after some tasks are finished, the
> data sent after the last checkpoint can never be committed. This issue has
> already been reported several times in the user ML [2][3][4] and was further
> brought up when working on FLIP-143: Unified Sink API [5].
>
> 2. Jobs with both bounded and unbounded sources might have to replay a large
> amount of records after failover because no periodic checkpoints are taken
> after the bounded sources finished.
>
> Therefore, we propose to also support checkpoints after some tasks finished.
> You can find more details in FLIP-147 [6].
>
> Best,
> Yun
>
> [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
> [2] https://lists.apache.org/thread.html/rea1ac2d82f646fcea1395b5738be495f144c5b0312a290a1d4a339c1%40%3Cuser.flink.apache.org%3E
> [3] https://lists.apache.org/thread.html/rad4adeec838093b8b56ae9e2ea6a937a4b1882b53045a12acb7e61ea%40%3Cuser.flink.apache.org%3E
> [4] https://lists.apache.org/thread.html/4cf28a9fa3732dfdd9e673da6233c5288ca80b20d58cee130bf1c141%40%3Cuser.flink.apache.org%3E
> [5] https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
> [6] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
>
> ------------------Original Mail ------------------
> Sender: Yun Gao <yungao...@aliyun.com.INVALID>
> Send Date: Fri Oct 9 14:16:52 2020
> Recipients: Flink Dev <d...@flink.apache.org>, User-Flink <user@flink.apache.org>
> Subject: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

--
Arvid Heise | Senior Java Developer
<https://www.ververica.com/>