Hi all,

Many thanks for all the discussions!
Regarding the operator API on exit, I'm also glad that we are roughly reaching a consensus. Based on the previous discussions I also support doing the final checkpoint after the previous "close" method, and renaming these methods to make them clearer. But I still have concerns about how we achieve the final checkpoint after emitting the EndOfPartitionEvent, which has already closed the network channels. Suppose we have A -> B and both A and B need to wait for the final checkpoint; there are several options from my side:

1. We might introduce a new type of event to notify the endOfInput() through the graph first, then A/B wait for the final checkpoint, and then A emits EndOfPartitionEvent to exit all the tasks as now.

2. We directly use EndOfPartitionEvent to notify the endOfInput() through the graph first, but we also notify the JM (or at least the CheckpointCoordinator) that these tasks are in a new state TERMINATING (or FINISHING). When triggering a checkpoint, the CheckpointCoordinator would then treat these tasks differently by not considering the edges that have already emitted EndOfPartitionEvent. In this case, we would trigger A and B separately and there is no need for A to broadcast the barriers to B.

3. We still use EndOfPartitionEvent to notify the endOfInput() through the graph first and do not notify the JM. Then for every checkpoint we would need to trigger all the running tasks. In this case, both A and B are running and the CheckpointCoordinator does not know whether the network between them is closed, so it has to assume the worst case and trigger both A and B. This option would introduce more overhead compared with only triggering the sources and broadcasting the barriers on the task side.

As for whether we allow the operators and tasks that do not need to commit side effects to exit first, I think it is also related to the above options: if we go towards 1, the tasks would still need to exit from the source, while if we go towards 2/3 we could allow these tasks to finish first.

Regarding the watermark,

> There is a pre-existing issue
> that watermarks are not checkpointed/stored on state, and there was/is now
> clear answer how we should handle this as far as I remember. One
> problematic case are two/multiple input tasks or UnionInputGate, where
> combined watermark is the min of all inputs (held in memory). The problem
> so far is a bit benign, as after recovery we are losing the combined
> watermark value, but it's being slowly/lazily restored, as new watermarks
> are sent from the sources. With finished sources that won't be a case.

Many thanks for the further explanation; it should indeed be a problem. Since for stop-with-savepoint --drain we always advance the watermark to MAX, should we also do it for a normal exit? If so, since for recovery after some tasks have finished we would currently start all the tasks and then stop the finished tasks directly, I think for simplicity we could first emit a new MAX watermark from the sources before or together with EndOfPartitionEvent, as Till suggested. In the long run we could also consider snapshotting the min watermark if we decide not to start the finished tasks at all.

Regarding the global-commit-handles, I also like the proposal of the global-committer-handler. From the sinks' view I also lean towards emitting these handles after notifyCheckpointComplete, but we could create them in snapshotState() so that we could also include them in the checkpoint.
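To make the intended lifecycle a bit more concrete, below is a very rough sketch of what I have in mind for the committer operator. Please note that GlobalCommitHandle, renamePendingFiles and emitToGlobalCommitter are only illustrative names made up for this mail, not an existing or proposed API:

import java.util.HashMap;
import java.util.Map;

// Rough sketch only: GlobalCommitHandle and the helper methods below are
// illustrative names, not an existing Flink API.
public class CommitterOperatorSketch {

    /** Placeholder for whatever the global committer needs (e.g. the list of final files). */
    public static class GlobalCommitHandle {
        final long checkpointId;
        GlobalCommitHandle(long checkpointId) {
            this.checkpointId = checkpointId;
        }
    }

    // Handles are created per checkpoint and would be written into the operator
    // state, so they can be restored after a failure.
    private final Map<Long, GlobalCommitHandle> pendingHandles = new HashMap<>();

    public void snapshotState(long checkpointId) {
        // The list of pending files is already known at this point, so the
        // handle can be created now and included in the checkpoint.
        pendingHandles.put(checkpointId, new GlobalCommitHandle(checkpointId));
    }

    public void notifyCheckpointComplete(long checkpointId) {
        // Finish the per-task side effects first (e.g. rename the pending files),
        // and only then hand the handle to the global committer / operator
        // coordinator, so the meta store is updated after the files are visible.
        renamePendingFiles(checkpointId);
        emitToGlobalCommitter(pendingHandles.remove(checkpointId));
    }

    private void renamePendingFiles(long checkpointId) {
        // sink-specific: move pending files to their final locations
    }

    private void emitToGlobalCommitter(GlobalCommitHandle handle) {
        // e.g. send downstream to a global committer operator or to the
        // operator coordinator
    }
}

With something like this the handle is part of the checkpoint (so it can be restored after a failure, as Piotr pointed out), while the actual global commit still only happens after notifyCheckpointComplete.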
For example, for FileSystem/Hive sinks the global committer handler might write to the meta store, and this should happen only after all the pending files are renamed to the final ones; otherwise the downstream jobs might miss some files if they rely on the meta store to identify ready partitions. Thus we would have to emit the global-committer-handler after notifyCheckpointComplete. But since we already know the list of files to rename in snapshotState(), we could create the global-committer-handler and store it there. Also, since we might want to keep the order, the operator coordinator might not rely on its own notifyCheckpointComplete() notification, but instead wait for the operator to notify it about the checkpoint completion after the operator has finished its own processing first.

Best,
Yun

------------------------------------------------------------------
From: Piotr Nowojski <pnowoj...@apache.org>
Send Time: 2021 Mar. 4 (Thu.) 17:16
To: Kezhu Wang <kez...@gmail.com>
Cc: dev <dev@flink.apache.org>; Yun Gao <yungao...@aliyun.com>; jingsongl...@gmail.com <jingsongl...@gmail.com>; Guowei Ma <guowei....@gmail.com>; Till Rohrmann <trohrm...@apache.org>
Subject: Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Hi Kezhu,

What do you mean by “end-flushing”? I was suggesting to just keep `endOfInput()` and `dispose()`. Are you suggesting to have one `endFlushing()` method, that is called after quiescing timers/mailbox, but before the final checkpoint and `dispose()`? Are we sure we really need this extra call? Note: if we don't need it at the moment, we could always introduce it in the future, while if we don't and won't need it, why complicate the API?

About the idea of returning the global-commit-handle from the `notifyCheckpointComplete()` call: note it will be more difficult to implement, as `CheckpointCoordinator` will need to have one extra stage of waiting for some actions to complete. Implementation will probably be easier if we return the global-commit-handle from the `snapshotState()` call.

Also, `global-commit-handles` will need to be part of the checkpoint. They will need to be restored/recovered in case of failure. Because of that it might actually be impossible to implement those handles as returned from `notifyCheckpointComplete()`. In this solution we would be in a precarious position if the main checkpoint succeeded, the CheckpointCoordinator started issuing `notifyCheckpointComplete()`, but persisting of the handles kept failing. How would we recover from such a situation? We can not recover to a previous checkpoint (`notifyCheckpointComplete()` was already issued), but at the same time the current checkpoint is not fully completed (global-commit-handles can not be checkpointed).

Best,
Piotrek

On Thu, Mar 4, 2021 at 06:33 Kezhu Wang <kez...@gmail.com> wrote: