Hi Yun and Dawid,

Thanks for your comments. I agree that finish() can do more than MAX_WATERMARK. We should then explain how MAX_WATERMARK and finish() play together and what kind of ordering guarantees we provide.
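To make "finish() can do more than MAX_WATERMARK" concrete, here is a minimal toy sketch in plain Java (not Flink's actual operator API; the class and method names are made up for illustration): a count-based buffer is not flushed by any watermark, however large, but finish() must still emit it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of a count-buffering operator: MAX_WATERMARK cannot complete
// a partial batch, but finish() ("no more records") must flush it.
class CountBufferingOperator {
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    private final List<String> emitted = new ArrayList<>();

    CountBufferingOperator(int batchSize) {
        this.batchSize = batchSize;
    }

    void processElement(String element) {
        buffer.add(element);
        if (buffer.size() == batchSize) {
            flush();
        }
    }

    void processWatermark(long watermark) {
        // Even MAX_WATERMARK does not trigger a count-based flush:
        // advancing time never completes a partial batch.
    }

    void finish() {
        // finish() means "no more records will come": flush the remainder.
        flush();
    }

    private void flush() {
        emitted.addAll(buffer);
        buffer.clear();
    }

    List<String> emitted() {
        return emitted;
    }
}
```

This is exactly the case where MAX_WATERMARK alone leaves records stuck in the operator, which is why the two signals need a defined ordering.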
Concerning the EndOfPartitionEvent, I am not entirely sure whether it would work in its current state because, if I am not mistaken, we send this event when the Task is about to shut down. What we want is to bring the StreamTasks into a state in which they shut down on the next checkpoint. For this we need to keep the StreamTask running. In general, I am a fan of making things explicit where possible; I think this helps the maintenance and evolvability of the code. That's why I think sending an EndOfInputEvent, a StreamTask-level event which says that there won't be any other records coming, only control events, could make sense.

I would leave the proposed optimization out of the first version. We can still add it at a later point in time.

Cheers,
Till

On Mon, Jul 19, 2021 at 10:35 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Personally I don't find this optimization important and I'd rather leave
> it out so as not to complicate the codebase further. I doubt we save
> much there. I don't have a strong opinion though.
>
> Best,
>
> Dawid
>
> On 19/07/2021 10:31, Yun Gao wrote:
>
> Hi,
>
> Very thanks Dawid for the thoughts!
>
> Currently I also do not have different opinions regarding this part.
> But I have one more issue to confirm: during the previous discussion
> we discussed that for the final checkpoint case we might have an
> optimization: if a task does not have operators using 2PC, we might
> skip waiting for the final checkpoint (but we could not skip the
> savepoint).
> To allow users to express the logic, we have proposed to add one more
> method to StreamOperator & CheckpointListener:
>
>     interface StreamOperator {
>         default boolean requiresFinalCheckpoint() {
>             return true;
>         }
>     }
>
>     interface CheckpointListener {
>         default boolean requiresFinalCheckpoint() {
>             return true;
>         }
>     }
>
>     class AbstractUdfStreamOperator {
>         @Override
>         boolean requiresFinalCheckpoint() {
>             return userFunction instanceof CheckpointListener &&
>                 ((CheckpointListener) userFunction).requiresFinalCheckpoint();
>         }
>     }
>
> I think we should still keep this change?
>
> Best,
> Yun
>
> ------------------ Original Mail ------------------
> *Sender:* Dawid Wysakowicz <dwysakow...@apache.org>
> *Send Date:* Sun Jul 18 18:44:50 2021
> *Recipients:* Flink Dev <dev@flink.apache.org>, Yun Gao <yungao...@aliyun.com.INVALID>
> *Subject:* Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
>
>> I think we're all really close to the same solution.
>>
>> I second Yun's thoughts that MAX_WATERMARK works well for time-based
>> buffering, but it does not solve flushing other operations such as
>> e.g. count windows or batching requests in Sinks. I'd prefer to treat
>> finish() as a message for the Operator to "flush all records". The
>> MAX_WATERMARK, in my opinion, is mostly for backwards compatibility. I
>> don't think operators need to get a "stop-processing" signal if they
>> don't need to flush records. WHEN records are emitted should be under
>> the control of the StreamTask, by firing timers or by processing the
>> next record from upstream.
>>
>> The only difference of my previous proposal compared to Yun's is that
>> I did not want to send the EndOfUserRecords event in case of stop w/o
>> drain. My thinking was that we could go directly from RUNNING to
>> WAITING_FOR_FINAL_CP on EndOfPartitionEvent.
>> I agree we could emit
>> EndOfUserRecordsEvent with an additional flag and e.g. stop firing
>> timers and processing events (without calling finish() on the
>> Operator). In my initial suggestion I thought we didn't care about
>> some events potentially being emitted after the savepoint was taken,
>> as they would anyway belong to the checkpoint after FINAL, which
>> would be discarded. I think though that the proposal to suspend
>> record processing and timers is a sensible thing to do, and I would
>> go with the version that Yun put into the FLIP Wiki.
>>
>> What do you think Till?
>>
>> Best,
>>
>> Dawid
>>
>> On 16/07/2021 10:03, Yun Gao wrote:
>> > Hi Till, Piotr
>> >
>> > Very thanks for the comments!
>> >
>> >> 1) Does endOfInput entail sending of the MAX_WATERMARK?
>> >
>> > I also agree with Piotr that currently they are independent
>> > mechanisms, and they are basically the same for event time.
>> >
>> > For more details, first there are some differences among the three
>> > scenarios regarding finishing: for normal finish and
>> > stop-with-savepoint --drain, the job is not expected to be
>> > restarted, while for stop-with-savepoint the job is expected to
>> > restart later.
>> >
>> > Then for finish / stop-with-savepoint --drain, currently Flink
>> > would emit MAX_WATERMARK before the EndOfPartition. Besides, as we
>> > have discussed before [1], endOfInput / finish() should also only
>> > be called for finish / stop-with-savepoint --drain. Thus currently
>> > they always occur at the same time. After the change, we could emit
>> > MAX_WATERMARK before the endOfInput event for the finish /
>> > stop-with-savepoint --drain cases.
>> >
>> >> 2) StreamOperator.finish says to flush all buffered events. Would a
>> >> WindowOperator close all windows and emit the results upon calling
>> >> finish, for example?
>> > As discussed above, for stop-with-savepoint we would always keep
>> > the windows as they are and restore them after restart.
>> >
>> > Then for finish / stop-with-savepoint --drain, I think it perhaps
>> > depends on the Triggers. For event-time / processing-time triggers,
>> > it would be reasonable to flush all the windows, since logically
>> > time always elapses and the windows would always get triggered in a
>> > logical future. But for triggers like CountTrigger, no matter how
>> > much time passes logically, the windows would not trigger, thus we
>> > may not flush these windows. If there are requirements, we may
>> > provide additional triggers.
>> >
>> >> It's a bit messy and I'm not sure if this should be straightened
>> >> out? Each one of those has a slightly different semantic/meaning,
>> >> but at the same time they are very similar. For single-input
>> >> operators `endInput()` and `finish()` are actually the very same
>> >> thing.
>> >
>> > Currently MAX_WATERMARK / endInput / finish indeed always happen at
>> > the same time, and for single-input operators `endInput()` and
>> > `finish()` are indeed the same thing. During the last discussion we
>> > mentioned this issue, and at the time we thought that we might
>> > deprecate `endInput()` in the future; then we would only have
>> > endInput(int input) and finish().
>> >
>> > Best,
>> > Yun
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-21132
>> >
>> > ------------------------------------------------------------------
>> > From: Piotr Nowojski
>> > Send Time: 2021 Jul. 16 (Fri.) 13:48
>> > To: dev
>> > Cc: Yun Gao
>> > Subject: Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
>> >
>> > Hi Till,
>> >
>> >> 1) Does endOfInput entail sending of the MAX_WATERMARK?
>> >>
>> >> 2) StreamOperator.finish says to flush all buffered events.
>> >> Would a
>> >> WindowOperator close all windows and emit the results upon calling
>> >> finish, for example?
>> >
>> > 1) Currently they are independent but parallel mechanisms. With
>> > event time, they are basically the same.
>> > 2) It probably should, for the sake of processing-time windows.
>> >
>> > Here you are touching the bit of the current design that I like the
>> > least. We basically now have three different ways of conveying very
>> > similar things:
>> > a) sending `MAX_WATERMARK`, used by the event-time WindowOperator
>> > (what about processing time?)
>> > b) endInput(), used for example by AsyncWaitOperator to flush its
>> > internal state
>> > c) finish(), used for example by ContinuousFileReaderOperator
>> >
>> > It's a bit messy and I'm not sure if this should be straightened
>> > out? Each one of those has a slightly different semantic/meaning,
>> > but at the same time they are very similar. For single-input
>> > operators `endInput()` and `finish()` are actually the very same
>> > thing.
>> >
>> > Piotrek
>> >
>> > czw., 15 lip 2021 o 16:47 Till Rohrmann napisał(a):
>> >
>> > Thanks for updating the FLIP. Based on the new section about
>> > stop-with-savepoint [--drain] I got two other questions:
>> >
>> > 1) Does endOfInput entail sending of the MAX_WATERMARK?
>> >
>> > 2) StreamOperator.finish says to flush all buffered events. Would a
>> > WindowOperator close all windows and emit the results upon calling
>> > finish, for example?
>> >
>> > Cheers,
>> > Till
>> >
>> > On Thu, Jul 15, 2021 at 10:15 AM Till Rohrmann wrote:
>> >
>> > > Thanks a lot for your answers and clarifications Yun.
>> > >
>> > > 1+2) Agreed, this can be a future improvement if this becomes a
>> > > problem.
>> > >
>> > > 3) Great, this will help a lot with understanding the FLIP.
>> > >
>> > > Cheers,
>> > > Till
>> > >
>> > > On Wed, Jul 14, 2021 at 5:41 PM Yun Gao wrote:
>> > >
>> > >> Hi Till,
>> > >>
>> > >> Very thanks for the review and comments!
>> > >>
>> > >> 1) First, I think we could in fact do the computation outside of
>> > >> the main thread; the current implementation is mainly because
>> > >> the computation is in general fast and we initially wanted to
>> > >> have a simplified first version.
>> > >>
>> > >> The main requirement here is to have a consistent view of the
>> > >> state of the tasks. Otherwise, for example with A -> B, if A is
>> > >> running when we check whether we need to trigger A, we will mark
>> > >> A as to-be-triggered, but if A has reached finished by the time
>> > >> we check B, we will also mark B as to-be-triggered; then B would
>> > >> receive both the RPC trigger and the checkpoint barrier, which
>> > >> would break some assumptions on the task side and complicate the
>> > >> implementation.
>> > >>
>> > >> But to cope with this issue, we could in fact first take a
>> > >> snapshot of the tasks' states and then do the computation;
>> > >> neither of the two steps needs to be in the main thread.
>> > >>
>> > >> 2) For the computation logic, we currently benefit a lot from
>> > >> some shortcuts on all-to-all edges and on job vertices with all
>> > >> tasks running; these shortcuts can do checks at the job-vertex
>> > >> level first and skip some job vertices as a whole. With this
>> > >> optimization we have an O(V) algorithm, and the current running
>> > >> time of the worst case for a job with 320,000 tasks is less than
>> > >> 100 ms. For daily graph sizes the time would be further reduced
>> > >> linearly.
>> > >>
>> > >> If we do the computation based on the last triggered tasks, we
>> > >> may not easily encode this information into the shortcuts at the
>> > >> job-vertex level. And since the time seems to be short, perhaps
>> > >> it is enough to do the re-computation from scratch, in
>> > >> consideration of the tradeoff between performance and
>> > >> complexity?
>> > >>
>> > >> 3) We are going to emit the EndOfInput event exactly after the
>> > >> finish() method and before the last snapshotState() method, so
>> > >> that we could shut down the whole topology with a single final
>> > >> checkpoint. Very sorry for not including enough details for this
>> > >> part; I'll complement the FLIP with the details on the process
>> > >> of the final checkpoint / savepoint.
>> > >>
>> > >> Best,
>> > >> Yun
>> > >>
>> > >> ------------------------------------------------------------------
>> > >> From: Till Rohrmann
>> > >> Send Time: 2021 Jul. 14 (Wed.) 22:05
>> > >> To: dev
>> > >> Subject: Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
>> > >>
>> > >> Hi everyone,
>> > >>
>> > >> I am a bit late to the voting party but let me ask three
>> > >> questions:
>> > >>
>> > >> 1) Why do we execute the trigger plan computation in the main
>> > >> thread if we cannot guarantee that all tasks are still running
>> > >> when triggering the checkpoint? Couldn't we do the computation
>> > >> in a different thread in order to relieve the main thread a bit?
>> > >>
>> > >> 2) The implementation of the DefaultCheckpointPlanCalculator
>> > >> seems to go over the whole topology for every calculation.
>> > >> Wouldn't it be more efficient to maintain the set of current
>> > >> tasks to trigger and check whether anything has changed, and if
>> > >> so check the succeeding tasks until we have found the current
>> > >> checkpoint trigger frontier?
>> > >>
>> > >> 3) When are we going to send the endOfInput events to a
>> > >> downstream task? If this happens after we call finish on the
>> > >> upstream operator but before snapshotState, then it would be
>> > >> possible to shut down the whole topology with a single final
>> > >> checkpoint. I think this part could benefit from a bit more
>> > >> detailed description in the FLIP.
>> > >>
>> > >> Cheers,
>> > >> Till
>> > >>
>> > >> On Fri, Jul 2, 2021 at 8:36 AM Yun Gao wrote:
>> > >>
>> > >> > Hi there,
>> > >> >
>> > >> > Since the voting time of FLIP-147 [1] has passed, I'm closing
>> > >> > the vote now.
>> > >> >
>> > >> > There were seven +1 votes (6 of 7 binding) and no -1 votes:
>> > >> >
>> > >> > - Dawid Wysakowicz (binding)
>> > >> > - Piotr Nowojski (binding)
>> > >> > - Jiangang Liu (binding)
>> > >> > - Arvid Heise (binding)
>> > >> > - Jing Zhang (binding)
>> > >> > - Leonard Xu (non-binding)
>> > >> > - Guowei Ma (binding)
>> > >> >
>> > >> > Thus I'm happy to announce that the update to FLIP-147 is
>> > >> > accepted.
>> > >> >
>> > >> > Very thanks everyone!
>> > >> >
>> > >> > Best,
>> > >> > Yun
>> > >> >
>> > >> > [1] https://cwiki.apache.org/confluence/x/mw-ZCQ
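For reference, the requiresFinalCheckpoint() additions that Yun quoted earlier in the thread can be turned into a self-contained, compilable sketch. This is a simplification: the real Flink interfaces carry more methods and generics, and `userFunction` is typed more precisely in AbstractUdfStreamOperator; only the delegation logic from the proposal is shown.

```java
// Toy, self-contained version of the proposal from the thread.
interface CheckpointListener {
    // By default a listener needs the final checkpoint (e.g. to run
    // 2PC commits in notifyCheckpointComplete).
    default boolean requiresFinalCheckpoint() {
        return true;
    }
}

interface StreamOperator {
    default boolean requiresFinalCheckpoint() {
        return true;
    }
}

class AbstractUdfStreamOperator implements StreamOperator {
    private final Object userFunction;

    AbstractUdfStreamOperator(Object userFunction) {
        this.userFunction = userFunction;
    }

    @Override
    public boolean requiresFinalCheckpoint() {
        // A user function that is not a CheckpointListener never receives
        // checkpoint-complete notifications anyway, so waiting for the
        // final checkpoint can be skipped; otherwise the function decides.
        return userFunction instanceof CheckpointListener
                && ((CheckpointListener) userFunction).requiresFinalCheckpoint();
    }
}
```

Under this scheme, a plain (non-listener) user function opts the operator out of the final checkpoint, while a listener stays opted in unless it overrides the default.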