Personally I don't find this optimization important and I'd rather leave it out so as not to complicate the codebase further. I doubt we save much there. I don't have a strong opinion though.
Best,
Dawid

On 19/07/2021 10:31, Yun Gao wrote:
> Hi,
>
> Very thanks Dawid for the thoughts!
>
> Currently I also do not have different opinions regarding this part.
> But I have one more issue to confirm: during the previous discussion we
> discussed that for the final checkpoint case, we might have an
> optimization that if a task does not have operators using 2PC, we might
> skip waiting for the final checkpoint (but we could not skip the
> savepoint). To allow users to express the logic, we have proposed to
> add one more method to StreamOperator & CheckpointListener:
>
> interface StreamOperator {
>     default boolean requiresFinalCheckpoint() {
>         return true;
>     }
> }
>
> interface CheckpointListener {
>     default boolean requiresFinalCheckpoint() {
>         return true;
>     }
> }
>
> class AbstractUdfStreamOperator {
>
>     @Override
>     boolean requiresFinalCheckpoint() {
>         return userFunction instanceof CheckpointListener &&
>             ((CheckpointListener) userFunction).requiresFinalCheckpoint();
>     }
> }
>
> I think we should still keep the change?
>
> Best,
> Yun
>
> ------------------ Original Mail ------------------
> *Sender:* Dawid Wysakowicz <dwysakow...@apache.org>
> *Send Date:* Sun Jul 18 18:44:50 2021
> *Recipients:* Flink Dev <dev@flink.apache.org>, Yun Gao <yungao...@aliyun.com.INVALID>
> *Subject:* Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
>
> > I think we're all really close to the same solution.
> >
> > I second Yun's thoughts that MAX_WATERMARK works well for time-based
> > buffering, but it does not solve flushing other operations such as
> > e.g. count windows or batching requests in Sinks. I'd prefer to treat
> > finish() as a message for the operator to "flush all records". The
> > MAX_WATERMARK is, in my opinion, mostly for backwards compatibility.
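[Editor's sketch, not part of the thread: one way the proposed `requiresFinalCheckpoint()` flag could be consumed. Only the method name comes from the proposal above; the surrounding types (`TwoPhaseCommitSink`, `StatelessMapOperator`, `FinalCheckpointDecision`) are simplified, hypothetical stand-ins rather than Flink's real classes. The idea is that a task may skip waiting for the final checkpoint only if no operator in its chain needs it.]

```java
import java.util.List;

// Hypothetical sketch: aggregating the proposed requiresFinalCheckpoint()
// flag over an operator chain. Simplified stand-ins, not Flink internals.
interface StreamOperator {
    // Conservative default: wait for the final checkpoint.
    default boolean requiresFinalCheckpoint() {
        return true;
    }
}

// A 2PC sink must see the final checkpoint to commit its last transaction,
// so it keeps the default of true.
class TwoPhaseCommitSink implements StreamOperator {
}

// A stateless operator has nothing to commit, so waiting can be skipped.
class StatelessMapOperator implements StreamOperator {
    @Override
    public boolean requiresFinalCheckpoint() {
        return false;
    }
}

public class FinalCheckpointDecision {
    // The task must wait if ANY chained operator needs the final
    // checkpoint (note: per the thread, savepoints are never skipped).
    static boolean mustWaitForFinalCheckpoint(List<StreamOperator> chain) {
        return chain.stream().anyMatch(StreamOperator::requiresFinalCheckpoint);
    }

    public static void main(String[] args) {
        System.out.println(mustWaitForFinalCheckpoint(
            List.of(new StatelessMapOperator()))); // false
        System.out.println(mustWaitForFinalCheckpoint(
            List.of(new StatelessMapOperator(), new TwoPhaseCommitSink()))); // true
    }
}
```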
> > I don't think operators need to get a signal "stop-processing" if
> > they don't need to flush records. WHEN records are emitted should be
> > in control of the StreamTask, by firing timers or by processing a
> > next record from upstream.
> >
> > The only difference of my previous proposal compared to Yun's is that
> > I did not want to send the EndOfUserRecords event in case of stop w/o
> > drain. My thinking was that we could directly go from RUNNING to
> > WAITING_FOR_FINAL_CP on EndOfPartitionEvent. I agree we could emit
> > EndOfUserRecordsEvent with an additional flag and e.g. stop firing
> > timers and processing events (without calling finish() on the
> > Operator). In my initial suggestion I thought we don't care about
> > some events potentially being emitted after the savepoint was taken,
> > as they would anyway belong to the checkpoint after FINAL, which
> > would be discarded. I think though the proposal to suspend record
> > processing and timers is a sensible thing to do, and I would go with
> > the version that Yun put into the FLIP Wiki.
> >
> > What do you think Till?
> >
> > Best,
> >
> > Dawid
> >
> > On 16/07/2021 10:03, Yun Gao wrote:
> > > Hi Till, Piotr
> > >
> > > Very thanks for the comments!
> > >
> > >> 1) Does endOfInput entail sending of the MAX_WATERMARK?
> > > I also agree with Piotr that currently they are independent
> > > mechanisms, and they are basically the same for event time.
> > >
> > > For more details, first there are some differences among the three
> > > scenarios regarding the finish: for normal finish and
> > > stop-with-savepoint --drain, the job would not be expected to be
> > > restarted, while for stop-with-savepoint the job would be expected
> > > to restart later.
> > >
> > > Then for finish / stop-with-savepoint --drain, currently Flink
> > > would emit MAX_WATERMARK before the EndOfPartition.
> > > Besides, as we have discussed before [1], endOfInput / finish()
> > > should also only be called for finish / stop-with-savepoint
> > > --drain. Thus currently they always occur at the same time. After
> > > the change, we could emit MAX_WATERMARK before the endOfInput event
> > > for the finish / stop-with-savepoint --drain cases.
> > >
> > >> 2) StreamOperator.finish says to flush all buffered events. Would
> > >> a WindowOperator close all windows and emit the results upon
> > >> calling finish, for example?
> > > As discussed above for stop-with-savepoint, we would always keep
> > > the windows as is, and restore them after restart. Then for finish
> > > / stop-with-savepoint --drain, I think perhaps it depends on the
> > > Triggers. For event-time / processing-time triggers, it would be
> > > reasonable to flush all the windows, since logically the time would
> > > always elapse and the window would always get triggered in a
> > > logical future. But for triggers like CountTrigger, no matter how
> > > much time passes logically, the windows would not trigger, thus we
> > > may not flush these windows. If there are requirements, we may
> > > provide additional triggers.
> > >
> > >> It's a bit messy and I'm not sure if this should be straightened
> > >> out? Each one of those has a little bit different
> > >> semantic/meaning, but at the same time they are very similar. For
> > >> single input operators `endInput()` and `finish()` are actually
> > >> the very same thing.
> > > Currently MAX_WATERMARK / endInput / finish indeed always happen at
> > > the same time, and for single input operators `endInput()` and
> > > `finish()` are indeed the same thing. During the last discussion we
> > > mentioned this issue, and at that time we thought that we might
> > > deprecate `endInput()` in the future; then we would only have
> > > endInput(int input) and finish().
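[Editor's sketch, not part of the thread: the trigger distinction Yun describes above. A final MAX_WATERMARK fires event-time windows, because any window end is now "reached", but a count-based trigger ignores watermarks entirely, so its buffered elements can only be flushed by finish(). The types below are simplified, hypothetical stand-ins, not Flink's real Trigger API.]

```java
// Hypothetical sketch: why MAX_WATERMARK flushes event-time windows
// but not count windows. Simplified stand-ins, not Flink's Trigger API.
public class TriggerSketch {
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    interface Trigger {
        boolean firesOnWatermark(long watermark);
    }

    // Fires once the watermark passes the window end, so the final
    // MAX_WATERMARK fires every pending event-time window.
    record EventTimeTrigger(long windowEnd) implements Trigger {
        public boolean firesOnWatermark(long watermark) {
            return watermark >= windowEnd;
        }
    }

    // Fires only on element count; no amount of (logical) time elapsing
    // makes it fire, which is why finish() is needed to flush it.
    record CountTrigger(int seen, int threshold) implements Trigger {
        public boolean firesOnWatermark(long watermark) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(
            new EventTimeTrigger(10_000L).firesOnWatermark(MAX_WATERMARK)); // true
        System.out.println(
            new CountTrigger(3, 5).firesOnWatermark(MAX_WATERMARK)); // false
    }
}
```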
> > > Best,
> > > Yun
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-21132
> > >
> > > ------------------------------------------------------------------
> > > From: Piotr Nowojski
> > > Send Time: 2021 Jul. 16 (Fri.) 13:48
> > > To: dev
> > > Cc: Yun Gao
> > > Subject: Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
> > >
> > > Hi Till,
> > >
> > >> 1) Does endOfInput entail sending of the MAX_WATERMARK?
> > >>
> > >> 2) StreamOperator.finish says to flush all buffered events. Would
> > >> a WindowOperator close all windows and emit the results upon
> > >> calling finish, for example?
> > > 1) Currently they are independent but parallel mechanisms. With
> > > event time, they are basically the same.
> > > 2) It probably should, for the sake of processing time windows.
> > >
> > > Here you are touching the bit of the current design that I like the
> > > least. We basically have now three different ways of conveying very
> > > similar things:
> > > a) sending `MAX_WATERMARK`, used by the event time WindowOperator
> > >    (what about processing time?)
> > > b) endInput(), used for example by AsyncWaitOperator to flush its
> > >    internal state
> > > c) finish(), used for example by ContinuousFileReaderOperator
> > >
> > > It's a bit messy and I'm not sure if this should be straightened
> > > out? Each one of those has a little bit different semantic/meaning,
> > > but at the same time they are very similar. For single input
> > > operators `endInput()` and `finish()` are actually the very same
> > > thing.
> > >
> > > Piotrek
> > >
> > > On Thu, 15 Jul 2021 at 16:47, Till Rohrmann wrote:
> > > Thanks for updating the FLIP. Based on the new section about
> > > stop-with-savepoint [--drain] I got two other questions:
> > >
> > > 1) Does endOfInput entail sending of the MAX_WATERMARK?
> > >
> > > 2) StreamOperator.finish says to flush all buffered events.
> > > Would a WindowOperator close all windows and emit the results upon
> > > calling finish, for example?
> > >
> > > Cheers,
> > > Till
> > >
> > > On Thu, Jul 15, 2021 at 10:15 AM Till Rohrmann wrote:
> > > > Thanks a lot for your answers and clarifications Yun.
> > > >
> > > > 1+2) Agreed, this can be a future improvement if this becomes a
> > > > problem.
> > > >
> > > > 3) Great, this will help a lot with understanding the FLIP.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Wed, Jul 14, 2021 at 5:41 PM Yun Gao wrote:
> > > >> Hi Till,
> > > >>
> > > >> Very thanks for the review and comments!
> > > >>
> > > >> 1) First, I think we could in fact do the computation outside of
> > > >> the main thread; the current implementation is mainly because
> > > >> the computation is in general fast and we initially wanted a
> > > >> simplified first version.
> > > >>
> > > >> The main requirement here is to have a consistent view of the
> > > >> state of the tasks. Otherwise, for example if we have A -> B: if
> > > >> A is running when we check whether we need to trigger A, we will
> > > >> mark A as to-be-triggered, but if A gets to finished when we
> > > >> check B, we will also mark B as to-be-triggered; then B will
> > > >> receive both the RPC trigger and the checkpoint barrier, which
> > > >> would break some assumptions on the task side and complicate the
> > > >> implementation.
> > > >>
> > > >> But to cope with this issue, we could in fact first take a
> > > >> snapshot of the tasks' states and then do the computation;
> > > >> neither of the two steps needs to be in the main thread.
> > > >>
> > > >> 2) For the computation logic, currently we in fact benefit a lot
> > > >> from some shortcuts on all-to-all edges and on job vertices with
> > > >> all tasks running; these shortcuts can do checks on the job
> > > >> vertex level first and skip some job vertices as a whole.
> > > >> With this optimization we have an O(V) algorithm, and the
> > > >> current worst-case running time for a job with 320,000 tasks is
> > > >> less than 100ms. For everyday graph sizes the time would be
> > > >> further reduced linearly.
> > > >>
> > > >> If we do the computation based on the last triggered tasks, we
> > > >> may not easily encode this information into the shortcuts on the
> > > >> job vertex level. And since the time seems to be short, perhaps
> > > >> it is enough to re-compute from scratch, considering the
> > > >> tradeoff between performance and complexity?
> > > >>
> > > >> 3) We are going to emit the EndOfInput event exactly after the
> > > >> finish() method and before the last snapshotState() method so
> > > >> that we could shut down the whole topology with a single final
> > > >> checkpoint. Very sorry for not including enough details for this
> > > >> part; I'll complement the FLIP with the details on the process
> > > >> of the final checkpoint / savepoint.
> > > >>
> > > >> Best,
> > > >> Yun
> > > >>
> > > >> ------------------------------------------------------------------
> > > >> From: Till Rohrmann
> > > >> Send Time: 2021 Jul. 14 (Wed.) 22:05
> > > >> To: dev
> > > >> Subject: Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After
> > > >> Tasks Finished
> > > >>
> > > >> Hi everyone,
> > > >>
> > > >> I am a bit late to the voting party but let me ask three
> > > >> questions:
> > > >>
> > > >> 1) Why do we execute the trigger plan computation in the main
> > > >> thread if we cannot guarantee that all tasks are still running
> > > >> when triggering the checkpoint? Couldn't we do the computation
> > > >> in a different thread in order to relieve the main thread a bit?
> > > >>
> > > >> 2) The implementation of the DefaultCheckpointPlanCalculator
> > > >> seems to go over the whole topology for every calculation.
> > > >> Wouldn't it be more efficient to maintain the set of current
> > > >> tasks to trigger, check whether anything has changed, and if so
> > > >> check the succeeding tasks until we have found the current
> > > >> checkpoint trigger frontier?
> > > >>
> > > >> 3) When are we going to send the endOfInput events to a
> > > >> downstream task? If this happens after we call finish on the
> > > >> upstream operator but before snapshotState, then it would be
> > > >> possible to shut down the whole topology with a single final
> > > >> checkpoint. I think this part could benefit from a bit more
> > > >> detailed description in the FLIP.
> > > >>
> > > >> Cheers,
> > > >> Till
> > > >>
> > > >> On Fri, Jul 2, 2021 at 8:36 AM Yun Gao wrote:
> > > >> > Hi there,
> > > >> >
> > > >> > Since the voting time of FLIP-147 [1] has passed, I'm closing
> > > >> > the vote now.
> > > >> >
> > > >> > There were seven +1 votes (6 of 7 binding) and no -1 votes:
> > > >> >
> > > >> > - Dawid Wysakowicz (binding)
> > > >> > - Piotr Nowojski (binding)
> > > >> > - Jiangang Liu (binding)
> > > >> > - Arvid Heise (binding)
> > > >> > - Jing Zhang (binding)
> > > >> > - Leonard Xu (non-binding)
> > > >> > - Guowei Ma (binding)
> > > >> >
> > > >> > Thus I'm happy to announce that the update to FLIP-147 is
> > > >> > accepted.
> > > >> >
> > > >> > Very thanks everyone!
> > > >> >
> > > >> > Best,
> > > >> > Yun
> > > >> >
> > > >> > [1] https://cwiki.apache.org/confluence/x/mw-ZCQ
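[Editor's sketch, not part of the thread: the two-step plan computation Yun describes in his answers 1) and 2). Step one takes an immutable snapshot of every task's state, so that tasks A and B are judged against the same view and B cannot end up both RPC-triggered and barrier-reached; step two computes the trigger set, and neither step needs the main thread. All types below are simplified, hypothetical stand-ins, not Flink's DefaultCheckpointPlanCalculator.]

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of snapshot-then-compute checkpoint planning.
public class PlanSketch {
    enum TaskState { RUNNING, FINISHED }

    // Step 1: freeze the view so later checks see the same task states.
    static Map<String, TaskState> snapshot(Map<String, TaskState> live) {
        return Map.copyOf(live);
    }

    // Step 2: the "trigger frontier": every running task whose upstream
    // has finished (or that has no upstream) gets an RPC trigger;
    // checkpoint barriers then flow downstream from those tasks.
    static Set<String> tasksToTrigger(Map<String, TaskState> view,
                                      Map<String, String> upstreamOf) {
        Set<String> result = new TreeSet<>();
        for (var e : view.entrySet()) {
            if (e.getValue() != TaskState.RUNNING) {
                continue;
            }
            String up = upstreamOf.get(e.getKey());
            if (up == null || view.get(up) == TaskState.FINISHED) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, TaskState> live = new HashMap<>(Map.of(
            "A", TaskState.FINISHED, "B", TaskState.RUNNING));
        Map<String, String> upstream = Map.of("B", "A"); // topology A -> B
        // Because the view is frozen, A finishing mid-computation cannot
        // cause B to receive both an RPC trigger and a barrier.
        System.out.println(tasksToTrigger(snapshot(live), upstream)); // [B]
    }
}
```

This is only the naive per-task scan; the thread notes the real calculator short-circuits whole job vertices on all-to-all edges to reach O(V).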