Hi all,

Many thanks for all the discussions!

Regarding the operator API on exit, I'm also glad that we are roughly reaching
consensus. Based on the previous discussions
I also support doing the final checkpoint after the previous "close" method,
and renaming these methods to make them
clearer. But I still have concerns about how we achieve the final checkpoint
after emitting the EndOfPartitionEvent, which
has already closed the network channels. Suppose we have A -> B and both A and B need
to wait for the final checkpoint; there might
be several options from my side:
1. We might introduce a new type of event to notify endOfInput() through the
graph first; then A/B wait for the final
checkpoint, and then A emits EndOfPartitionEvent to exit all the tasks as it does now.
2. We directly use EndOfPartitionEvent to notify endOfInput() through the
graph first, but we also notify the JM (or at least the
CheckpointCoordinator) that these tasks are in a new state TERMINATING (or
FINISHING). Then when triggering a checkpoint
the CheckpointCoordinator would treat these tasks differently by not
considering the edges that have emitted EndOfPartitionEvent. In this
case, we would trigger A and B separately and there is no need for A to
broadcast the barriers to B.
3. We still use EndOfPartitionEvent to notify endOfInput() through the graph
first and do not notify the JM. Then for all checkpoints
we would need to trigger all the running tasks. In this case, both A and B
are running and the CheckpointCoordinator does not know
whether the network channel between them is closed, so it has to assume the worst
case and trigger both A and B. But this option would
introduce more overhead compared with only triggering the sources and
broadcasting the barriers on the task side.
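
To make option 2 a bit more concrete, below is a minimal, self-contained sketch
of the trigger-task computation. The TaskNode model and all names here are
hypothetical and only illustrate the idea, not the actual CheckpointCoordinator code:

import java.util.*;

// Hypothetical sketch only: a tiny model of the execution graph, not Flink's
// actual classes.
class TaskNode {
    final String name;
    final List<TaskNode> inputs = new ArrayList<>();
    final Set<TaskNode> closedInputs = new HashSet<>(); // edges that emitted EndOfPartitionEvent
    boolean finished; // the task itself has already exited

    TaskNode(String name) { this.name = name; }
}

class TriggerComputationSketch {
    // Option 2: the coordinator ignores edges that already emitted
    // EndOfPartitionEvent, so a task whose inputs are all closed becomes a
    // "new source" and is triggered directly instead of waiting for barriers.
    static List<TaskNode> tasksToTrigger(List<TaskNode> graph) {
        List<TaskNode> toTrigger = new ArrayList<>();
        for (TaskNode task : graph) {
            if (task.finished) {
                continue; // finished tasks are no longer part of the checkpoint
            }
            if (task.inputs.isEmpty() || task.closedInputs.containsAll(task.inputs)) {
                toTrigger.add(task);
            }
        }
        return toTrigger;
    }

    public static void main(String[] args) {
        TaskNode a = new TaskNode("A");
        TaskNode b = new TaskNode("B");
        b.inputs.add(a);
        b.closedInputs.add(a); // A already sent EndOfPartitionEvent to B
        // Prints A and B: both are triggered separately, no barrier flows A -> B.
        tasksToTrigger(List.of(a, b)).forEach(t -> System.out.println(t.name));
    }
}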

For whether we allow the operators and tasks that do not need to commit side
effects to exit first, I think it is also related to the above
options: if we go with option 1, the tasks would still need to exit from the
source, while if we go with option 2/3, we would be able to allow these
tasks to finish first.

Regarding the watermark,
> There is a pre-existing issue
> that watermarks are not checkpointed/stored on state, and there was/is no
> clear answer how we should handle this as far as I remember. One
> problematic case are two/multiple input tasks or UnionInputGate, where
> combined watermark is the min of all inputs (held in memory). The problem
> so far is a bit benign, as after recovery we are losing the combined
> watermark value, but it's being slowly/lazily restored, as new watermarks
> are sent from the sources. With finished sources that won't be a case.

Many thanks for the further explanation, and it should indeed be a problem. Since
for stop-with-savepoint --drain we always
advance the watermark to MAX, should we also do it for the normal exit? If
so, since for recovery after some tasks finished
we would first start all the tasks and stop the finished tasks directly, I
think for simplicity we could first emit a new MAX watermark
from the sources before or together with EndOfPartitionEvent, as Till suggested, and in
the long run we could also consider snapshotting the
min watermark if we decide not to start the finished tasks directly.
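
As a rough illustration of emitting the MAX watermark before closing the
partitions (the hook and class below are hypothetical; only
Watermark.MAX_WATERMARK and Output#emitWatermark are existing Flink API):

import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;

// Hypothetical shutdown hook of a source task: flush a final MAX watermark so
// that downstream two-input operators / UnionInputGate do not keep a stale
// min-combined watermark after this source finishes.
final class SourceFinishSketch {
    static void beforeClosingPartitions(Output<?> output) {
        output.emitWatermark(Watermark.MAX_WATERMARK); // same effect --drain has today
        // ... afterwards the task emits EndOfPartitionEvent and exits as usual.
    }
}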

Regarding the global-commit-handles,
I also like the proposal of the global-committer-handler. From the sinks' view
I also lean towards emitting these handles after notifyCheckpointComplete,
but we could create them in snapshotState() so that we could also
include them in the checkpoint.

For example, the global-committer-handler might write to the meta store for
FileSystem/Hive sinks, and this should happen after all the pending
files are renamed to the final ones; otherwise the downstream jobs might miss
some files if they rely on the meta store to identify ready partitions.
Thus we would have to emit the global-committer-handler after
notifyCheckpointComplete. But since we already know the list of files
to rename in snapshotState(), we could create the global-committer-handler there and
store it in the checkpoint.
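
A small sketch of this flow from the sink side (GlobalCommitHandle and the emit
callback are made-up names just for illustration, not a proposed Flink API):

import java.util.*;
import java.util.function.Consumer;

class FileSinkSketch {
    record GlobalCommitHandle(List<String> pendingFiles) {}

    private final Map<Long, GlobalCommitHandle> handles = new HashMap<>();

    // snapshotState(): the list of files to rename is already known here, so
    // the handle can be created now and included in the checkpoint.
    void snapshotState(long checkpointId, List<String> pendingFiles) {
        handles.put(checkpointId, new GlobalCommitHandle(List.copyOf(pendingFiles)));
    }

    // notifyCheckpointComplete(): first rename the pending files to their final
    // names, and only then emit the handle, so the meta-store update (done by
    // the global committer) never references files that do not exist yet.
    void notifyCheckpointComplete(long checkpointId,
                                  Consumer<GlobalCommitHandle> emitToGlobalCommitter) {
        GlobalCommitHandle handle = handles.remove(checkpointId);
        if (handle == null) {
            return;
        }
        handle.pendingFiles().forEach(FileSinkSketch::renameToFinal);
        emitToGlobalCommitter.accept(handle);
    }

    private static void renameToFinal(String pendingFile) {
        // rename the ".pending" file to its final name
    }
}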

Also, since we might want to keep this order, the operator coordinator might not
rely on its own notifyCheckpointComplete() notification,
but instead wait for the operators to notify it about the checkpoint completion
after they have finished their processing first.
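
Roughly, the coordinator side might then look like the following sketch (all
names hypothetical), where the global commit only runs after every subtask has
reported completion:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class GlobalCommitterCoordinatorSketch {
    private final Set<Integer> pendingSubtasks = ConcurrentHashMap.newKeySet();

    // Called when the coordinator itself learns the checkpoint is complete:
    // record which subtasks we still have to wait for, but do not commit yet.
    void onCheckpointComplete(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            pendingSubtasks.add(i);
        }
    }

    // Each subtask reports once it finished its own notifyCheckpointComplete()
    // processing (e.g. renaming files); the global commit runs only after the
    // last report, preserving the required order.
    void onSubtaskFinishedCommitting(int subtaskIndex) {
        pendingSubtasks.remove(subtaskIndex);
        if (pendingSubtasks.isEmpty()) {
            runGlobalCommit();
        }
    }

    private void runGlobalCommit() {
        // e.g. register the ready partitions in the meta store
    }
}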

Best,
Yun



------------------------------------------------------------------
From: Piotr Nowojski <pnowoj...@apache.org>
Send Time: 2021 Mar. 4 (Thu.) 17:16
To: Kezhu Wang <kez...@gmail.com>
Cc: dev <dev@flink.apache.org>; Yun Gao <yungao...@aliyun.com>;
jingsongl...@gmail.com <jingsongl...@gmail.com>; Guowei Ma
<guowei....@gmail.com>; Till Rohrmann <trohrm...@apache.org>
Subject: Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Hi Kezhu,

What do you mean by “end-flushing”? I was suggesting to just keep
`endOfInput()` and `dispose()`. Are you suggesting to have a single
`endFlushing()` method, that is called after quiescing timers/mailbox, but
before the final checkpoint and `dispose()`? Are we sure we really need this extra
call? Note: if we don't need it at the moment, we could always introduce it in
the future, while if we don't and won't need it, why complicate the API?

About the idea of returning the global-commit-handle from 
`notifyCheckpointComplete()` call. Note it will be more difficult to implement, 
as `CheckpointCoordinator` will need to have one extra stage of waiting for 
some actions to complete. Implementation will probably be easier if we return 
the global-commit-handle from `snapshotState()` call.

Also, `global-commit-handles` will need to be part of the checkpoint. They will 
need to be restored/recovered in case of failure. Because of that it might be 
actually impossible to implement those handles as returned from 
`notifyCheckpointComplete()`. In this solution we would be in a precarious 
position if the main checkpoint succeeded, CheckpointCoordinator would start 
issuing `notifyCheckpointComplete()`, but persisting of the handles would 
fail/keep failing. How would we recover from such a situation? We can not 
recover to a previous checkpoint (`notifyCheckpointComplete()` were already 
issued), but at the same time the current checkpoint is not fully completed 
(global-commit-handles can not be checkpointed).

Best,
Piotrek



Thu, 4 Mar 2021 at 06:33 Kezhu Wang <kez...@gmail.com> wrote:

