Hi Till, Dawid,

Many thanks for the comments and discussion! Glad that we seem to have reached a convergence, and I also agree that we should not include the optimization in the first version.

Best,
Yun
------------------------------------------------------------------
From: Dawid Wysakowicz <dwysakow...@apache.org>
Send Time: 2021 Jul. 19 (Mon.) 17:51
To: dev <dev@flink.apache.org>; Till Rohrmann <trohrm...@apache.org>
Cc: Yun Gao <yungao...@aliyun.com>; Yun Gao <yungao...@aliyun.com.invalid>
Subject: Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished

Small correction: I meant we need to adjust the EndOfInputEvent, of course.

Best,
Dawid

On 19/07/2021 11:48, Dawid Wysakowicz wrote:
> Hey Till,
>
> Yes, you're right, we will have to adjust the current state of the
> EndOfPartitionEvent and move the moment when we emit it to get what
> we're discussing here. We are aware of that.
>
> As for MAX_WATERMARK vs finish(): my take is that we should always
> emit MAX_WATERMARK before calling finish() on an operator. At the same
> time, finish() should not leave anything behind in state, as the
> intention is that we never restore from the taken savepoint/checkpoint
> (savepoint with drain, or bounded data fully consumed).
>
> Best,
>
> Dawid
>
> On 19/07/2021 11:33, Till Rohrmann wrote:
>> Hi Yun and Dawid,
>>
>> Thanks for your comments. I agree that finish() can do more than
>> MAX_WATERMARK. I guess we should then explain how MAX_WATERMARK and
>> finish() play together and what kind of ordering guarantees we provide.
>>
>> Concerning the EndOfPartitionEvent, I am not entirely sure whether it
>> would work in its current state because, if I am not mistaken, we send
>> this event when the Task is about to shut down. What we want is to
>> bring the StreamTasks into a state in which they shut down on the next
>> checkpoint. For this we need to keep the StreamTask running. In
>> general, I am a fan of making things explicit if possible; I think this
>> helps the maintenance and evolvability of the code. That's why I think
>> it could make sense to send an EndOfInputEvent, a StreamTask-level
>> event which says that there won't be any more records coming, only
>> control events.
>>
>> I would leave the proposed optimization out of the first version. We
>> can still add it at a later point in time.
>>
>> Cheers,
>> Till
>>
>> On Mon, Jul 19, 2021 at 10:35 AM Dawid Wysakowicz
>> <dwysakow...@apache.org> wrote:
>>
>>> Personally I don't find this optimization important, and I'd rather
>>> leave it out so as not to complicate the codebase further. I doubt we
>>> save much there. I don't have a strong opinion though.
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> On 19/07/2021 10:31, Yun Gao wrote:
>>>
>>> Hi,
>>>
>>> Many thanks Dawid for the thoughts!
>>>
>>> Currently I do not have a differing opinion regarding this part.
>>> But I have one more issue to confirm: during the previous discussion
>>> we discussed that for the final checkpoint case we might have an
>>> optimization that, if a task has no operators using 2PC, we might skip
>>> waiting for the final checkpoint (but we could not skip the savepoint).
>>> To allow users to express this, we have proposed to add one more
>>> method to StreamOperator & CheckpointListener:
>>>
>>> interface StreamOperator {
>>>     default boolean requiresFinalCheckpoint() {
>>>         return true;
>>>     }
>>> }
>>>
>>> interface CheckpointListener {
>>>     default boolean requiresFinalCheckpoint() {
>>>         return true;
>>>     }
>>> }
>>>
>>> class AbstractUdfStreamOperator {
>>>     @Override
>>>     boolean requiresFinalCheckpoint() {
>>>         return userFunction instanceof CheckpointListener &&
>>>             ((CheckpointListener) userFunction).requiresFinalCheckpoint();
>>>     }
>>> }
>>>
>>> I think we should still keep that change?
>>>
>>> Best,
>>> Yun
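For illustration, a user function could opt out through the proposed method like this (a minimal sketch against the interfaces proposed above; the sink class itself is hypothetical, not committed Flink code):

    import org.apache.flink.api.common.state.CheckpointListener;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;

    // A sink with no two-phase commit: writes are immediately visible, so
    // there is no commit work to wait for in a final checkpoint.
    public class FireAndForgetSink implements SinkFunction<String>, CheckpointListener {

        @Override
        public void invoke(String value, Context context) {
            System.out.println(value); // no transactional buffering
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            // Nothing to commit.
        }

        // The method proposed in this thread: returning false signals that
        // the task may finish without waiting for one more checkpoint.
        public boolean requiresFinalCheckpoint() {
            return false;
        }
    }

With such a function, AbstractUdfStreamOperator.requiresFinalCheckpoint() above would return false, and the task could skip the final checkpoint (though not a savepoint, as noted).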
>>> ------------------ Original Mail ------------------
>>> Sender: Dawid Wysakowicz <dwysakow...@apache.org>
>>> Send Date: Sun Jul 18 18:44:50 2021
>>> Recipients: Flink Dev <dev@flink.apache.org>, Yun Gao <yungao...@aliyun.com.INVALID>
>>> Subject: Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
>>>
>>>> I think we're all really close to the same solution.
>>>>
>>>> I second Yun's thought that MAX_WATERMARK works well for time-based
>>>> buffering, but it does not solve flushing other operations, such as
>>>> count windows or batched requests in sinks. I'd prefer to treat
>>>> finish() as a message for the operator to "flush all records". The
>>>> MAX_WATERMARK, in my opinion, is mostly for backwards compatibility. I
>>>> don't think operators need to get a "stop processing" signal if they
>>>> don't need to flush records. WHEN the records are emitted should be
>>>> under the control of the StreamTask, by firing timers or by processing
>>>> the next record from upstream.
>>>>
>>>> The only difference between my previous proposal and Yun's is that I
>>>> did not want to send the EndOfUserRecords event in the case of stop
>>>> without drain. My thinking was that we could go directly from RUNNING
>>>> to WAITING_FOR_FINAL_CP on EndOfPartitionEvent. I agree we could emit
>>>> the EndOfUserRecordsEvent with an additional flag and, e.g., stop
>>>> firing timers and processing events (without calling finish() on the
>>>> operator). In my initial suggestion I thought we need not care about
>>>> some events potentially being emitted after the savepoint was taken,
>>>> as they would anyway belong to the checkpoint after the FINAL one,
>>>> which would be discarded. I think, though, that the proposal to
>>>> suspend record processing and timers is a sensible thing to do, and I
>>>> would go with the version that Yun put into the FLIP wiki.
>>>>
>>>> What do you think, Till?
>>>>
>>>> Best,
>>>>
>>>> Dawid
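A minimal sketch of the "finish() flushes all records" contract described above, written against the finish() method this FLIP proposes for StreamOperator (the operator class and batch size are illustrative assumptions, not actual Flink code):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

    // Buffers elements into batches of 100. MAX_WATERMARK alone would never
    // flush a half-full batch, which is exactly why finish() is needed.
    public class BatchingOperator extends AbstractStreamOperator<List<String>>
            implements OneInputStreamOperator<String, List<String>> {

        private static final int BATCH_SIZE = 100;
        private transient List<String> buffer;

        @Override
        public void open() throws Exception {
            super.open();
            buffer = new ArrayList<>();
        }

        @Override
        public void processElement(StreamRecord<String> element) {
            buffer.add(element.getValue());
            if (buffer.size() >= BATCH_SIZE) {
                flush();
            }
        }

        // Proposed contract: called on normal finish and on
        // stop-with-savepoint --drain, but not on plain stop-with-savepoint.
        @Override
        public void finish() {
            flush();
        }

        private void flush() {
            if (!buffer.isEmpty()) {
                output.collect(new StreamRecord<>(new ArrayList<>(buffer)));
                buffer.clear();
            }
        }
    }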
>>>> On 16/07/2021 10:03, Yun Gao wrote:
>>>>> Hi Till, Piotr,
>>>>>
>>>>> Many thanks for the comments!
>>>>>
>>>>>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
>>>>>
>>>>> I also agree with Piotr that currently they are independent
>>>>> mechanisms, and for event time they are basically the same.
>>>>>
>>>>> For more details, first there are some differences among the three
>>>>> scenarios regarding finishing: for normal finish and
>>>>> stop-with-savepoint --drain, the job is not expected to be restarted,
>>>>> while for stop-with-savepoint the job is expected to restart later.
>>>>>
>>>>> Then for finish / stop-with-savepoint --drain, currently Flink emits
>>>>> MAX_WATERMARK before the EndOfPartition. Besides, as we have
>>>>> discussed before [1], endOfInput / finish() should also only be
>>>>> called for finish / stop-with-savepoint --drain. Thus currently they
>>>>> always occur at the same time. After the change, we could emit
>>>>> MAX_WATERMARK before the endOfInput event for the finish /
>>>>> stop-with-savepoint --drain cases.
>>>>>
>>>>>> 2) StreamOperator.finish says to flush all buffered events. Would a
>>>>>> WindowOperator close all windows and emit the results upon calling
>>>>>> finish, for example?
>>>>>
>>>>> As discussed above, for stop-with-savepoint we would always keep the
>>>>> windows as they are and restore them after restart.
>>>>>
>>>>> Then for finish / stop-with-savepoint --drain, I think it perhaps
>>>>> depends on the triggers. For event-time / processing-time triggers,
>>>>> it would be reasonable to flush all the windows, since logically the
>>>>> time always elapses and the windows would always get triggered in a
>>>>> logical future. But for triggers like CountTrigger, no matter how
>>>>> much time passes logically, the windows would not trigger, thus we
>>>>> may not flush these windows. If there are requirements, we may
>>>>> provide additional triggers.
>>>>>
>>>>>> It's a bit messy and I'm not sure if this should be straightened
>>>>>> out? Each one of those has slightly different semantics/meaning,
>>>>>> but at the same time they are very similar. For single-input
>>>>>> operators, `endInput()` and `finish()` are actually the very same
>>>>>> thing.
>>>>>
>>>>> Currently MAX_WATERMARK / endInput / finish indeed always happen at
>>>>> the same time, and for single-input operators `endInput()` and
>>>>> `finish()` are indeed the same thing. During the last discussion we
>>>>> mentioned this issue, and back then we thought that we might
>>>>> deprecate `endInput()` in the future; then we would only have
>>>>> endInput(int input) and finish().
>>>>>
>>>>> Best,
>>>>> Yun
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-21132
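A self-contained model (plain Java, not actual Flink trigger classes) of the point above: MAX_WATERMARK fires any event-time window, because every window end is below Long.MAX_VALUE, while a count-based trigger ignores watermarks entirely and can only be drained via finish():

    public class TriggerModel {

        static final long MAX_WATERMARK = Long.MAX_VALUE;

        // Event-time semantics: fire once the watermark passes the window end.
        static boolean eventTimeTriggerFires(long windowEnd, long watermark) {
            return watermark >= windowEnd;
        }

        // Count semantics: fire only when enough elements have arrived; the
        // watermark is irrelevant here.
        static boolean countTriggerFires(int bufferedCount, int requiredCount) {
            return bufferedCount >= requiredCount;
        }

        public static void main(String[] args) {
            System.out.println(eventTimeTriggerFires(9_999_999L, MAX_WATERMARK)); // true
            System.out.println(countTriggerFires(42, 100)); // false: only finish() can flush
        }
    }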
>>>>> ------------------------------------------------------------------
>>>>> From: Piotr Nowojski
>>>>> Send Time: 2021 Jul. 16 (Fri.) 13:48
>>>>> To: dev
>>>>> Cc: Yun Gao
>>>>> Subject: Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
>>>>>
>>>>> Hi Till,
>>>>>
>>>>>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
>>>>>> 2) StreamOperator.finish says to flush all buffered events. Would a
>>>>>> WindowOperator close all windows and emit the results upon calling
>>>>>> finish, for example?
>>>>>
>>>>> 1) Currently they are independent but parallel mechanisms. With event
>>>>> time, they are basically the same.
>>>>> 2) It probably should, for the sake of processing-time windows.
>>>>>
>>>>> Here you are touching the bit of the current design that I like the
>>>>> least. We now have three different ways of conveying very similar
>>>>> things:
>>>>> a) sending `MAX_WATERMARK`, used by the event-time WindowOperator
>>>>> (what about processing time?)
>>>>> b) endInput(), used for example by AsyncWaitOperator to flush its
>>>>> internal state
>>>>> c) finish(), used for example by ContinuousFileReaderOperator
>>>>>
>>>>> It's a bit messy and I'm not sure if this should be straightened out?
>>>>> Each one of those has slightly different semantics/meaning, but at
>>>>> the same time they are very similar. For single-input operators,
>>>>> `endInput()` and `finish()` are actually the very same thing.
>>>>>
>>>>> Piotrek
>>>>>
>>>>> On Thu, Jul 15, 2021 at 16:47 Till Rohrmann wrote:
>>>>>
>>>>> Thanks for updating the FLIP. Based on the new section about
>>>>> stop-with-savepoint [--drain] I got two other questions:
>>>>>
>>>>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
>>>>> 2) StreamOperator.finish says to flush all buffered events. Would a
>>>>> WindowOperator close all windows and emit the results upon calling
>>>>> finish, for example?
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Thu, Jul 15, 2021 at 10:15 AM Till Rohrmann wrote:
>>>>>
>>>>>> Thanks a lot for your answers and clarifications, Yun.
>>>>>>
>>>>>> 1+2) Agreed, this can be a future improvement if it becomes a
>>>>>> problem.
>>>>>>
>>>>>> 3) Great, this will help a lot with understanding the FLIP.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Wed, Jul 14, 2021 at 5:41 PM Yun Gao wrote:
>>>>>>
>>>>>>> Hi Till,
>>>>>>>
>>>>>>> Many thanks for the review and comments!
>>>>>>>
>>>>>>> 1) First, I think we could in fact do the computation outside of
>>>>>>> the main thread; the current implementation is mainly due to the
>>>>>>> computation being generally fast, and we initially wanted a
>>>>>>> simplified first version. The main requirement here is to have a
>>>>>>> consistent view of the state of the tasks; otherwise, for example
>>>>>>> with A -> B, if A is running when we check whether we need to
>>>>>>> trigger A, we will mark A as to-be-triggered, but if A has finished
>>>>>>> by the time we check B, we will also mark B as to-be-triggered.
>>>>>>> Then B would receive both the RPC trigger and the checkpoint
>>>>>>> barrier, which would break some assumptions on the task side and
>>>>>>> complicate the implementation. But to cope with this issue, we
>>>>>>> could in fact first take a snapshot of the tasks' states and then
>>>>>>> do the computation; neither step needs to be in the main thread.
>>>>>>>
>>>>>>> 2) For the computation logic, we currently benefit a lot from
>>>>>>> shortcuts on all-to-all edges and on job vertices whose tasks are
>>>>>>> all running; these shortcuts do checks at the job-vertex level
>>>>>>> first and can skip some job vertices as a whole. With this
>>>>>>> optimization we have an O(V) algorithm, and the current worst-case
>>>>>>> running time for a job with 320,000 tasks is less than 100 ms. For
>>>>>>> everyday graph sizes the time would be further reduced linearly. If
>>>>>>> we did the computation based on the last triggered tasks, we could
>>>>>>> not easily encode this information into the job-vertex-level
>>>>>>> shortcuts. And since the time seems to be short, perhaps it is
>>>>>>> enough to recompute from scratch, considering the tradeoff between
>>>>>>> performance and complexity?
>>>>>>>
>>>>>>> 3) We are going to emit the EndOfInput event exactly after the
>>>>>>> finish() method and before the last snapshotState() method, so that
>>>>>>> we can shut down the whole topology with a single final checkpoint.
>>>>>>> Very sorry for not including enough details for this part; I'll
>>>>>>> complement the FLIP with the details of the final checkpoint /
>>>>>>> savepoint process.
>>>>>>>
>>>>>>> Best,
>>>>>>> Yun
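A minimal sketch of the snapshot-then-compute approach described in 1) (hypothetical names, not the actual DefaultCheckpointPlanCalculator code): freezing a consistent view of all task states first means a task cannot flip from RUNNING to FINISHED between the check of A and the check of B, so no task receives both an RPC trigger and a checkpoint barrier.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class TriggerPlanSketch {

        enum TaskState { RUNNING, FINISHED }

        static List<String> computeTasksToTrigger(Map<String, TaskState> liveView) {
            // Step 1: freeze a consistent view of all tasks.
            Map<String, TaskState> snapshot = new HashMap<>(liveView);

            // Step 2 (may run outside the main thread): compute against the
            // frozen view only. This toy version triggers every task still
            // RUNNING in the snapshot; the real calculator walks the job
            // graph to find the trigger frontier behind finished tasks.
            List<String> toTrigger = new ArrayList<>();
            snapshot.forEach((task, state) -> {
                if (state == TaskState.RUNNING) {
                    toTrigger.add(task);
                }
            });
            return toTrigger;
        }
    }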
>>>>>>> ------------------------------------------------------------------
>>>>>>> From: Till Rohrmann
>>>>>>> Send Time: 2021 Jul. 14 (Wed.) 22:05
>>>>>>> To: dev
>>>>>>> Subject: Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
>>>>>>>
>>>>>>> Hi everyone,
>>>>>>>
>>>>>>> I am a bit late to the voting party, but let me ask three
>>>>>>> questions:
>>>>>>>
>>>>>>> 1) Why do we execute the trigger plan computation in the main
>>>>>>> thread if we cannot guarantee that all tasks are still running when
>>>>>>> triggering the checkpoint? Couldn't we do the computation in a
>>>>>>> different thread in order to relieve the main thread a bit?
>>>>>>>
>>>>>>> 2) The implementation of the DefaultCheckpointPlanCalculator seems
>>>>>>> to go over the whole topology for every calculation. Wouldn't it be
>>>>>>> more efficient to maintain the set of current tasks to trigger and
>>>>>>> check whether anything has changed, and if so, check the succeeding
>>>>>>> tasks until we have found the current checkpoint trigger frontier?
>>>>>>>
>>>>>>> 3) When are we going to send the endOfInput events to a downstream
>>>>>>> task? If this happens after we call finish on the upstream operator
>>>>>>> but before snapshotState, then it would be possible to shut down
>>>>>>> the whole topology with a single final checkpoint. I think this
>>>>>>> part could benefit from a bit more detailed description in the
>>>>>>> FLIP.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Fri, Jul 2, 2021 at 8:36 AM Yun Gao wrote:
>>>>>>>
>>>>>>>> Hi there,
>>>>>>>>
>>>>>>>> Since the voting time of FLIP-147 [1] has passed, I'm closing the
>>>>>>>> vote now.
>>>>>>>>
>>>>>>>> There were seven +1 votes (6 of 7 binding) and no -1 votes:
>>>>>>>>
>>>>>>>> - Dawid Wysakowicz (binding)
>>>>>>>> - Piotr Nowojski (binding)
>>>>>>>> - Jiangang Liu (binding)
>>>>>>>> - Arvid Heise (binding)
>>>>>>>> - Jing Zhang (binding)
>>>>>>>> - Leonard Xu (non-binding)
>>>>>>>> - Guowei Ma (binding)
>>>>>>>>
>>>>>>>> Thus I'm happy to announce that the update to FLIP-147 is
>>>>>>>> accepted.
>>>>>>>>
>>>>>>>> Many thanks everyone!
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Yun
>>>>>>>>
>>>>>>>> [1] https://cwiki.apache.org/confluence/x/mw-ZCQ