I'd be fine to continue w/o a vote as long as the FLIP contains all the clarifications we talked about in this thread and in FLINK-21080.

Cheers,
Till

On Fri, Jul 23, 2021 at 8:34 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> How do you feel about proceeding without an additional vote?
>
> I agree, we added quite a bit in there, but none of the added content changes what has been voted on so far. A big part of the added content can be considered as clarifying the implementation. I'd be inclined to proceed without a vote. WDYT?
>
> Best,
>
> Dawid
>
> On 22/07/2021 14:55, Yun Gao wrote:
> > Hi Till,
> >
> > Many thanks for the comments and tips! We will update the FLIP with the new details. I think we need a new vote since a large part of the FLIP has been updated; we would start the vote right after we have also solved the remaining UnionListState problem and double-checked that no other points are missing from the design and the FLIP.
> >
> > Best,
> > Yun
> >
> > ------------------------------------------------------------------
> > From:Till Rohrmann <trohrm...@apache.org>
> > Send Time:2021 Jul. 22 (Thu.) 17:57
> > To:Yun Gao <yungao...@aliyun.com>
> > Cc:Piotr Nowojski <pnowoj...@apache.org>; dev <dev@flink.apache.org>; Yun Gao <yungao...@aliyun.com.invalid>; Piotr Nowojski <pi...@ververica.com>
> > Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
> >
> > Thanks everyone for this discussion. I think this is very helpful.
> >
> > I do agree with Piotr's proposal to separate state and the lifecycle of a StreamOperator. That way the finished state can be used to recover a StreamOperator or to rescale the topology. I also believe that this will make the implementation a lot easier since we don't have to remember which StreamOperator has actually finished. Moreover, it solves the problem of rescaling.
> >
> > I also agree with the proposed solution for checkpoints after finish() has been called.
> > I guess the first checkpoint that was triggered after finish() and sends notifyCheckpointComplete should be good enough to close the StreamTask.
> >
> > Shall we update the FLIP with these details? Do we need another vote for it or shall we continue w/o it?
> >
> > Cheers,
> > Till
> >
> > On Thu, Jul 22, 2021 at 10:45 AM Yun Gao <yungao...@aliyun.com> wrote:
> >
> > Hi Piotr,
> >
> > Many thanks for the explanation! And sorry that I initially misunderstood the problem Dawid raised.
> >
> >> And what should we do now? We can of course commit all transactions until checkpoint 43.
> >> But should we keep waiting for `notifyCheckpointComplete(44)`?
> >> What if in the meantime another checkpoint is triggered? We could end up waiting indefinitely.
> >> Our proposal is to shutdown the task immediately after seeing first `notifyCheckpointComplete(X)`,
> >> where X is any triggered checkpoint AFTER `finish()`. This should be fine, as:
> >> a) ideally there should be no new pending transactions opened after checkpoint 42
> >> b) even if operator/function is opening some transactions for checkpoint 43 and
> >> checkpoint 44 (`FlinkKafkaProducer`), those transactions after checkpoint 42 should be empty
> >
> > Now I understand this issue better: currently operators like TwoPhaseCommitSinkFunction create a new transaction when snapshotting state, so every checkpoint has one corresponding transaction. Thus, after finish() and before notifyCheckpointComplete(), there might be some empty transactions.
> >
> > I also agree with the proposal: we could finish the task after we receive the first checkpoint complete notification, and a checkpoint with a larger checkpoint id should be allowed to be finished. The checkpoint complete notification should be able to commit all the non-empty transactions.
> > Some of the empty transactions would also get committed, but that should cause no harm. The other empty transactions would be aborted in close(), which should also cause no harm.
> >
> >> If checkpoint 44 completes afterwards, it will still be valid. Ideally we would recommend that after
> >> seeing `finish()` operators/functions should not be opening any new transactions, but that shouldn't be required.
> >
> > From another point of view, I think it might also be acceptable to place some requirements on how operators behave, e.g. a sink could skip creating new transactions after finish() is called. We may treat the process as a protocol between the framework and the operators, which the operators need to follow. Of course it would be better if the framework could handle all the cases elegantly and put fewer implicit limitations on the operators; at the very least we should guarantee that changing the process does not cause compatibility problems.
> >
> > Many thanks for the careful checks on the whole process.
> >
> > Best,
> > Yun
> >
> > ------------------------------------------------------------------
> > From:Piotr Nowojski <pnowoj...@apache.org>
> > Send Time:2021 Jul. 22 (Thu.) 15:33
> > To:dev <dev@flink.apache.org>; Yun Gao <yungao...@aliyun.com>
> > Cc:Till Rohrmann <trohrm...@apache.org>; Yun Gao <yungao...@aliyun.com.invalid>; Piotr Nowojski <pi...@ververica.com>
> > Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
> >
> > Hi Guowei,
> >
> >> Thank Dawid and Piotr for sharing the problem. +1 to EndInput/Finish can be called repeatedly.
> >
> > Just to clarify. It's not about calling `finish()` and `endInput()` repeatedly, but about (from the perspective of the operator's state):
> > 1. seeing `finish()`
> > 2. checkpoint X triggered and completed
> > 3. failure + recovery from X
> > 4. potentially processing more records
> > 5. another `finish()`
> >
> > But from the context of the remaining part of your message, Guowei, I presume that you have already got that point :)
> >
> > Yun:
> >
> >> For this issue perhaps we could explicitly require the task to wait for a checkpoint triggered after the finish()
> >> method is called for all the operators? We could achieve this by maintaining
> >> some state inside the task.
> >
> > Isn't this exactly the "WAITING_FOR_FINAL_CP" from the FLIP document? That we always need to wait for a checkpoint triggered after `finish()` to complete, before shutting down a task?
> >
> > What Dawid was describing is a scenario where:
> > 1. task/operator received `finish()`
> > 2. checkpoint 42 triggered (not yet completed)
> > 3. checkpoint 43 triggered (not yet completed)
> > 4. checkpoint 44 triggered (not yet completed)
> > 5. notifyCheckpointComplete(43)
> >
> > And what should we do now? We can of course commit all transactions until checkpoint 43. But should we keep waiting for `notifyCheckpointComplete(44)`? What if in the meantime another checkpoint is triggered? We could end up waiting indefinitely.
> >
> > Our proposal is to shut down the task immediately after seeing the first `notifyCheckpointComplete(X)`, where X is any checkpoint triggered AFTER `finish()`. This should be fine, as:
> > a) ideally there should be no new pending transactions opened after checkpoint 42
> > b) even if an operator/function is opening some transactions for checkpoint 43 and checkpoint 44 (`FlinkKafkaProducer`), those transactions after checkpoint 42 should be empty
> >
> > Hence the comment from Dawid:
> >> We must make sure those checkpoints do not leave lingering external resources.
> >
> > After seeing 5. (notifyCheckpointComplete(43)) it should be good enough to:
> > - commit transactions from checkpoint 42 (and 43 if they were created, depending on the user code)
> > - close the operator, aborting any pending transactions (for checkpoint 44 if they were opened, depending on the user code)
> >
> > If checkpoint 44 completes afterwards, it will still be valid. Ideally we would recommend that after seeing `finish()` operators/functions should not be opening any new transactions, but that shouldn't be required.
> >
> > Best,
> > Piotrek
> >
> > On Thu, Jul 22, 2021 at 09:00 Yun Gao <yungao...@aliyun.com.invalid> wrote:
> > Hi Dawid, Piotr, Steven,
> >
> > Many thanks for pointing out these issues and for the discussion!
> >
> > Failure before notifyCheckpointComplete()
> >
> > For this issue I agree with what Piotr has proposed. I tried using some operators like sink / window as examples, and I have also not found explicit scenarios that might cause problems if records are processed by a task that was assigned states snapshotted after calling finish(). For future cases, it seems users should be able to implement their target logic by explicitly adding a flag marking the state as finished, and perhaps having different logic when this part of the state is referred to. Besides, this case would only happen on rescaling or topology change, which embeds some kind of user knowledge in the action. Thus it looks acceptable that we still split the operators' state from the task lifecycle, and do not treat checkpoints after finish() differently.
> >
> > Finishing upon receiving notifyCheckpointComplete() of not the latest checkpoint
> >
> > For this issue, perhaps we could explicitly require the task to wait for a checkpoint triggered after the finish() method is called for all the operators? We could achieve this by maintaining some state inside the task.
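The shutdown rule Piotr proposes above (shut down on the first `notifyCheckpointComplete(X)` for any X triggered after `finish()`, commit everything X covers, abort the rest in `close()`) can be modelled as a tiny state machine. This is a hypothetical, framework-free sketch: `FinalCheckpointTracker`, its methods, and the "one transaction per triggered checkpoint" bookkeeping are illustrative assumptions, not Flink's StreamTask or TwoPhaseCommitSinkFunction code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical model of a task waiting for a checkpoint triggered after finish(). */
final class FinalCheckpointTracker {
    // Assumption: like a two-phase-commit sink, one transaction exists per triggered checkpoint.
    private final TreeMap<Long, String> pendingTransactions = new TreeMap<>();
    private final List<String> committed = new ArrayList<>();
    private final List<String> aborted = new ArrayList<>();
    private long firstCheckpointAfterFinish = -1; // -1 means finish() has not been seen yet
    private boolean finished = false;
    private boolean closed = false;

    void finish() {
        finished = true;
    }

    void triggerCheckpoint(long checkpointId) {
        if (finished && firstCheckpointAfterFinish < 0) {
            firstCheckpointAfterFinish = checkpointId;
        }
        pendingTransactions.put(checkpointId, "txn-" + checkpointId);
    }

    /** Commits transactions up to checkpointId; returns true if the task may now shut down. */
    boolean notifyCheckpointComplete(long checkpointId) {
        while (!pendingTransactions.isEmpty() && pendingTransactions.firstKey() <= checkpointId) {
            committed.add(pendingTransactions.pollFirstEntry().getValue());
        }
        // Relies on checkpoint ids increasing monotonically: any id >= the first one
        // triggered after finish() was itself triggered after finish().
        boolean canShutDown = firstCheckpointAfterFinish >= 0 && checkpointId >= firstCheckpointAfterFinish;
        if (canShutDown && !closed) {
            close();
        }
        return canShutDown;
    }

    /** Aborts transactions of checkpoints that were triggered but not (yet) confirmed. */
    void close() {
        aborted.addAll(pendingTransactions.values());
        pendingTransactions.clear();
        closed = true;
    }

    List<String> committed() { return committed; }
    List<String> aborted() { return aborted; }
    boolean isClosed() { return closed; }
}

class FinalCheckpointDemo {
    public static void main(String[] args) {
        FinalCheckpointTracker task = new FinalCheckpointTracker();
        task.finish();              // 1. finish()
        task.triggerCheckpoint(42); // 2.-4. concurrent checkpoints after finish()
        task.triggerCheckpoint(43);
        task.triggerCheckpoint(44);
        boolean shutDown = task.notifyCheckpointComplete(43); // 5.
        System.out.println(shutDown + " committed=" + task.committed() + " aborted=" + task.aborted());
    }
}
```

Running the demo prints `true committed=[txn-42, txn-43] aborted=[txn-44]`, i.e. Dawid's scenario ends without waiting for checkpoint 44, and its (ideally empty) transaction is aborted rather than left lingering.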
> >
> > Checkpointing from a single subtask / UnionListState case
> >
> > This should indeed cause problems, and I also agree that we could focus on this in https://issues.apache.org/jira/browse/FLINK-21080.
> >
> > Best,
> > Yun
> >
> > ------------------------------------------------------------------
> > From:Piotr Nowojski <pnowoj...@apache.org>
> > Send Time:2021 Jul. 22 (Thu.) 02:46
> > To:dev <dev@flink.apache.org>
> > Cc:Yun Gao <yungao...@aliyun.com>; Till Rohrmann <trohrm...@apache.org>; Yun Gao <yungao...@aliyun.com.invalid>; Piotr Nowojski <pi...@ververica.com>
> > Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
> >
> > Hi Steven,
> >
> > > I probably missed sth here. isn't this the case today already? Why is it a concern for the proposed change?
> >
> > The problem is with the newly added `finish()` method and the already existing `endInput()` call. Currently on master there are no issues, because we are not checkpointing any operators after some operators have finished. The purpose of FLIP-147 is exactly to enable this, and this opens up the new problem described by Dawid.
> >
> > To paraphrase and give a concrete example, assume we have an operator with a parallelism of two: subtask 0 and subtask 1.
> >
> > 1. Subtask 0 received both `endInput()` and `finish()`, but subtask 1 hasn't (yet).
> > 2. Checkpoint 42 is triggered, and it completes.
> > 3. The job fails and is restarted, but at the same time it's rescaled. The user has chosen to scale the operator down to 1.
> >
> > Now we are in a pickle. We don't know if `notifyCheckpointComplete(42)` has been processed or not, so while recovering to checkpoint 42, we have to recover both the finished subtask 0 (#1) state and the not yet finished subtask 1 (#1) state. But at the same time they are scaled down, so we only have a single subtask 0 (#2) that has a combined state from both of the previous instances.
> > The potentially confusing issue is that the state from subtask 0 (#1) was checkpointed AFTER the `endInput()` and `finish()` calls, but it's recovered to an operator that still has some records to process. In step 1, a user could for example store a bit of information on the operator's state, "end input has already been called!", which after recovery would no longer be true.
> >
> > Hence the question about `finish()` and `endInput()` semantics. Should they be tied to state, or just to an operator instance/execution attempt?
> >
> > Piotrek
> >
> > On Wed, Jul 21, 2021 at 19:00 Steven Wu <stevenz...@gmail.com> wrote:
> > > if a failure happens after sequence of finish() -> snapshotState(), but
> > > before notifyCheckpointComplete(), we will restore such a state and we
> > > might end up sending some more records to such an operator.
> >
> > I probably missed sth here. isn't this the case today already? Why is it a concern for the proposed change?
> >
> > On Wed, Jul 21, 2021 at 4:39 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > >
> > > Hi Dawid,
> > >
> > > Thanks for writing down those concerns.
> > >
> > > I think the first issue boils down to what the contract of lifecycle methods like open(), close(), initializeState() etc. should be, especially the new additions like finish() and endInput(), and what their relation with the operator state should be (regardless of its type: keyed, non-keyed, union, ...). Should those methods be tied to state or not? After thinking about it for a while (and discussing it offline with Dawid), I think the answer might be no, they shouldn't. I mean maybe we should just openly say that all of those methods relate to this single particular instance and execution of the operator.
> > > And if a job is recovered/rescaled, we would be allowed to freely resume consumption, ignoring the fact that maybe some parts of the state have previously seen `endInput()`. Why?
> > >
> > > 0. Yes, it might be confusing, especially with `endInput()`. We call `endInput()`, we store something in a state, and later, after recovery combined with rescaling, that state can see more records? Indeed weird.
> > > 1. I haven't come up with a counterexample yet that would break and make a real-life use case impossible to implement. Theoretically yes, the user can store `endInput()` on state, and after rescaling this state would be inconsistent with what is actually happening with the operator, but I haven't found a use case that would break because of that.
> > > 2. Otherwise, the implementation would be very difficult.
> > > 3. It's difficult to access keyed state from within `endInput()`/`finish()` calls, as they do not have a key context.
> > > 4. After all, openly defining `endInput()` and `finish()` to be tied to the operator's execution instance lifecycle is not that strange and is quite simple to explain. Sure, it can lead to a bit of confusion (0.), but that doesn't sound that bad in comparison with the alternatives that I'm aware of. Also, methods like `open()` and `close()` are currently tied to the operator execution instance as well, not to the state. Operators can be opened and closed multiple times; that doesn't mean the state is lost after closing an operator.
> > >
> > > For the UnionListState problem I have posted my proposal in the ticket [1], so maybe let's move that particular discussion there?
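To make the rescaling concern concrete (Piotr's example: subtask 0 has seen `endInput()`, subtask 1 has not, and both are restored into a single subtask after scaling down), here is a toy model. It assumes list-style redistribution where the restored state is the concatenation of the old subtasks' entries, and that a user-written flag is combined with OR; `SubtaskSnapshot`, `RescaleMerge` and the `endInputSeen` flag are hypothetical stand-ins, not a Flink API. The point is that a flag describing one old execution attempt no longer describes the merged instance.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical per-subtask snapshot: buffered records plus a user-written "end input seen" flag. */
final class SubtaskSnapshot {
    final List<String> buffered;
    final boolean endInputSeen;

    SubtaskSnapshot(List<String> buffered, boolean endInputSeen) {
        this.buffered = buffered;
        this.endInputSeen = endInputSeen;
    }
}

final class RescaleMerge {
    /** Toy scale-down redistribution (assumption): concatenate buffers, OR the user flags. */
    static SubtaskSnapshot merge(List<SubtaskSnapshot> oldSubtasks) {
        List<String> buffered = new ArrayList<>();
        boolean anyEndInput = false;
        for (SubtaskSnapshot s : oldSubtasks) {
            buffered.addAll(s.buffered);
            anyEndInput |= s.endInputSeen;
        }
        return new SubtaskSnapshot(buffered, anyEndInput);
    }

    public static void main(String[] args) {
        // Checkpoint 42: subtask 0 already got endInput()/finish(), subtask 1 has not.
        SubtaskSnapshot subtask0 = new SubtaskSnapshot(List.of(), true);
        SubtaskSnapshot subtask1 = new SubtaskSnapshot(List.of("r7", "r8"), false);

        // Scale down to parallelism 1: the new subtask inherits subtask 1's unfinished input,
        // yet the merged flag claims endInput() has already been called.
        SubtaskSnapshot restored = merge(List.of(subtask0, subtask1));
        System.out.println("endInputSeen=" + restored.endInputSeen + " buffered=" + restored.buffered);
    }
}
```

Combining the flags with AND instead would make the flag match the new instance but silently drop the fact that subtask 0's part of the state had already finished; neither choice keeps both facts, which is why the thread leans towards tying `finish()`/`endInput()` to the execution instance rather than to state.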
> > >
> > > Piotrek
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-21080
> > >
> > > On Wed, Jul 21, 2021 at 12:39 Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> > > >
> > > > Hey all,
> > > >
> > > > To make the issues that were found transparent to the community, I want to post an update:
> > > >
> > > > *1. Committing side-effects*
> > > > We do want to make sure that all side effects are committed before bringing tasks down. Side effects are committed when calling notifyCheckpointComplete. For the final checkpoint we introduced the method finish(). This notifies the operator that we have consumed all incoming records and we are preparing to close the Task. In turn we should flush any pending buffered records and prepare to commit the last transactions. The goal is that after a successful sequence of finish() -> snapshotState() -> notifyCheckpointComplete(), the remaining state can be considered empty/finished and may be discarded.
> > > >
> > > > *Failure before notifyCheckpointComplete()*
> > > >
> > > > The question is what the contract of the endInput()/finish() methods is, and how calling these methods affects the operator's keyed state, non-keyed state and external state. Is it allowed to restore a state snapshot taken after calling endInput()/finish() and process more records? Or do we assume that after a restore from such a state taken after finish() we should not call any of the lifecycle methods, or at least make sure those methods do not emit records/interact with the mailbox etc.?
> > > >
> > > > Currently, if a failure happens after the sequence finish() -> snapshotState() but before notifyCheckpointComplete(), we may restore such a state and end up sending some more records to such an operator.
> > > > This is possible if we rescale and this state is merged with the state of a subtask that has not called finish() yet. It can also happen if we rescale the upstream operator and the subtask of interest becomes connected to a newly added, not yet finished subtask.
> > > >
> > > > *Snapshotting StreamTasks on which finish() has been called*
> > > >
> > > > We thought about putting a flag into the snapshot of a subtask produced after the finish() method. This would make it possible to skip execution of certain lifecycle methods. Unfortunately this creates problems for rescaling: how do we deal with a situation where subtask states with the flag both set and unset end up in a single StreamTask? An additional problem is that we merge those states into a single OperatorSubtaskState on the CheckpointCoordinator.
> > > >
> > > > *Finishing upon receiving notifyCheckpointComplete() of not the latest checkpoint*
> > > >
> > > > We need to wait for a checkpoint that started after the finish() method to complete. However, we support concurrent checkpoints, therefore there might be later checkpoints that completed, but whose notification has not arrived. We must make sure those checkpoints do not leave lingering external resources.
> > > >
> > > > *Checkpointing from a single subtask / UnionListState case*
> > > > There are operators that checkpoint from a single subtask only, usually from the subtask with index=0. If we let those subtasks finish, subsequent checkpoints will miss this information.
> > > > Esp. the legacy sources problem: https://issues.apache.org/jira/browse/FLINK-21080
> > > >
> > > > Best,
> > > >
> > > > Dawid
> > > >
> > > > On 19/07/2021 15:10, Yun Gao wrote:
> > > >
> > > > Hi Till, Dawid,
> > > >
> > > > Many thanks for the comments and discussion!
> > > > Glad that it seems we have converged, and I also agree that we can leave the optimization out of the first version.
> > > >
> > > > Best
> > > > Yun
> > > >
> > > > ------------------------------------------------------------------
> > > > From:Dawid Wysakowicz <dwysakow...@apache.org>
> > > > Send Time:2021 Jul. 19 (Mon.) 17:51
> > > > To:dev <dev@flink.apache.org>; Till Rohrmann <trohrm...@apache.org>
> > > > Cc:Yun Gao <yungao...@aliyun.com>; Yun Gao <yungao...@aliyun.com.invalid>
> > > > Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
> > > >
> > > > Small correction. I meant we need to adjust the EndOfInputEvent of course.
> > > >
> > > > Best,
> > > >
> > > > Dawid
> > > >
> > > > On 19/07/2021 11:48, Dawid Wysakowicz wrote:
> > > > > Hey Till,
> > > > >
> > > > > Yes, you're right, we will have to adjust the current state of EndOfPartitionEvent and move the moment when we emit it to get what we're discussing here. We are aware of that.
> > > > >
> > > > > As for MAX_WATERMARK vs finish(): my take is that we should always emit MAX_WATERMARK before calling finish() on an operator. At the same time finish() should not leave anything behind in state, as the intention is that we never restore from the taken savepoint/checkpoint (savepoint w/ drain or bounded data consumed).
> > > > >
> > > > > Best,
> > > > >
> > > > > Dawid
> > > > >
> > > > > On 19/07/2021 11:33, Till Rohrmann wrote:
> > > > >> Hi Yun and Dawid,
> > > > >>
> > > > >> Thanks for your comments. I do agree with your comments that finish() can do more than MAX_WATERMARK.
> > > > >> I guess we should then explain how MAX_WATERMARK and finish() play together and what kind of order guarantees we provide.
> > > > >>
> > > > >> Concerning the EndOfPartitionEvent, I am not entirely sure whether it would work in its current state because we send this event when the Task is about to shut down, if I am not mistaken. What we want is to bring the StreamTasks into a state where they shut down on the next checkpoint. For this we need to keep the StreamTask running. In general, I am a fan of making things explicit if possible. I think this helps maintenance and evolvability of code. That's why I think sending an EndOfInputEvent, a StreamTask-level event saying that no other records will come, only control events, could make sense.
> > > > >>
> > > > >> I would leave the proposed optimization out of the first version. We can still add it at a later point in time.
> > > > >>
> > > > >> Cheers,
> > > > >> Till
> > > > >>
> > > > >> On Mon, Jul 19, 2021 at 10:35 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> > > > >>
> > > > >>> Personally I don't find this optimization important and I'd rather leave it out so as not to complicate the codebase further. I doubt we save much there. I don't have a strong opinion though.
> > > > >>>
> > > > >>> Best,
> > > > >>>
> > > > >>> Dawid
> > > > >>>
> > > > >>> On 19/07/2021 10:31, Yun Gao wrote:
> > > > >>>
> > > > >>> Hi,
> > > > >>>
> > > > >>> Many thanks Dawid for the thoughts!
> > > > >>>
> > > > >>> Currently I also do not have different opinions regarding this part.
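Till's remark that finish() can do more than MAX_WATERMARK (and Dawid's point that a watermark flushes time-based buffering but not count windows or batched sink requests) can be illustrated with a toy operator that buffers both ways. Following the order Dawid proposes, the driver delivers a watermark at `Long.MAX_VALUE` (the timestamp Flink's `Watermark.MAX_WATERMARK` carries) and then calls `finish()`. `BatchingOperator` and its methods are hypothetical stand-ins, not Flink's `StreamOperator` interface.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical operator: each element is kept in an event-time buffer AND in a count-based batch. */
final class BatchingOperator {
    private final int batchSize;
    private final List<Long> timeBuffer = new ArrayList<>();   // element timestamps
    private final List<String> countBatch = new ArrayList<>(); // e.g. requests a sink batches
    final List<String> emitted = new ArrayList<>();

    BatchingOperator(int batchSize) {
        this.batchSize = batchSize;
    }

    void processElement(String value, long timestamp) {
        timeBuffer.add(timestamp);
        countBatch.add(value);
        if (countBatch.size() >= batchSize) {
            flushBatch();
        }
    }

    /** Releases time-buffered entries with timestamp <= watermark; never touches the count batch. */
    void processWatermark(long watermark) {
        Iterator<Long> it = timeBuffer.iterator();
        while (it.hasNext()) {
            long ts = it.next();
            if (ts <= watermark) {
                emitted.add("time@" + ts);
                it.remove();
            }
        }
    }

    /** finish(): "flush all records", including what no watermark can release. */
    void finish() {
        processWatermark(Long.MAX_VALUE);
        flushBatch();
    }

    boolean hasBufferedData() {
        return !timeBuffer.isEmpty() || !countBatch.isEmpty();
    }

    private void flushBatch() {
        if (!countBatch.isEmpty()) {
            emitted.add("batch" + countBatch);
            countBatch.clear();
        }
    }
}

class WatermarkThenFinishDemo {
    public static void main(String[] args) {
        BatchingOperator op = new BatchingOperator(3);
        op.processElement("a", 10L);
        op.processElement("b", 20L);
        op.processWatermark(Long.MAX_VALUE); // MAX_WATERMARK first...
        System.out.println(op.hasBufferedData()); // true: the count batch is still pending
        op.finish();                         // ...then finish()
        System.out.println(op.emitted + " " + op.hasBufferedData());
    }
}
```

The demo prints `true` and then `[time@10, time@20, batch[a, b]] false`: only `finish()` leaves the operator with nothing in state, which is what a final checkpoint that is never restored from relies on.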
> > > > >>> But I have one more issue to confirm: in the previous discussion we talked about a possible optimization for the final checkpoint case: if a task does not have operators using 2PC, we might skip waiting for the final checkpoint (but we could not skip the savepoint). To allow users to express this logic, we proposed adding one more method to StreamOperator & CheckpointListener:
> > > > >>>
> > > > >>> interface StreamOperator {
> > > > >>>     default boolean requiresFinalCheckpoint() {
> > > > >>>         return true;
> > > > >>>     }
> > > > >>> }
> > > > >>>
> > > > >>> interface CheckpointListener {
> > > > >>>     default boolean requiresFinalCheckpoint() {
> > > > >>>         return true;
> > > > >>>     }
> > > > >>> }
> > > > >>>
> > > > >>> class AbstractUdfStreamOperator implements StreamOperator {
> > > > >>>     // simplified excerpt: userFunction is the wrapped user function
> > > > >>>     @Override
> > > > >>>     public boolean requiresFinalCheckpoint() {
> > > > >>>         return userFunction instanceof CheckpointListener
> > > > >>>                 && ((CheckpointListener) userFunction).requiresFinalCheckpoint();
> > > > >>>     }
> > > > >>> }
> > > > >>>
> > > > >>> I think we should still keep the change?
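For reference, here is a self-contained, compilable version of the proposal quoted above, with minimal local stand-in types so it runs outside Flink. The two interfaces are simplified local copies (the `requiresFinalCheckpoint()` default method is only a proposal in this thread), and `UdfOperator` plus the three sample functions are hypothetical.

```java
/** Simplified local stand-in for StreamOperator, carrying only the proposed method. */
interface StreamOperator {
    default boolean requiresFinalCheckpoint() {
        return true;
    }
}

/** Simplified local stand-in for CheckpointListener plus the proposed method. */
interface CheckpointListener {
    void notifyCheckpointComplete(long checkpointId) throws Exception;

    default boolean requiresFinalCheckpoint() {
        return true;
    }
}

/** Hypothetical analogue of AbstractUdfStreamOperator wrapping a user function. */
class UdfOperator implements StreamOperator {
    private final Object userFunction;

    UdfOperator(Object userFunction) {
        this.userFunction = userFunction;
    }

    @Override
    public boolean requiresFinalCheckpoint() {
        // Only functions that react to checkpoint completion (e.g. 2PC sinks committing
        // transactions) make the task wait for a final checkpoint.
        return userFunction instanceof CheckpointListener
                && ((CheckpointListener) userFunction).requiresFinalCheckpoint();
    }
}

/** A stateless function with nothing to commit. */
class UppercaseFunction {
    String map(String s) {
        return s.toUpperCase();
    }
}

/** A sink-like function that commits on checkpoint completion (commit logic omitted). */
class CommittingSinkFunction implements CheckpointListener {
    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // commit the transaction that belongs to checkpointId
    }
}

/** A listener that explicitly opts out through the proposed method. */
class OptOutListener implements CheckpointListener {
    @Override
    public void notifyCheckpointComplete(long checkpointId) {}

    @Override
    public boolean requiresFinalCheckpoint() {
        return false;
    }
}

class RequiresFinalCheckpointDemo {
    public static void main(String[] args) {
        System.out.println(new UdfOperator(new UppercaseFunction()).requiresFinalCheckpoint());      // false
        System.out.println(new UdfOperator(new CommittingSinkFunction()).requiresFinalCheckpoint()); // true
        System.out.println(new UdfOperator(new OptOutListener()).requiresFinalCheckpoint());         // false
    }
}
```

Note the asymmetry in the proposal: a UDF operator wrapping a function that is not a `CheckpointListener` returns `false` and would not wait, while any other operator keeps the conservative `StreamOperator` default of `true`.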
> > > > >>>
> > > > >>> Best,
> > > > >>> Yun
> > > > >>>
> > > > >>> ------------------Original Mail ------------------
> > > > >>> *Sender:*Dawid Wysakowicz <dwysakow...@apache.org>
> > > > >>> *Send Date:*Sun Jul 18 18:44:50 2021
> > > > >>> *Recipients:*Flink Dev <dev@flink.apache.org>, Yun Gao <yungao...@aliyun.com.INVALID>
> > > > >>> *Subject:*Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
> > > > >>>
> > > > >>>> I think we're all really close to the same solution.
> > > > >>>>
> > > > >>>> I second Yun's thoughts that MAX_WATERMARK works well for time-based buffering, but it does not solve flushing other operations such as e.g. count windows or batching requests in Sinks. I'd prefer to treat finish() as a message for the Operator to "flush all records". MAX_WATERMARK, in my opinion, is mostly for backwards compatibility. I don't think operators need to get a "stop-processing" signal if they don't need to flush records. WHEN records are emitted should be controlled by the StreamTask, by firing timers or by processing the next record from upstream.
> > > > >>>>
> > > > >>>> The only difference of my previous proposal compared to Yun's is that I did not want to send the EndOfUserRecords event in case of stop w/o drain.
> > > > >>>> My thinking was that we could go directly from RUNNING to WAITING_FOR_FINAL_CP on EndOfPartitionEvent. I agree we could emit EndOfUserRecordsEvent with an additional flag and e.g. stop firing timers and processing events (without calling finish() on the Operator). In my initial suggestion I thought we didn't care about some events potentially being emitted after the savepoint was taken, as they would anyway belong to the checkpoint after the FINAL one, which would be discarded. I think, though, that the proposal to suspend record processing and timers is a sensible thing to do, and I would go with the version that Yun put into the FLIP Wiki.
> > > > >>>>
> > > > >>>> What do you think Till?
> > > > >>>>
> > > > >>>> Best,
> > > > >>>>
> > > > >>>> Dawid
> > > > >>>>
> > > > >>>> On 16/07/2021 10:03, Yun Gao wrote:
> > > > >>>>> Hi Till, Piotr
> > > > >>>>> Many thanks for the comments!
> > > > >>>>>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
> > > > >>>>> I also agree with Piotr that currently they are independent mechanisms, and they are basically the same for event time.
> > > > >>>>> For more details, first, there are some differences among the three scenarios regarding finishing:
> > > > >>>>> For normal finish and stop-with-savepoint --drain, the job is not expected to be restarted, while for stop-with-savepoint the job is expected to be restarted later.
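Yun distinguishes three ways a task can finish. Putting that together with what the thread says about them (MAX_WATERMARK before endInput()/finish() only when no restart is expected; no finish() for stop-with-savepoint without drain, where record processing and timers are suspended instead, as Dawid describes; the EndOfInput event emitted right after finish() and before the last snapshotState(), as Yun states further down) gives roughly the sequences below. The enum, the step strings and the exact ordering are an illustrative reading of this discussion, not the final FLIP text.

```java
import java.util.ArrayList;
import java.util.List;

/** The three finishing scenarios distinguished in the thread. */
enum FinishMode {
    BOUNDED_INPUT_FINISHED,    // job not expected to restart
    STOP_WITH_SAVEPOINT_DRAIN, // job not expected to restart
    STOP_WITH_SAVEPOINT        // job expected to restart later
}

final class FinishSequence {
    /** Illustrative ordering of the steps a task goes through before shutting down. */
    static List<String> stepsFor(FinishMode mode) {
        List<String> steps = new ArrayList<>();
        boolean drain = mode != FinishMode.STOP_WITH_SAVEPOINT;
        if (drain) {
            // No restart expected: release time-based state, then flush everything else.
            steps.add("MAX_WATERMARK");
            steps.add("endInput()");
            steps.add("finish()");
            steps.add("emit EndOfInput downstream");
        } else {
            // Restart expected: keep windows/buffers in state, just stop producing output.
            steps.add("suspend record processing and timers");
        }
        steps.add(mode == FinishMode.BOUNDED_INPUT_FINISHED
                ? "snapshotState() for final checkpoint"
                : "snapshotState() for savepoint");
        steps.add("notifyCheckpointComplete()");
        steps.add("close()");
        return steps;
    }

    public static void main(String[] args) {
        for (FinishMode mode : FinishMode.values()) {
            System.out.println(mode + ": " + String.join(" -> ", stepsFor(mode)));
        }
    }
}
```

Keeping the no-drain path free of `finish()` matches Yun's point that windows are kept as they are and restored after the restart, while both draining paths end with nothing left in state.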
> > > > >>>>> Then for finish / stop-with-savepoint --drain, Flink currently emits MAX_WATERMARK before the EndOfPartition. Besides, as we discussed before [1], endOfInput / finish() should also only be called for finish / stop-with-savepoint --drain. Thus currently they always occur at the same time. After the change, we could emit MAX_WATERMARK before the endOfInput event for the finish / stop-with-savepoint --drain cases.
> > > > >>>>>> 2) StreamOperator.finish says to flush all buffered events. Would a
> > > > >>>>>> WindowOperator close all windows and emit the results upon calling
> > > > >>>>>> finish, for example?
> > > > >>>>> As discussed above, for stop-with-savepoint we would always keep the windows as they are, and restore them after the restart.
> > > > >>>>> Then for finish / stop-with-savepoint --drain, I think it depends on the Triggers. For event-time / processing-time triggers, it would be reasonable to flush all the windows, since logically time would always elapse and the window would always get triggered at some logical future point. But for triggers like CountTrigger, no matter how much time passes logically, the windows would not trigger, thus we may not flush these windows. If there are such requirements we may provide additional triggers.
> > > > >>>>>> It's a bit messy and I'm not sure if this should be straightened out?
> > > > >>>>>> Each one of those has a slightly different semantic/meaning,
> > > > >>>>>> but at the same time they are very similar. For single input operators `endInput()` and `finish()` are actually the very same thing.
> > > > >>>>> Currently MAX_WATERMARK / endInput / finish indeed always happen at the same time, and for single input operators `endInput()` and `finish()` are indeed the same thing. We mentioned this issue during the last discussion, and back then we thought that we might deprecate `endInput()` in the future; then we would only have endInput(int input) and finish().
> > > > >>>>> Best,
> > > > >>>>> Yun
> > > > >>>>> [1] https://issues.apache.org/jira/browse/FLINK-21132
> > > > >>>>> ------------------------------------------------------------------
> > > > >>>>> From:Piotr Nowojski
> > > > >>>>> Send Time:2021 Jul. 16 (Fri.) 13:48
> > > > >>>>> To:dev
> > > > >>>>> Cc:Yun Gao
> > > > >>>>> Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
> > > > >>>>> Hi Till,
> > > > >>>>>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
> > > > >>>>>> 2) StreamOperator.finish says to flush all buffered events. Would a
> > > > >>>>>> WindowOperator close all windows and emit the results upon calling
> > > > >>>>>> finish, for example?
> > > > >>>>> 1) Currently they are independent but parallel mechanisms. With event time, they are basically the same.
> > > > >>>>> 2) It probably should, for the sake of processing time windows.
> > > > >>>>> Here you are touching the bit of the current design that I like the least.
> > > > >>>>> We basically now have three different ways of conveying very similar things:
> > > > >>>>> a) sending `MAX_WATERMARK`, used by the event time WindowOperator (what about processing time?)
> > > > >>>>> b) endInput(), used for example by AsyncWaitOperator to flush its internal state
> > > > >>>>> c) finish(), used for example by ContinuousFileReaderOperator
> > > > >>>>> It's a bit messy and I'm not sure if this should be straightened out? Each one of those has a slightly different semantic/meaning, but at the same time they are very similar. For single input operators `endInput()` and `finish()` are actually the very same thing.
> > > > >>>>> Piotrek
> > > > >>>>> On Thu, Jul 15, 2021 at 16:47 Till Rohrmann wrote:
> > > > >>>>> Thanks for updating the FLIP. Based on the new section about stop-with-savepoint [--drain] I have two other questions:
> > > > >>>>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
> > > > >>>>> 2) StreamOperator.finish says to flush all buffered events. Would a WindowOperator close all windows and emit the results upon calling finish, for example?
> > > > >>>>> Cheers,
> > > > >>>>> Till
> > > > >>>>> On Thu, Jul 15, 2021 at 10:15 AM Till Rohrmann wrote:
> > > > >>>>>> Thanks a lot for your answers and clarifications Yun.
> > > > >>>>>> 1+2) Agreed, this can be a future improvement if this becomes a problem.
> > > > >>>>>> 3) Great, this will help a lot with understanding the FLIP.
> > > > >>>>>> Cheers,
> > > > >>>>>> Till
> > > > >>>>>> On Wed, Jul 14, 2021 at 5:41 PM Yun Gao wrote:
> > > > >>>>>>> Hi Till,
> > > > >>>>>>> Many thanks for the review and comments!
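Yun's answer 1) below explains why the trigger plan must be computed from one consistent view of task states: reading A's state before it finishes and B's state after could mark both A and B, so B would get an RPC trigger and a barrier. A minimal sketch of "snapshot first, then compute", assuming a simplified rule (trigger via RPC every running task whose upstream tasks have all finished, sources included; skip finished tasks; every other running task waits for barriers). `TaskState`, `TriggerPlan` and the string task ids are hypothetical, not the CheckpointCoordinator's code, and the vertex-level shortcuts Yun mentions are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

enum TaskState { RUNNING, FINISHED }

final class TriggerPlan {
    /**
     * Computes which tasks receive an RPC trigger, reading ONLY the given snapshot.
     * upstream maps each task to the tasks it consumes from (empty for sources).
     */
    static List<String> computeTasksToTrigger(Map<String, TaskState> snapshot,
                                              Map<String, List<String>> upstream) {
        List<String> toTrigger = new ArrayList<>();
        for (Map.Entry<String, TaskState> e : snapshot.entrySet()) {
            if (e.getValue() != TaskState.RUNNING) {
                continue; // finished tasks are neither triggered nor wait for barriers
            }
            boolean allUpstreamFinished = true;
            for (String up : upstream.getOrDefault(e.getKey(), Collections.emptyList())) {
                if (snapshot.get(up) != TaskState.FINISHED) {
                    allUpstreamFinished = false; // a running upstream task will send a barrier
                    break;
                }
            }
            if (allUpstreamFinished) {
                toTrigger.add(e.getKey());
            }
        }
        return toTrigger;
    }

    public static void main(String[] args) {
        Map<String, List<String>> upstream = Map.of("A", List.<String>of(), "B", List.of("A"));
        Map<String, TaskState> live = new LinkedHashMap<>();
        live.put("A", TaskState.RUNNING);
        live.put("B", TaskState.RUNNING);

        // Step 1: copy the states. Step 2: compute from the copy only.
        Map<String, TaskState> snapshot = new LinkedHashMap<>(live);
        live.put("A", TaskState.FINISHED); // A finishes while the plan is being computed

        System.out.println(computeTasksToTrigger(snapshot, upstream)); // [A]
    }
}
```

Because neither step touches the live states after the copy, both could run outside the main thread, which is the option Yun describes in 1).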
1) First, I think we could in fact do the computation outside of the main thread. The current implementation does it in the main thread mainly because the computation is in general fast and we initially wanted a simplified first version. The main requirement here is to have a consistent view of the states of the tasks. Otherwise, for example, if we have A -> B and A is running when we check whether we need to trigger A, we will mark A as to be triggered; but if A has finished by the time we check B, we will also mark B as to be triggered. Then B will receive both the RPC trigger and the checkpoint barrier, which would break some assumptions on the task side and complicate the implementation. To cope with this issue, we could first take a snapshot of the tasks' states and then do the computation; neither of the two steps needs to run in the main thread.

2) For the computation logic, we currently benefit a lot from some shortcuts on all-to-all edges and on job vertices with all tasks running. These shortcuts let us do the checks on the job vertex level first and skip some job vertices as a whole. With this optimization we have an O(V) algorithm, and the current worst-case running time for a job with 320,000 tasks is less than 100ms.
For graphs of typical size, the time would shrink linearly with the graph size. If we did the computation based on the last triggered tasks, we might not be able to easily encode this information into the shortcuts on the job vertex level. And since the time seems to be short, perhaps recomputing from scratch is enough, considering the tradeoff between performance and complexity?

3) We are going to emit the EndOfInput event exactly after the finish() method and before the last snapshotState() method, so that we can shut down the whole topology with a single final checkpoint. Very sorry for not including enough details on this part; I'll complement the FLIP with the details of the process of the final checkpoint / savepoint.

Best,
Yun

------------------------------------------------------------------
From: Till Rohrmann
Send Time: 2021 Jul. 14 (Wed.) 22:05
To: dev
Subject: Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished

Hi everyone,

I am a bit late to the voting party but let me ask three questions:

1) Why do we execute the trigger plan computation in the main thread if we cannot guarantee that all tasks are still running when triggering the checkpoint?
Couldn't we do the computation in a different thread in order to relieve the main thread a bit?
2) The implementation of the DefaultCheckpointPlanCalculator seems to go over the whole topology for every calculation. Wouldn't it be more efficient to maintain the set of current tasks to trigger and check whether anything has changed, and if so, check the succeeding tasks until we have found the current checkpoint trigger frontier?
3) When are we going to send the endOfInput events to a downstream task? If this happens after we call finish on the upstream operator but before snapshotState, then it would be possible to shut down the whole topology with a single final checkpoint. I think this part could benefit from a bit more detailed description in the FLIP.

Cheers,
Till

On Fri, Jul 2, 2021 at 8:36 AM Yun Gao wrote:
Hi there,

Since the voting time of FLIP-147 [1] has passed, I'm closing the vote now. There were seven +1 votes (6 of 7 binding) and no -1 votes:
- Dawid Wysakowicz (binding)
- Piotr Nowojski (binding)
- Jiangang Liu (binding)
- Arvid Heise (binding)
- Jing Zhang (binding)
- Leonard Xu (non-binding)
- Guowei Ma (binding)

Thus I'm happy to announce that the update to FLIP-147 is accepted.
Many thanks everyone!

Best,
Yun

[1] https://cwiki.apache.org/confluence/x/mw-ZCQ
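To make the A -> B race from Yun's point 1 above concrete, here is a minimal Python sketch (not Flink code; `TaskState` and `tasks_to_trigger` are hypothetical names) that computes the tasks to trigger from a one-time snapshot of the task states. It assumes, as we read the thread, that a running task gets the RPC trigger only when it has no upstream task or all of its upstream tasks have finished, and that every other running task receives the barrier from upstream. Because both steps work on the copied states, they could run outside the main thread.

```python
from enum import Enum
from typing import Dict, List, Mapping, Set


class TaskState(Enum):
    RUNNING = "RUNNING"
    FINISHED = "FINISHED"


def tasks_to_trigger(
    live_states: Mapping[str, TaskState], upstream: Mapping[str, List[str]]
) -> Set[str]:
    # Step 1: copy the task states once. Every decision below uses this frozen
    # view, so a task finishing mid-computation cannot cause both it and its
    # downstream task to be selected (the A -> B case).
    snapshot: Dict[str, TaskState] = dict(live_states)
    # Step 2 (assumed rule): trigger a running task by RPC if it has no
    # upstream task or all of its upstream tasks have finished.
    return {
        task
        for task, state in snapshot.items()
        if state is TaskState.RUNNING
        and all(snapshot[up] is TaskState.FINISHED for up in upstream.get(task, []))
    }


# A -> B with A still running: only A is triggered.
print(tasks_to_trigger({"A": TaskState.RUNNING, "B": TaskState.RUNNING}, {"B": ["A"]}))
```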
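The vertex-level shortcuts mentioned in Yun's point 2 could look roughly like the following Python sketch. `JobVertex`, `decided_at_vertex_level` and `plan_trigger` are hypothetical and are not the DefaultCheckpointPlanCalculator API; pointwise edges are simplified to one-to-one connections with equal parallelism, and the triggering rule is the same assumed one as above. A whole job vertex is skipped when an upstream vertex is fully running, or when an all-to-all upstream vertex has any running subtask; only otherwise are its subtasks checked one by one.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple


@dataclass
class JobVertex:
    name: str
    running: List[bool]  # running[i] is True while subtask i is running
    # Inputs as (upstream vertex name, is_all_to_all). Pointwise edges are
    # simplified to one-to-one, which assumes equal parallelism.
    inputs: List[Tuple[str, bool]] = field(default_factory=list)


def decided_at_vertex_level(v: JobVertex, vertices: Dict[str, JobVertex]) -> bool:
    """True if no subtask of v needs an RPC trigger, judged per vertex."""
    for up_name, all_to_all in v.inputs:
        up = vertices[up_name].running
        # Shortcut 1: the upstream vertex is fully running, so every subtask
        # of v has at least one running upstream subtask.
        if all(up):
            return True
        # Shortcut 2: an all-to-all edge links each subtask of v to every
        # upstream subtask, so one running upstream subtask covers all of v.
        if all_to_all and any(up):
            return True
    return False


def plan_trigger(vertices: Dict[str, JobVertex]) -> Set[Tuple[str, int]]:
    to_trigger: Set[Tuple[str, int]] = set()
    for v in vertices.values():
        if not any(v.running) or decided_at_vertex_level(v, vertices):
            continue  # the whole vertex is skipped without per-subtask checks
        for i, is_running in enumerate(v.running):
            if not is_running:
                continue
            has_running_upstream = any(
                any(vertices[up].running) if all_to_all else vertices[up].running[i]
                for up, all_to_all in v.inputs
            )
            if not has_running_upstream:
                to_trigger.add((v.name, i))
    return to_trigger
```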
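The ordering from Yun's point 3 (finish(), then EndOfInput downstream, then the last snapshotState()) can be illustrated with a toy Python model. `Task` and `final_checkpoint` are hypothetical and do not mirror Flink's StreamTask; the model only shows that when EndOfInput directly follows finish(), every task in a chain has finished before the final checkpoint, so a single checkpoint can cover the whole topology.

```python
from typing import List, Optional


class Task:
    """Toy model of a task in a chain (hypothetical, not Flink's StreamTask)."""

    def __init__(self, name: str, log: List[str], downstream: Optional["Task"] = None) -> None:
        self.name = name
        self.log = log
        self.downstream = downstream
        self.finished = False

    def end_of_input(self) -> None:
        # 1) Flush buffered records: the operator's finish() step.
        self.log.append(f"{self.name}.finish")
        self.finished = True
        # 2) Right after finish(), propagate EndOfInput so the downstream task
        #    can finish too, without waiting for a checkpoint in between.
        if self.downstream is not None:
            self.log.append(f"{self.name}->EndOfInput")
            self.downstream.end_of_input()

    def snapshot_state(self, checkpoint_id: int) -> None:
        # 3) The last snapshotState() only happens after finish().
        if not self.finished:
            raise RuntimeError(f"{self.name} has not finished yet")
        self.log.append(f"{self.name}.snapshotState({checkpoint_id})")


def final_checkpoint(tasks: List[Task], checkpoint_id: int) -> None:
    """A single checkpoint taken after all tasks have finished."""
    for task in tasks:
        task.snapshot_state(checkpoint_id)


# source -> map -> sink: the source reaches the end of its input.
log: List[str] = []
sink = Task("sink", log)
mapper = Task("map", log, sink)
source = Task("source", log, mapper)
source.end_of_input()
final_checkpoint([source, mapper, sink], checkpoint_id=1)
```

If EndOfInput were only sent after the upstream task's final snapshotState(), each stage would instead need its own checkpoint before the next one could finish.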