> if a failure happens after sequence of finish() -> snapshotState(), but before notifyCheckpointComplete(), we will restore such a state and we might end up sending some more records to such an operator.
I probably missed something here. Isn't this already the case today? Why is it a concern for the proposed change?

On Wed, Jul 21, 2021 at 4:39 AM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi Dawid,
>
> Thanks for writing down those concerns.
>
> I think the first issue boils down to what the contract of lifecycle
> methods like open(), close(), initializeState() etc. should be, especially
> for the new additions like finish() and endInput(), and what their relation
> to the operator state should be (regardless of its type: keyed, non-keyed,
> union, ...). Should those methods be tied to state or not? After thinking
> about it for a while (and discussing it offline with Dawid), I think the
> answer might be no, they shouldn't. I mean maybe we should just openly say
> that all of those methods relate to this single particular instance and
> execution of the operator. And if a job is recovered/rescaled, we would be
> allowed to freely resume consumption, ignoring the fact that maybe some parts
> of the state have previously seen `endInput()`. Why?
>
> 0. Yes, it might be confusing, especially with `endInput()`. We call
> `endInput()`, we store something in state, and later, after recovery
> combined with rescaling, that state can see more records? Indeed weird.
> 1. I haven't yet come up with a counterexample that would break and make it
> impossible to implement a real-life use case. Theoretically yes, the user
> can store `endInput()` in state, and after rescaling this state would be
> inconsistent with what is actually happening with the operator, but I
> haven't found a use case that would break because of that.
> 2. Otherwise, the implementation would be very difficult.
> 3. It's difficult to access keyed state from within `endInput()`/`finish()`
> calls, as they do not have key context.
> 4. After all, openly defining `endInput()` and `finish()` to be tied to
> the operator execution instance lifecycle is not that strange and quite
> simple to explain.
> Sure, it can lead to a bit of confusion (0.), but that
> doesn't sound that bad in comparison with the alternatives that I'm aware
> of. Also, currently methods like `open()` and `close()` are also tied to the
> operator execution instance, not to the state. Operators can be opened and
> closed multiple times; it doesn't mean that the state is lost after closing
> an operator.
>
> For the UnionListState problem I have posted my proposal in the ticket [1],
> so maybe let's move that particular discussion there?
>
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-21080
>
> On Wed, 21 Jul 2021 at 12:39, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
> > Hey all,
> >
> > To make the issues that were found transparent to the community, I want to
> > post an update:
> >
> > *1. Committing side-effects*
> > We do want to make sure that all side effects are committed before
> > bringing tasks down. Side effects are committed when calling
> > notifyCheckpointComplete(). For the final checkpoint we introduced the
> > method finish(). This notifies the operator that we have consumed all
> > incoming records and we are preparing to close the Task. In turn we should
> > flush any pending buffered records and prepare to commit the last
> > transactions. The goal is that after a successful sequence of
> > finish() -> snapshotState() -> notifyCheckpointComplete(), the remaining
> > state can be considered empty/finished and may be discarded.
> >
> > *Failure before notifyCheckpointComplete()*
> >
> > The question is what the contract of the endInput()/finish() methods is
> > and how calling these methods affects the operator's keyed, non-keyed
> > state and external state. Is it allowed to restore a state snapshot taken
> > after calling endInput()/finish() and process more records?
> > Or do we assume
> > that after a restore from such a state taken after finish() we should not
> > call any of the lifecycle methods, or at least make sure those methods do
> > not emit records/interact with the mailbox etc.?
> >
> > Currently it is possible that if a failure happens after the sequence of
> > finish() -> snapshotState(), but before notifyCheckpointComplete(), we will
> > restore such a state and we might end up sending some more records to such
> > an operator. It is possible if we rescale and this state is merged with the
> > state of a subtask that has not called finish() yet. It can also happen if
> > we rescale the upstream operator and the subtask of interest becomes
> > connected to a newly added non-finished subtask.
> >
> > *Snapshotting StreamTasks on which finish() has been called*
> >
> > We thought about putting a flag into the snapshot of a subtask produced
> > after the finish() method. This would make it possible to skip execution of
> > certain lifecycle methods. Unfortunately this creates problems for
> > rescaling. How do we deal with a situation where subtask states with both
> > the feature flag set and unset end up in a single StreamTask? An additional
> > problem is that we merge those states into a single OperatorSubtaskState on
> > the CheckpointCoordinator.
> >
> > *Finishing upon receiving notifyCheckpointComplete() of not the latest
> > checkpoint*
> >
> > We need to wait for a checkpoint to complete that started after the
> > finish() method. However, we support concurrent checkpoints; therefore,
> > there might be later checkpoints that completed, but whose notification has
> > not arrived. We must make sure those checkpoints do not leave lingering
> > external resources.
> >
> > *Checkpointing from a single subtask / UnionListState case*
> > There are operators that checkpoint from a single subtask only, usually
> > from the subtask with index=0. If we let those subtasks finish, subsequent
> > checkpoints will miss this information.
> > Esp. the legacy sources problem:
> > https://issues.apache.org/jira/browse/FLINK-21080
> >
> > Best,
> >
> > Dawid
> >
> > On 19/07/2021 15:10, Yun Gao wrote:
> >
> > Hi Till, Dawid,
> >
> > Many thanks for the comments and discussion! Glad that it seems we have
> > come to a convergence, and I also agree that we can leave the
> > optimization out of the first version.
> >
> > Best
> > Yun
> >
> > ------------------------------------------------------------------
> > From: Dawid Wysakowicz <dwysakow...@apache.org>
> > Send Time: 2021 Jul. 19 (Mon.) 17:51
> > To: dev <dev@flink.apache.org>; Till Rohrmann <trohrm...@apache.org>
> > Cc: Yun Gao <yungao...@aliyun.com>; Yun Gao <yungao...@aliyun.com.invalid>
> > Subject: Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks
> > Finished
> >
> > Small correction. I meant we need to adjust the EndOfInputEvent of course.
> >
> > Best,
> >
> > Dawid
> >
> > On 19/07/2021 11:48, Dawid Wysakowicz wrote:
> > > Hey Till,
> > >
> > > Yes, you're right we will have to adjust the current state of
> > > EndOfPartitionEvent and move the moment when we emit it to have what
> > > we're discussing here. We are aware of that.
> > >
> > > As for the MAX_WATERMARK vs finish(). My take is that we should always
> > > emit MAX_WATERMARK before calling finish() on an operator. At the same
> > > time finish() should not leave behind anything in state, as the
> > > intention is that we never restore from the taken savepoint/checkpoint
> > > (savepoint w drain or bounded data consumed).
> > >
> > > Best,
> > >
> > > Dawid
> > >
> > > On 19/07/2021 11:33, Till Rohrmann wrote:
> > >> Hi Yun and Dawid,
> > >>
> > >> Thanks for your comments. I do agree with your comments that finish() can
> > >> do more than MAX_WATERMARK.
> > >> I guess we should then explain how
> > >> MAX_WATERMARK and finish() play together and what kind of
> > >> order guarantees we provide.
> > >>
> > >> Concerning the EndOfPartitionEvent, I am not entirely sure whether it would
> > >> work in its current state because we send this event when the Task is about
> > >> to shut down if I am not mistaken. What we want to have is to bring the
> > >> StreamTasks into a state so that they shut down on the next checkpoint. For
> > >> this we need to keep the StreamTask running. In general, I am a fan of
> > >> making things explicit if possible. I think this helps maintenance and
> > >> evolvability of code. That's why I think sending an EndOfInputEvent, which
> > >> is a StreamTask level event and which says that there won't be any other
> > >> records coming, only control events, could make sense.
> > >>
> > >> I would leave the proposed optimization out of the first version. We can
> > >> still add it at a later point in time.
> > >>
> > >> Cheers,
> > >> Till
> > >>
> > >> On Mon, Jul 19, 2021 at 10:35 AM Dawid Wysakowicz <dwysakow...@apache.org>
> > >> wrote:
> > >>
> > >>> Personally I don't find this optimization important and I'd rather leave
> > >>> it out not to complicate the codebase further. I doubt we save much there.
> > >>> I don't have a strong opinion though.
> > >>>
> > >>> Best,
> > >>>
> > >>> Dawid
> > >>> On 19/07/2021 10:31, Yun Gao wrote:
> > >>>
> > >>> Hi,
> > >>>
> > >>> Many thanks Dawid for the thoughts!
> > >>>
> > >>> Currently I also do not have different opinions regarding this part.
> > >>> But I have one more issue to confirm: during the previous discussion we
> > >>> discussed that for the final checkpoint case, we might have an
> > >>> optimization: if a task does not have operators using 2-pc, we might
> > >>> skip waiting for the final checkpoint (but we could not skip the
> > >>> savepoint).
> > >>> To allow users to
> > >>> express the logic, we have proposed to add one more method to
> > >>> StreamOperator & CheckpointListener:
> > >>>
> > >>> interface StreamOperator {
> > >>>     default boolean requiresFinalCheckpoint() {
> > >>>         return true;
> > >>>     }
> > >>> }
> > >>>
> > >>> interface CheckpointListener {
> > >>>     default boolean requiresFinalCheckpoint() {
> > >>>         return true;
> > >>>     }
> > >>> }
> > >>>
> > >>> class AbstractUdfStreamOperator {
> > >>>     @Override
> > >>>     boolean requiresFinalCheckpoint() {
> > >>>         return userFunction instanceof CheckpointListener &&
> > >>>                 ((CheckpointListener) userFunction).requiresFinalCheckpoint();
> > >>>     }
> > >>> }
> > >>>
> > >>> I think we should still keep the change?
> > >>>
> > >>> Best,
> > >>> Yun
> > >>>
> > >>> ------------------Original Mail ------------------
> > >>> *Sender:* Dawid Wysakowicz <dwysakow...@apache.org>
> > >>> *Send Date:* Sun Jul 18 18:44:50 2021
> > >>> *Recipients:* Flink Dev <dev@flink.apache.org>, Yun
> > >>> Gao <yungao...@aliyun.com.INVALID>
> > >>> *Subject:* Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks
> > >>> Finished
> > >>>
> > >>>> I think we're all really close to the same solution.
> > >>>>
> > >>>> I second Yun's thoughts that MAX_WATERMARK works well for time based
> > >>>> buffering, but it does not solve flushing other operations such as e.g.
> > >>>> count windows or batching requests in Sinks. I'd prefer to treat the
> > >>>> finish() as a message for Operator to "flush all records". The
> > >>>> MAX_WATERMARK in my opinion is mostly for backwards compatibility.
> > >>>> I
> > >>>> don't think operators need to get a signal "stop-processing" if they
> > >>>> don't need to flush records. WHEN records are emitted should be
> > >>>> under the control of the StreamTask, by firing timers or by processing
> > >>>> the next record from upstream.
> > >>>>
> > >>>> The only difference of my previous proposal compared to Yun's is that I
> > >>>> did not want to send the EndOfUserRecords event in case of stop w/o
> > >>>> drain. My thinking was that we could directly go from RUNNING to
> > >>>> WAITING_FOR_FINAL_CP on EndOfPartitionEvent. I agree we could emit
> > >>>> EndOfUserRecordsEvent with an additional flag and e.g. stop firing
> > >>>> timers and processing events (without calling finish() on Operator). In
> > >>>> my initial suggestion I thought we don't care about some events
> > >>>> potentially being emitted after the savepoint was taken, as they would
> > >>>> anyway belong to the next after FINAL, which would be discarded. I think
> > >>>> though the proposal to suspend records processing and timers is a
> > >>>> sensible thing to do and would go with the version that Yun put into the
> > >>>> FLIP Wiki.
> > >>>>
> > >>>> What do you think Till?
> > >>>>
> > >>>> Best,
> > >>>>
> > >>>> Dawid
> > >>>>
> > >>>> On 16/07/2021 10:03, Yun Gao wrote:
> > >>>>
> > >>>>> Hi Till, Piotr
> > >>>>> Many thanks for the comments!
> > >>>>>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
> > >>>>> I also agree with Piotr that currently they are independent mechanisms,
> > >>>>> and they are basically the same for the event time.
> > >>>>> For more details, first there are some differences among the three
> > >>>>> scenarios regarding the finish:
> > >>>>> For normal finish and stop-with-savepoint --drain, the job would not be
> > >>>>> expected to be restarted, and for stop-with-savepoint the job would be
> > >>>>> expected to restart later.
> > >>>>> Then for finish / stop-with-savepoint --drain, currently Flink would
> > >>>>> emit MAX_WATERMARK before the EndOfPartition. Besides, as we have
> > >>>>> discussed before [1], endOfInput / finish() should also only be called
> > >>>>> for finish / stop-with-savepoint --drain. Thus currently they always
> > >>>>> occur at the same time. After the change, we could emit MAX_WATERMARK
> > >>>>> before the endOfInput event for the finish / stop-with-savepoint --drain
> > >>>>> cases.
> > >>>>>> 2) StreamOperator.finish says to flush all buffered events. Would a
> > >>>>>> WindowOperator close all windows and emit the results upon calling
> > >>>>>> finish, for example?
> > >>>>> As discussed above for stop-with-savepoint, we would always keep the
> > >>>>> windows as is, and restore them after restart.
> > >>>>> Then for the finish / stop-with-savepoint --drain, I think perhaps it
> > >>>>> depends on the Triggers. For event-time triggers / processing-time
> > >>>>> triggers, it would be reasonable to flush all the windows since logically
> > >>>>> the time would always elapse and the window would always get triggered
> > >>>>> in a logical future. But for triggers like CountTrigger, no matter how
> > >>>>> much time passes logically, the windows would not trigger, thus we may
> > >>>>> not flush these windows. If there are requirements we may provide
> > >>>>> additional triggers.
> > >>>>>> It's a bit messy and I'm not sure if this should be straightened out?
> > >>>>>> Each one of those has a little bit different semantic/meaning,
> > >>>>>> but at the same time they are very similar. For single input operators
> > >>>>>> `endInput()` and `finish()` are actually the very same thing.
> > >>>>> Currently MAX_WATERMARK / endInput / finish indeed always happen at the
> > >>>>> same time, and for single input operators `endInput()` and `finish()`
> > >>>>> are indeed the same thing. During the last discussion we mentioned
> > >>>>> this issue and at that time we thought that we might deprecate
> > >>>>> `endInput()` in the future; then we would only have endInput(int input)
> > >>>>> and finish().
> > >>>>> Best,
> > >>>>> Yun
> > >>>>> [1] https://issues.apache.org/jira/browse/FLINK-21132
> > >>>>> ------------------------------------------------------------------
> > >>>>> From: Piotr Nowojski
> > >>>>> Send Time: 2021 Jul. 16 (Fri.) 13:48
> > >>>>> To: dev
> > >>>>> Cc: Yun Gao
> > >>>>> Subject: Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks
> > >>>>> Finished
> > >>>>> Hi Till,
> > >>>>>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
> > >>>>>> 2) StreamOperator.finish says to flush all buffered events. Would a
> > >>>>>> WindowOperator close all windows and emit the results upon calling
> > >>>>>> finish, for example?
> > >>>>> 1) currently they are independent but parallel mechanisms. With event
> > >>>>> time, they are basically the same.
> > >>>>> 2) it probably should for the sake of processing time windows.
> > >>>>> Here you are touching the bit of the current design that I like the
> > >>>>> least. We basically have now three different ways of conveying very
> > >>>>> similar things:
> > >>>>> a) sending `MAX_WATERMARK`, used by event time WindowOperator (what
> > >>>>> about processing time?)
> > >>>>> b) endInput(), used for example by AsyncWaitOperator to flush its
> > >>>>> internal state
> > >>>>> c) finish(), used for example by ContinuousFileReaderOperator
> > >>>>> It's a bit messy and I'm not sure if this should be straightened out?
> > >>>>> Each one of those has a little bit different semantic/meaning, but at the
> > >>>>> same time they are very similar. For single input operators `endInput()`
> > >>>>> and `finish()` are actually the very same thing.
> > >>>>> Piotrek
> > >>>>> On Thu, 15 Jul 2021 at 16:47, Till Rohrmann wrote:
> > >>>>> Thanks for updating the FLIP. Based on the new section about
> > >>>>> stop-with-savepoint [--drain] I got two other questions:
> > >>>>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
> > >>>>> 2) StreamOperator.finish says to flush all buffered events. Would a
> > >>>>> WindowOperator close all windows and emit the results upon calling
> > >>>>> finish, for example?
> > >>>>> Cheers,
> > >>>>> Till
> > >>>>> On Thu, Jul 15, 2021 at 10:15 AM Till Rohrmann wrote:
> > >>>>>> Thanks a lot for your answers and clarifications Yun.
> > >>>>>> 1+2) Agreed, this can be a future improvement if this becomes a
> > >>>>>> problem.
> > >>>>>> 3) Great, this will help a lot with understanding the FLIP.
> > >>>>>> Cheers,
> > >>>>>> Till
> > >>>>>> On Wed, Jul 14, 2021 at 5:41 PM Yun Gao
> > >>>>>> wrote:
> > >>>>>>> Hi Till,
> > >>>>>>> Many thanks for the review and comments!
> > >>>>>>> 1) First, I think we could in fact do the computation outside
> > >>>>>>> of the main thread; the current implementation does it there mainly
> > >>>>>>> because the computation is in general fast and we
> > >>>>>>> initially want to have a simplified first version.
> > >>>>>>> The main requirement here is to have a constant view of the state of
> > >>>>>>> the tasks. Otherwise, for example, if we have A -> B and A is running
> > >>>>>>> when we check if we need to trigger A, we will mark A as "has to
> > >>>>>>> trigger"; but if A gets to finished when we check B, we will also mark
> > >>>>>>> B as "has to trigger". Then B will receive both the RPC trigger and the
> > >>>>>>> checkpoint barrier, which would break some assumptions on the task side
> > >>>>>>> and complicate the implementation.
> > >>>>>>> But to cope with this issue, we could in fact first take a snapshot of
> > >>>>>>> the tasks' state and then do the computation; neither of the two steps
> > >>>>>>> needs to be in the main thread.
> > >>>>>>> 2) For the computation logic, in fact currently we benefit a lot from
> > >>>>>>> some shortcuts on all-to-all edges and job vertices with all tasks
> > >>>>>>> running; these shortcuts can do checks on the job vertex level
> > >>>>>>> first and skip some job vertices as a whole. With this optimization we
> > >>>>>>> have an O(V) algorithm, and the current running time of the worst case
> > >>>>>>> for a job with 320,000 tasks is less than 100ms. For
> > >>>>>>> daily graph sizes the time would be further reduced linearly.
> > >>>>>>> If we do the computation based on the last triggered tasks, we may not
> > >>>>>>> easily encode this information into the shortcuts on the job vertex
> > >>>>>>> level. And since the time seems to be short, perhaps it is enough
> > >>>>>>> to do re-computation from scratch in consideration of the tradeoff
> > >>>>>>> between the performance and the complexity?
> > >>>>>>> 3) We are going to emit the EndOfInput event exactly after the finish()
> > >>>>>>> method and before the last snapshotState() method so that we could shut
> > >>>>>>> down the whole topology with a single final checkpoint.
> > >>>>>>> Sorry for not including enough details for this part; I'll
> > >>>>>>> complement the FLIP with the details on
> > >>>>>>> the process of the final checkpoint / savepoint.
> > >>>>>>> Best,
> > >>>>>>> Yun
> > >>>>>>> ------------------------------------------------------------------
> > >>>>>>> From: Till Rohrmann
> > >>>>>>> Send Time: 2021 Jul. 14 (Wed.) 22:05
> > >>>>>>> To: dev
> > >>>>>>> Subject: Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks
> > >>>>>>> Finished
> > >>>>>>> Hi everyone,
> > >>>>>>> I am a bit late to the voting party but let me ask three questions:
> > >>>>>>> 1) Why do we execute the trigger plan computation in the main thread
> > >>>>>>> if we cannot guarantee that all tasks are still running when triggering
> > >>>>>>> the checkpoint? Couldn't we do the computation in a different thread in
> > >>>>>>> order to relieve the main thread a bit?
> > >>>>>>> 2) The implementation of the DefaultCheckpointPlanCalculator seems to go
> > >>>>>>> over the whole topology for every calculation. Wouldn't it be more
> > >>>>>>> efficient to maintain the set of current tasks to trigger and check
> > >>>>>>> whether anything has changed and if so check the succeeding tasks until
> > >>>>>>> we have found the current checkpoint trigger frontier?
> > >>>>>>> 3) When are we going to send the endOfInput events to a downstream
> > >>>>>>> task?
> > >>>>>>> If this happens after we call finish on the upstream operator but
> > >>>>>>> before snapshotState then it would be possible to shut down the whole
> > >>>>>>> topology with a single final checkpoint. I think this part could
> > >>>>>>> benefit from a bit more detailed description in the FLIP.
> > >>>>>>> Cheers,
> > >>>>>>> Till
> > >>>>>>> On Fri, Jul 2, 2021 at 8:36 AM Yun Gao
> > >>>>>>> wrote:
> > >>>>>>>> Hi there,
> > >>>>>>>> Since the voting time of FLIP-147 [1] has passed, I'm closing the
> > >>>>>>>> vote now.
> > >>>>>>>> There were seven +1 votes (6 of 7 binding) and no -1 votes:
> > >>>>>>>> - Dawid Wysakowicz (binding)
> > >>>>>>>> - Piotr Nowojski (binding)
> > >>>>>>>> - Jiangang Liu (binding)
> > >>>>>>>> - Arvid Heise (binding)
> > >>>>>>>> - Jing Zhang (binding)
> > >>>>>>>> - Leonard Xu (non-binding)
> > >>>>>>>> - Guowei Ma (binding)
> > >>>>>>>> Thus I'm happy to announce that the update to the FLIP-147 is
> > >>>>>>>> accepted.
> > >>>>>>>> Many thanks, everyone!
> > >>>>>>>> Best,
> > >>>>>>>> Yun
> > >>>>>>>> [1] https://cwiki.apache.org/confluence/x/mw-ZCQ
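
For readers of the archive, here is a minimal sketch of the failure case quoted at the top of this thread: finish() -> snapshotState() succeeds, the failure happens before notifyCheckpointComplete(), and the restored operator then sees more records (for example after rescaling). It is plain Java and does not use Flink; `FinishLifecycleSketch`, `BufferingOperator`, `Snapshot` and `recoverAfterFailureBeforeNotify` are hypothetical names, and only the lifecycle method names mirror the discussion. It assumes the reading suggested in the thread, that finish() is tied to the operator execution instance and not to the state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free model; only the method names mirror the thread.
class FinishLifecycleSketch {

    /** Snapshot contents: records still buffered plus records flushed but not yet committed. */
    static final class Snapshot {
        final List<String> buffered;
        final List<String> uncommitted;

        Snapshot(List<String> buffered, List<String> uncommitted) {
            this.buffered = new ArrayList<>(buffered);
            this.uncommitted = new ArrayList<>(uncommitted);
        }
    }

    /** Toy operator that buffers records, flushes on finish() and commits on notification. */
    static final class BufferingOperator {
        private final List<String> buffered = new ArrayList<>();
        private final List<String> uncommitted = new ArrayList<>();
        private final List<String> committed = new ArrayList<>();

        BufferingOperator() {}

        /** Restore: state is carried over, but "finish() was called" is deliberately not. */
        BufferingOperator(Snapshot restored) {
            buffered.addAll(restored.buffered);
            uncommitted.addAll(restored.uncommitted);
        }

        void processElement(String record) {
            buffered.add(record);
        }

        /** Flush everything buffered by this execution instance. */
        void finish() {
            uncommitted.addAll(buffered);
            buffered.clear();
        }

        Snapshot snapshotState() {
            return new Snapshot(buffered, uncommitted);
        }

        /** Side effects become visible only here. */
        void notifyCheckpointComplete() {
            committed.addAll(uncommitted);
            uncommitted.clear();
        }

        List<String> committed() {
            return committed;
        }
    }

    /** Failure after finish() -> snapshotState(), before notifyCheckpointComplete(). */
    static List<String> recoverAfterFailureBeforeNotify() {
        BufferingOperator first = new BufferingOperator();
        first.processElement("a");
        first.processElement("b");
        first.finish();
        Snapshot snapshot = first.snapshotState();
        // Failure here: notifyCheckpointComplete() never reaches `first`.

        BufferingOperator second = new BufferingOperator(snapshot);
        second.processElement("c"); // more input arrives, e.g. after rescaling
        second.finish();
        second.snapshotState();
        second.notifyCheckpointComplete();
        return second.committed();
    }

    public static void main(String[] args) {
        System.out.println(recoverAfterFailureBeforeNotify());
    }
}
```

In this toy model nothing breaks when the restored instance receives another record, because no "finished" marker was stored in state. An operator that did persist such a marker would be in the inconsistent situation described in point 1 of Piotr's mail.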