Hi all, I must admit I think I agree with Till that the relation between life-cycles of the (Stream)Task and the Operator is missing in the FLIP. Moreover I am now a bit concerned that we missed the relation with stopping with savepoint. I am still positive we can incorporate that in the current efforts. At the same time I think the proposed Operator APIs still make sense.
First I'd like to start from the Operator's point of view. * * *endOfInput vs finish* The way I see the relation between endOfInput/finish/MAX_WATERMARK/close is that basically endOfInput() and finish() are the same for one-input operators and the difference for multi input operators is that endOfInput is called for each input separately and then once all inputs finish the finish() is called. For simplicity I will from now on use just the finish method and leave the endOfInput out of the discussion. *w drain vs w/o drain* I agree we have two cases of finishing based on the reason: 1. we consumed all data (bounded source or stop-with-savepoint w drain) 2. stop-with-savepoint w/o drain I think we're at the same page that finish() should be called only in the first case. I suggest we keep the current behaviour that the we emit MAX_WATERMARK separately and then call finish(). For even-time based scenarios this might be duplicated information, but it does not harm. The order would be that we first emit MAX_WATERMARK and then call finish(). Of course we would emit MAX_WATERMARK only in the first case. Lastly the assumption should be that if there is finish() there will be one extra snapshotState/notifyCheckpointComplete. For the w/o drain the operator should not care when it is closed() or which savepoint was the last one. That's from the operator point of view. Now I'd like to move on to the (Stream)Task point of view. I think here lay the problems that we missed and Till rightly points those out. *When should we call finish()* In the current implementation we call finish() once we receive the EndOfPartitionEvent which, as Till said, makes it impossible to bring down the entire topology in a single savepoint. We can finish all chained operators in a single Task, but not all Tasks in a DAG. It makes it also a bit harder to distinguish if we should call the finish() then or not. Point taken and it is an important omission from our side, in my opinion. However, we introduced EndOfUserRecordsEvent which says there will be no more records, but the channel is not closed and you can expect other events such as e.g. CheckpointBarriers. The original idea behind this event was to make unaligned checkpoints work well with checkpoints after tasks finished. I think we could and should use that event to call the finish() method, though. That way we could call finish() before the producer task finishes. This would also solve the problem of savepoint w/o drain, as this event would not be emitted in that case and we would not call the finish(). Having that a StreamTask could finish if all following conditions are met: * in case of all data consumed o received EndOfUserRecordsEvent from all channels o called finish() method o checkpoint was triggered and its completion was acknowledged (notifyCheckpointComplete) o received EndOfPartitionEvent from all channels * stop-with-savepoint w/o drain o savepoint acknowledged o received EndOfPartitionEvent from all channels We can distinguish that from the previous case, because we would receive EndOfPartitionEvent from all channels without receiving EndOfUserRecordsEvent before that. Lets then see how this could look for end of data for two Tasks: Task1 (Source) -> Task2 Task1: (consume all data) -> call finish() -> send EndOfUserRecordsEvent -> wait for checkpoint x trigger -> wait for checkpoint x complete + wait for EndOfUserRecordsEvent acknowledged -> close / send EndOfPartitionEvent Task2: receive EndOfUserRecordsEvent -> send EndOfUserRecordsEvent -> wait for checkpoint y barrier (it could happen that y=x) -> wait for checkpoint y complete + wait for EndOfUserRecordsEvent acknowledged -> close The savepoint w drain is very similar, it just replaces consume all data, and in this case y will always be x, because all sources will finish with the same savepoint. For savepoint w/o drain it's slightly different: Task1: savepoint w/o drain -> wait for savepoint complete -> close / send EndOfPartitionEvent Task 2: EndOfPartition received + wait for savepoint complete -> close Does that make sense? Best, Dawid On 16/07/2021 11:26, Till Rohrmann wrote: > Ok, so the plan is that finish() will flush all pending events and then > send the MAX_WATERMARK. > > What I am missing is the connection between the lifecycle of the operator > and signals the StreamTask and Task might receive (and also their life > cycles). At the moment we do have the EndOfPartitionEvent that we send once > the Task reaches its end. Based on what signals will > StreamOperator.finish() be called? What kind of other signals for the > StreamTask do we have to introduce in order to support stop-with-savepoint > w/o --drain if we don't want to call finish()? > > If finish() sends MAX_WATERMARK after flushing all pending events and > finish() itself is being triggered by some endOfInputEvent from the > StreamTask, then we might actually send MAX_WATERMARK multiple times. > > I think what I am lacking a bit is how the StreamOperator lifecycle will > fit together with the lifecycle of its owner and the corresponding signals > we have to send. > > Cheers, > Till > > Cheers, > Till > > On Fri, Jul 16, 2021 at 10:40 AM Yun Gao <yungao...@aliyun.com> wrote: > >> Hi Till, >> >> Sorry that I do not see the reply when sending the last mail, I think as a >> whole we are on the same page for the >> 1. For normal stop-with-savepoint, we do not call finish() and do not emit >> MAX_WATERMARK >> 2. For normal finish / stop-with-savepoint --drain, we would call finish() >> and emit MAX_WATERMARK >> >>> But then there is the question, how do we signal the operator that the >> next checkpoint is supposed to stop the operator >>> (how will the operator's lifecycle look in this case)? Maybe we simply >> don't tell the operator and handle this situation on the >>> StreamTask level. >> Logically I think in this case UDF seems do not need to know the next >> checkpoint is supposed to stop the operator since the final >> checkpoint in this case have no difference with the ordinary checkpoints. >> >>> So I guess the question is will finish() advance the time to the end or >> is this a separate mechanism (e.g. explicit watermarks). >> >> I tend to have an explicit MAX_WATERMARK since it makes watermark >> processing to be unified with normal cases and make the meanings of >> each event explicit. But this might be a private preference and both >> methods would work. >> >> Very sorry for not making the whole thing clear in the FLIP again, if >> there are no other concerns I'll update the FLIP with the above conclusions >> to make it precise in this part. >> >> >> Best, >> Yun >> >> ------------------------------------------------------------------ >> From:Till Rohrmann <trohrm...@apache.org> >> Send Time:2021 Jul. 16 (Fri.) 16:00 >> To:dev <dev@flink.apache.org> >> Cc:Yun Gao <yungao...@aliyun.com> >> Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks >> Finished >> >> I think we should try to sort this out because it might affect how and >> when finish() will be called (or in general how the operator lifecycle >> looks like). >> >> To give an example let's take a look at the stop-with-savepoint w/ and w/o >> --drain: >> >> 1) stop-with-savepoint w/o --drain: Conceptually what we would like to do >> is to stop processing w/o completing any windows or waiting for the >> AsyncOperator to finish its operations. All these unfinished operations >> should become part of the final checkpoint so that we can resume from it at >> a later point. Depending on what finish() does (flush unfinished windows or >> not), this method must or must not be called. Assuming that finish() >> flushes unfinished windows/waits for uncompleted async operations, we >> clearly shouldn't call it. But then there is the question, how do we >> signal the operator that the next checkpoint is supposed to stop the >> operator (how will the operator's lifecycle look in this case)? Maybe we >> simply don't tell the operator and handle this situation on the StreamTask >> level. If finish() does not flush unfinished windows, then it shouldn't be >> a problem. >> >> 2) stop-with-savepoint w/ --drain: Here we want to complete all pending >> operations and flush out all results because we don't intend to resume the >> job. Conceptually, we tell the system that we have reached MAX_WATERMARK. >> If finish() is defined so that it implicitly advances the watermark to >> MAX_WATERMARK, then there is no problem. If finish() does not have this >> semantic, then we need to send the MAX_WATERMARK before sending the >> endOfInput event to a downstream task. In fact, stop-with-savepoint /w >> --drain shouldn't be a lot different from a bounded source that reaches its >> end. It would also send MAX_WATERMARK and then signal the endOfInput event >> (note that endOfInput is decoupled from the event time here). >> >> So I guess the question is will finish() advance the time to the end or is >> this a separate mechanism (e.g. explicit watermarks). >> >> Concerning how to handle processing time, I am a bit unsure tbh. I can see >> arguments for completing processing time windows/firing processing time >> timers when calling stop-with-savepoint w/ --drain. On the other hand, I >> could also see that people want to define actions based on the wall clock >> time that are independent of the stream state and, thus, would want to >> ignore them if the Flink application is stopped before reaching this time. >> >> Cheers, >> Till >> >> On Fri, Jul 16, 2021 at 7:48 AM Piotr Nowojski <pnowoj...@apache.org> >> wrote: >> Hi Till, >> >>> 1) Does endOfInput entail sending of the MAX_WATERMARK? >>> >>> 2) StreamOperator.finish says to flush all buffered events. Would a >>> WindowOperator close all windows and emit the results upon calling >>> finish, for example? >> 1) currently they are independent but parallel mechanisms. With event time, >> they are basically the same. >> 2) it probably should for the sake of processing time windows. >> >> Here you are touching the bit of the current design that I like the least. >> We basically have now three different ways of conveying very similar >> things: >> a) sending `MAX_WATERMARK`, used by event time WindowOperator (what about >> processing time?) >> b) endInput(), used for example by AsyncWaitOperator to flush it's internal >> state >> c) finish(), used for example by ContinuousFileReaderOperator >> >> It's a bit messy and I'm not sure if this should be strengthened out? Each >> one of those has a little bit different semantic/meaning, but at the same >> time they are very similar. For single input operators `endInput()` and >> `finish()` are actually the very same thing. >> >> Piotrek >> >> czw., 15 lip 2021 o 16:47 Till Rohrmann <trohrm...@apache.org> napisaĆ(a): >> >>> Thanks for updating the FLIP. Based on the new section about >>> stop-with-savepoint [--drain] I got two other questions: >>> >>> 1) Does endOfInput entail sending of the MAX_WATERMARK? >>> >>> 2) StreamOperator.finish says to flush all buffered events. Would a >>> WindowOperator close all windows and emit the results upon calling >>> finish, for example? >>> >>> Cheers, >>> Till >>> >>> On Thu, Jul 15, 2021 at 10:15 AM Till Rohrmann <trohrm...@apache.org> >>> wrote: >>> >>>> Thanks a lot for your answers and clarifications Yun. >>>> >>>> 1+2) Agreed, this can be a future improvement if this becomes a >> problem. >>>> 3) Great, this will help a lot with understanding the FLIP. >>>> >>>> Cheers, >>>> Till >>>> >>>> On Wed, Jul 14, 2021 at 5:41 PM Yun Gao <yungao...@aliyun.com.invalid> >>>> wrote: >>>> >>>>> Hi Till, >>>>> >>>>> Very thanks for the review and comments! >>>>> >>>>> 1) First I think in fact we could be able to do the computation >> outside >>>>> of the main thread, >>>>> and the current implementation mainly due to the computation is in >>>>> general fast and we >>>>> initially want to have a simplified first version. >>>>> >>>>> The main requirement here is to have a constant view of the state of >> the >>>>> tasks, otherwise >>>>> for example if we have A -> B, if A is running when we check if we >> need >>>>> to trigger A, we will >>>>> mark A as have to trigger, but if A gets to finished when we check B, >> we >>>>> will also mark B as >>>>> have to trigger, then B will receive both rpc trigger and checkpoint >>>>> barrier, which would break >>>>> some assumption on the task side and complicate the implementation. >>>>> >>>>> But to cope this issue, we in fact could first have a snapshot of the >>>>> tasks' state and then do the >>>>> computation, both the two step do not need to be in the main thread. >>>>> >>>>> 2) For the computation logic, in fact currently we benefit a lot from >>>>> some shortcuts on all-to-all >>>>> edges and job vertex with all tasks running, these shortcuts could do >>>>> checks on the job vertex level >>>>> first and skip some job vertices as a whole. With this optimization we >>>>> have a O(V) algorithm, and the >>>>> current running time of the worst case for a job with 320,000 tasks is >>>>> less than 100ms. For >>>>> daily graph sizes the time would be further reduced linearly. >>>>> >>>>> If we do the computation based on the last triggered tasks, we may not >>>>> easily encode this information >>>>> into the shortcuts on the job vertex level. And since the time seems >> to >>>>> be short, perhaps it is enough >>>>> to do re-computation from the scratch in consideration of the tradeoff >>>>> between the performance and >>>>> the complexity ? >>>>> >>>>> 3) We are going to emit the EndOfInput event exactly after the >> finish() >>>>> method and before the last >>>>> snapshotState() method so that we could shut down the whole topology >>> with >>>>> a single final checkpoint. >>>>> Very sorry for not include enough details for this part and I'll >>>>> complement the FLIP with the details on >>>>> the process of the final checkpoint / savepoint. >>>>> >>>>> Best, >>>>> Yun >>>>> >>>>> >>>>> >>>>> ------------------------------------------------------------------ >>>>> From:Till Rohrmann <trohrm...@apache.org> >>>>> Send Time:2021 Jul. 14 (Wed.) 22:05 >>>>> To:dev <dev@flink.apache.org> >>>>> Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks >>>>> Finished >>>>> >>>>> Hi everyone, >>>>> >>>>> I am a bit late to the voting party but let me ask three questions: >>>>> >>>>> 1) Why do we execute the trigger plan computation in the main thread >> if >>> we >>>>> cannot guarantee that all tasks are still running when triggering the >>>>> checkpoint? Couldn't we do the computation in a different thread in >>> order >>>>> to relieve the main thread a bit. >>>>> >>>>> 2) The implementation of the DefaultCheckpointPlanCalculator seems to >> go >>>>> over the whole topology for every calculation. Wouldn't it be more >>>>> efficient to maintain the set of current tasks to trigger and check >>>>> whether >>>>> anything has changed and if so check the succeeding tasks until we >> have >>>>> found the current checkpoint trigger frontier? >>>>> >>>>> 3) When are we going to send the endOfInput events to a downstream >> task? >>>>> If >>>>> this happens after we call finish on the upstream operator but before >>>>> snapshotState then it would be possible to shut down the whole >> topology >>>>> with a single final checkpoint. I think this part could benefit from a >>> bit >>>>> more detailed description in the FLIP. >>>>> >>>>> Cheers, >>>>> Till >>>>> >>>>> On Fri, Jul 2, 2021 at 8:36 AM Yun Gao <yungao...@aliyun.com.invalid> >>>>> wrote: >>>>> >>>>>> Hi there, >>>>>> >>>>>> Since the voting time of FLIP-147[1] has passed, I'm closing the >> vote >>>>> now. >>>>>> There were seven +1 votes ( 6 / 7 are bindings) and no -1 votes: >>>>>> >>>>>> - Dawid Wysakowicz (binding) >>>>>> - Piotr Nowojski(binding) >>>>>> - Jiangang Liu (binding) >>>>>> - Arvid Heise (binding) >>>>>> - Jing Zhang (binding) >>>>>> - Leonard Xu (non-binding) >>>>>> - Guowei Ma (binding) >>>>>> >>>>>> Thus I'm happy to announce that the update to the FLIP-147 is >>> accepted. >>>>>> Very thanks everyone! >>>>>> >>>>>> Best, >>>>>> Yun >>>>>> >>>>>> [1] https://cwiki.apache.org/confluence/x/mw-ZCQ >>>>> >> >>
OpenPGP_signature
Description: OpenPGP digital signature