Hi,

Many thanks Dawid for the thoughts!
Currently I do not have a different opinion regarding this part either. But I have one more issue to confirm: during the previous discussion we agreed that for the final checkpoint case, we might have an optimization that if a task does not have operators using 2PC, we could skip waiting for the final checkpoint (but we could not skip the savepoint). To allow users to express this logic, we proposed to add one more method to StreamOperator and CheckpointListener:

    interface StreamOperator {
        default boolean requiresFinalCheckpoint() {
            return true;
        }
    }

    interface CheckpointListener {
        default boolean requiresFinalCheckpoint() {
            return true;
        }
    }

    class AbstractUdfStreamOperator {
        @Override
        public boolean requiresFinalCheckpoint() {
            return userFunction instanceof CheckpointListener
                && ((CheckpointListener) userFunction).requiresFinalCheckpoint();
        }
    }

I think we should still keep this change?

Best,
Yun

------------------ Original Mail ------------------
Sender: Dawid Wysakowicz <dwysakow...@apache.org>
Send Date: Sun Jul 18 18:44:50 2021
Recipients: Flink Dev <dev@flink.apache.org>, Yun Gao <yungao...@aliyun.com.INVALID>
Subject: Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished

I think we're all really close to the same solution. I second Yun's thought that MAX_WATERMARK works well for time-based buffering, but it does not solve flushing other operations, such as e.g. count windows or batching requests in Sinks. I'd prefer to treat finish() as a message for the operator to "flush all records". The MAX_WATERMARK is, in my opinion, mostly for backwards compatibility. I don't think operators need to get a "stop-processing" signal if they don't need to flush records. WHEN records are emitted should be in the control of the StreamTask, by firing timers or by processing the next record from upstream. The only difference of my previous proposal compared to Yun's is that I did not want to send the EndOfUserRecords event in case of stop w/o drain.
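[Editor's note: coming back to the requiresFinalCheckpoint() proposal at the top of this mail, the intended opt-out can be illustrated with a self-contained sketch. The interfaces are repeated here as minimal stand-ins for Flink's real StreamOperator / CheckpointListener so the example compiles on its own, and the two sink function classes are purely hypothetical.]

    // Minimal stand-ins for the proposed API additions; not Flink's real classes.
    interface CheckpointListener {
        // Default keeps today's behaviour: the task waits for the final checkpoint.
        default boolean requiresFinalCheckpoint() {
            return true;
        }
    }

    interface StreamOperator {
        default boolean requiresFinalCheckpoint() {
            return true;
        }
    }

    class AbstractUdfStreamOperator implements StreamOperator {
        private final Object userFunction;

        AbstractUdfStreamOperator(Object userFunction) {
            this.userFunction = userFunction;
        }

        // Forward the decision to the user function, as in the proposal.
        @Override
        public boolean requiresFinalCheckpoint() {
            return userFunction instanceof CheckpointListener
                    && ((CheckpointListener) userFunction).requiresFinalCheckpoint();
        }
    }

    // Hypothetical 2PC sink function: keeps the default, so its task must wait
    // for the final checkpoint in order to commit the last transaction.
    class TwoPhaseCommitSinkFunction implements CheckpointListener {
    }

    // Hypothetical non-transactional sink function: opts out, so a task that
    // contains only such operators may skip waiting for the final checkpoint.
    class NonTransactionalSinkFunction implements CheckpointListener {
        @Override
        public boolean requiresFinalCheckpoint() {
            return false;
        }
    }

[A task would then wait for the final checkpoint only if at least one of its operators returns true, which matches the "skip the final checkpoint, but never the savepoint" intent above.]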
My thinking was that we could go directly from RUNNING to WAITING_FOR_FINAL_CP on EndOfPartitionEvent. I agree we could emit EndOfUserRecordsEvent with an additional flag and e.g. stop firing timers and processing events (without calling finish() on the operator). In my initial suggestion I thought we didn't need to care about some events potentially being emitted after the savepoint was taken, as they would anyway belong to the next checkpoint after the FINAL one, which would be discarded. I think, though, that the proposal to suspend record processing and timers is a sensible thing to do, and I would go with the version that Yun put into the FLIP wiki.

What do you think Till?

Best,
Dawid

On 16/07/2021 10:03, Yun Gao wrote:
> Hi Till, Piotr
>
> Many thanks for the comments!
>
>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
>
> I also agree with Piotr that currently they are independent mechanisms, and they are basically the same for event time.
>
> For more details, first there are some differences among the three scenarios regarding the finish: for normal finish and stop-with-savepoint --drain, the job would not be expected to be restarted, while for stop-with-savepoint the job would be expected to restart later.
>
> Then for finish / stop-with-savepoint --drain, currently Flink would emit MAX_WATERMARK before the EndOfPartition. Besides, as we have discussed before [1], endOfInput / finish() should also only be called for finish / stop-with-savepoint --drain. Thus currently they always occur at the same time. After the change, we could emit MAX_WATERMARK before the endOfInput event for the finish / stop-with-savepoint --drain cases.
>
>> 2) StreamOperator.finish says to flush all buffered events. Would a WindowOperator close all windows and emit the results upon calling finish, for example?
>
> As discussed above, for stop-with-savepoint we would always keep the windows as is, and restore them after restart.
> Then for finish / stop-with-savepoint --drain, I think it perhaps depends on the triggers. For event-time / processing-time triggers, it would be reasonable to flush all the windows, since logically the time would always elapse and the windows would always get triggered in a logical future. But for triggers like CountTrigger, no matter how much time passes logically, the windows would not trigger, thus we may not flush these windows. If there are requirements, we may provide additional triggers.
>
>> It's a bit messy and I'm not sure if this should be straightened out? Each one of those has a little bit different semantic/meaning, but at the same time they are very similar. For single input operators `endInput()` and `finish()` are actually the very same thing.
>
> Currently MAX_WATERMARK / endInput / finish indeed always happen at the same time, and for single input operators `endInput()` and `finish()` are indeed the same thing. During the last discussion we mentioned this issue, and at that time we thought that we might deprecate `endInput()` in the future; then we would only have endInput(int input) and finish().
>
> Best,
> Yun
>
> [1] https://issues.apache.org/jira/browse/FLINK-21132
>
> ------------------------------------------------------------------
> From: Piotr Nowojski
> Send Time: 2021 Jul. 16 (Fri.) 13:48
> To: dev
> Cc: Yun Gao
> Subject: Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
>
> Hi Till,
>
>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
>>
>> 2) StreamOperator.finish says to flush all buffered events. Would a WindowOperator close all windows and emit the results upon calling finish, for example?
>
> 1) Currently they are independent but parallel mechanisms. With event time, they are basically the same.
> 2) It probably should, for the sake of processing time windows.
>
> Here you are touching the bit of the current design that I like the least.
> We basically now have three different ways of conveying very similar things:
>
> a) sending `MAX_WATERMARK`, used by the event time WindowOperator (what about processing time?)
> b) endInput(), used for example by AsyncWaitOperator to flush its internal state
> c) finish(), used for example by ContinuousFileReaderOperator
>
> It's a bit messy and I'm not sure if this should be straightened out? Each one of those has a little bit different semantic/meaning, but at the same time they are very similar. For single input operators `endInput()` and `finish()` are actually the very same thing.
>
> Piotrek
>
> On Thu, Jul 15, 2021 at 16:47 Till Rohrmann wrote:
>
> Thanks for updating the FLIP. Based on the new section about stop-with-savepoint [--drain] I got two other questions:
>
> 1) Does endOfInput entail sending of the MAX_WATERMARK?
>
> 2) StreamOperator.finish says to flush all buffered events. Would a WindowOperator close all windows and emit the results upon calling finish, for example?
>
> Cheers,
> Till
>
> On Thu, Jul 15, 2021 at 10:15 AM Till Rohrmann wrote:
> >
> > Thanks a lot for your answers and clarifications Yun.
> >
> > 1+2) Agreed, this can be a future improvement if this becomes a problem.
> >
> > 3) Great, this will help a lot with understanding the FLIP.
> >
> > Cheers,
> > Till
> >
> > On Wed, Jul 14, 2021 at 5:41 PM Yun Gao wrote:
> >
> >> Hi Till,
> >>
> >> Many thanks for the review and comments!
> >>
> >> 1) First, I think we would in fact be able to do the computation outside of the main thread; the current implementation is mainly because the computation is in general fast, and we initially wanted to have a simplified first version.
> >> The main requirement here is to have a consistent view of the state of the tasks; otherwise, for example if we have A -> B: if A is running when we check whether we need to trigger A, we will mark A as to-be-triggered, but if A gets to FINISHED by the time we check B, we will also mark B as to-be-triggered. Then B would receive both the RPC trigger and the checkpoint barrier, which would break some assumptions on the task side and complicate the implementation.
> >>
> >> But to cope with this issue, we could in fact first take a snapshot of the tasks' states and then do the computation; neither of the two steps needs to be in the main thread.
> >>
> >> 2) For the computation logic, currently we in fact benefit a lot from some shortcuts on all-to-all edges and job vertices with all tasks running; these shortcuts can do checks on the job vertex level first and skip some job vertices as a whole. With this optimization we have an O(V) algorithm, and the current running time of the worst case for a job with 320,000 tasks is less than 100ms. For daily graph sizes the time would be further reduced linearly.
> >>
> >> If we do the computation based on the last triggered tasks, we may not easily encode this information into the shortcuts on the job vertex level. And since the time seems to be short, perhaps it is enough to do the re-computation from scratch, in consideration of the tradeoff between performance and complexity?
> >>
> >> 3) We are going to emit the EndOfInput event exactly after the finish() method and before the last snapshotState() method, so that we can shut down the whole topology with a single final checkpoint. Very sorry for not including enough details for this part; I'll complement the FLIP with the details on the process of the final checkpoint / savepoint.
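[Editor's note: the snapshot-then-compute approach from point 1) above can be sketched as follows. This is a simplified illustration, not Flink's actual DefaultCheckpointPlanCalculator, and all names are illustrative: first take an immutable snapshot of the task states (the consistent view), then compute the tasks to trigger from that snapshot, where neither step has to run in the main thread. A task is triggered via RPC only if it is RUNNING and has no RUNNING upstream task; every other running task receives checkpoint barriers from upstream instead.]

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    class CheckpointPlanSketch {
        enum TaskState { RUNNING, FINISHED }

        // Both arguments are read-only snapshots, so the result is consistent
        // even if the real tasks change state while this method runs.
        static List<String> tasksToTrigger(
                Map<String, TaskState> snapshot,          // consistent view of task states
                Map<String, List<String>> upstreamOf) {   // task -> its upstream tasks
            List<String> toTrigger = new ArrayList<>();
            for (Map.Entry<String, TaskState> e : snapshot.entrySet()) {
                if (e.getValue() != TaskState.RUNNING) {
                    continue; // finished tasks are not triggered at all
                }
                boolean hasRunningUpstream = false;
                for (String up : upstreamOf.getOrDefault(e.getKey(), List.of())) {
                    if (snapshot.get(up) == TaskState.RUNNING) {
                        hasRunningUpstream = true;
                        break;
                    }
                }
                if (!hasRunningUpstream) {
                    toTrigger.add(e.getKey()); // frontier task: trigger via RPC
                }
            }
            return toTrigger;
        }
    }

[For the A -> B example above: because both checks read the same snapshot, either A is RUNNING, so only A is triggered and B gets barriers, or A is FINISHED, so only B is triggered; B can never receive both an RPC trigger and a barrier.]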
> >>
> >> Best,
> >> Yun
> >>
> >> ------------------------------------------------------------------
> >> From: Till Rohrmann
> >> Send Time: 2021 Jul. 14 (Wed.) 22:05
> >> To: dev
> >> Subject: Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
> >>
> >> Hi everyone,
> >>
> >> I am a bit late to the voting party but let me ask three questions:
> >>
> >> 1) Why do we execute the trigger plan computation in the main thread if we cannot guarantee that all tasks are still running when triggering the checkpoint? Couldn't we do the computation in a different thread in order to relieve the main thread a bit?
> >>
> >> 2) The implementation of the DefaultCheckpointPlanCalculator seems to go over the whole topology for every calculation. Wouldn't it be more efficient to maintain the set of current tasks to trigger and check whether anything has changed, and if so check the succeeding tasks until we have found the current checkpoint trigger frontier?
> >>
> >> 3) When are we going to send the endOfInput events to a downstream task? If this happens after we call finish on the upstream operator but before snapshotState, then it would be possible to shut down the whole topology with a single final checkpoint. I think this part could benefit from a bit more detailed description in the FLIP.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Fri, Jul 2, 2021 at 8:36 AM Yun Gao wrote:
> >>
> >> > Hi there,
> >> >
> >> > Since the voting time of FLIP-147 [1] has passed, I'm closing the vote now.
> >> >
> >> > There were seven +1 votes (6 / 7 binding) and no -1 votes:
> >> >
> >> > - Dawid Wysakowicz (binding)
> >> > - Piotr Nowojski (binding)
> >> > - Jiangang Liu (binding)
> >> > - Arvid Heise (binding)
> >> > - Jing Zhang (binding)
> >> > - Leonard Xu (non-binding)
> >> > - Guowei Ma (binding)
> >> >
> >> > Thus I'm happy to announce that the update to FLIP-147 is accepted.
> >> >
> >> > Many thanks everyone!
> >> >
> >> > Best,
> >> > Yun
> >> >
> >> > [1] https://cwiki.apache.org/confluence/x/mw-ZCQ