Personally I don't find this optimization important and I'd rather leave
it out not to complicate the codebase further. I doubt we save much
there. I don't have a strong opinion though.

Best,

Dawid

On 19/07/2021 10:31, Yun Gao wrote:
> Hi,
>
> Very thanks Dawid for the thoughts!
>
> Currently I also do not have different opinions regarding this part. 
> But I have one more issue to confirm: during the previous discussion we 
> have discussed that for the final checkpoint case, we might have an
> optmization
> that if a task do not have operators using 2-pc, we might skip waiting
> for the 
> final checkpoint (but we could not skip the savepoint). To allow users
> to express
> the logic, we have proposed to add one more method to StreamOperator &
> CheckpointListener:
>
> |interface| |StreamOperator {|
> |    ||default| |boolean| |requiresFinalCheckpoint() {|
> |        ||return| |true||;|
> |    ||}|
> |}|
>  
> |interface| |CheckpointListener {|
>  
> |    ||default| |boolean| |requiresFinalCheckpoint() {|
> |        ||return| |true||;|
> |    ||}|
> |}|
>  
> |class| |AbstractUdfStreamOperator {|
> |    | 
> |    ||@Override|
> |    ||boolean| |requiresFinalCheckpoint() {|
> |        ||return| |userFunction ||instanceof| |CheckpointListener &&|
> |            ||((CheckpointListener) userFunction).requiresFinalCheckpoint();|
> |    ||}|
> |}|
> |
> |
>
> I think we should still keep the change ? 
>
> Best,
> Yun
>
>     ------------------Original Mail ------------------
>     *Sender:*Dawid Wysakowicz <dwysakow...@apache.org>
>     *Send Date:*Sun Jul 18 18:44:50 2021
>     *Recipients:*Flink Dev <dev@flink.apache.org>, Yun Gao
>     <yungao...@aliyun.com.INVALID>
>     *Subject:*Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After
>     Tasks Finished
>
>         I think we're all really close to the same solution.
>
>
>
>         I second Yun's thoughts that MAX_WATERMARK works well for time
>         based
>
>         buffering, but it does not solve flushing other operations
>         such as e.g.
>
>         count windows or batching requests in Sinks. I'd prefer to
>         treat the
>
>         finish() as a message for Operator to "flush all records". The
>
>         MAX_WATERMARK in my opinion is mostly for backwards
>         compatibility imo. I
>
>         don't think operators need to get a signal "stop-processing"
>         if they
>
>         don't need to flush records. The "WHEN" records are emitted,
>         should be
>
>         in control of the StreamTask, by firing timers or by
>         processing a next
>
>         record from upstream.
>
>
>
>         The only difference of my previous proposal compared to Yun's
>         is that I
>
>         did not want to send the EndOfUserRecords event in case of
>         stop w/o
>
>         drain. My thinking was that we could directly go from RUNNING to
>
>         WAITING_FOR_FINAL_CP on EndOfPartitionEvent. I agree we could emit
>
>         EndOfUserRecordsEvent with an additional flag and e.g. stop firing
>
>         timers and processing events (without calling finish() on
>         Operator). In
>
>         my initial suggestion I though we don't care about some events
>
>         potentially being emitted after the savepoint was taken, as
>         they would
>
>         anyway belong to the next after FINAL, which would be
>         discarded. I think
>
>         though the proposal to suspend records processing and timers is a
>
>         sensible thing to do and would go with the version that Yun
>         put into the
>
>         FLIP Wiki.
>
>
>
>         What do you think Till?
>
>
>
>         Best,
>
>
>
>         Dawid
>
>
>
>
>
>         On 16/07/2021 10:03, Yun Gao wrote:
>
>         > Hi Till, Piotr
>
>         >
>
>         > Very thanks for the comments!
>
>         >
>
>         >> 1) Does endOfInput entail sending of the MAX_WATERMARK?
>
>         > I also agree with Piotr that currently they are independent
>         mechanisms, and they are basically the same
>
>         > for the event time.
>
>         >
>
>         > For more details, first there are some difference among the
>         three scenarios regarding the finish:
>
>         > For normal finish and stop-with-savepoint --drain, the job
>         would not be expected to be restarted,
>
>         > and for stop-with-savepoint the job would be expected
>         restart later.
>
>         >
>
>         > Then for finish / stop-with-savepoint --drain, currently
>         Flink would emit MAX_WATERMARK before the
>
>         > EndOfPartition. Besides, as we have discussed before [1],
>         endOfInput / finish() should also only be called
>
>         > for finish / stop-with-savepoint --drain. Thus currently
>         they always occurs at the same time. After the change,
>
>         > we could emit MAX_WATERMARK before endOfInput event for the
>         finish / stop-with-savepoint --drain cases.
>
>         >
>
>         >> 2) StreamOperator.finish says to flush all buffered events.
>         Would a
>
>         >> WindowOperator close all windows and emit the results upon
>         calling
>
>         >> finish, for example?
>
>         > As discussed above for stop-with-savepoint, we would always
>         keep the window as is, and restore them after restart.
>
>         > Then for the finish / stop-with-savepoint --drain, I think
>         perhaps it depends on the Triggers. For
>
>         > event-time triggers / process time triggers, it would be
>         reasonable to flush all the windows since logically
>
>         > the time would always elapse and the window would always get
>         triggered in a logical future. But for triggers
>
>         > like CountTrigger, no matter how much time pass logically,
>         the windows would not trigger, thus we may not
>
>         > flush these windows. If there are requirements we may
>         provide additional triggers.
>
>         >
>
>         >> It's a bit messy and I'm not sure if this should be
>         strengthened out? Each one of those has a little bit different
>         semantic/meaning,
>
>         >> but at the same time they are very similar. For single
>         input operators `endInput()` and `finish()` are actually the
>         very same thing.
>
>         > Currently MAX_WATERMARK / endInput / finish indeed always
>         happen at the same time, and for single input operators
>         `endInput()` and `finish()`
>
>         > are indeed the same thing. During the last discussion we
>         ever mentioned this issue and at then we thought that we might
>         deprecate `endInput()`
>
>         > in the future, then we would only have endInput(int input)
>         and finish().
>
>         >
>
>         > Best,
>
>         > Yun
>
>         >
>
>         >
>
>         > [1] https://issues.apache.org/jira/browse/FLINK-21132
>
>         >
>
>         >
>
>         >
>
>         >
>         ------------------------------------------------------------------
>
>         > From:Piotr Nowojski
>
>         > Send Time:2021 Jul. 16 (Fri.) 13:48
>
>         > To:dev
>
>         > Cc:Yun Gao
>
>         > Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint
>         After Tasks Finished
>
>         >
>
>         > Hi Till,
>
>         >
>
>         >> 1) Does endOfInput entail sending of the MAX_WATERMARK?
>
>         >>
>
>         >> 2) StreamOperator.finish says to flush all buffered events.
>         Would a> WindowOperator close all windows and emit the results
>         upon calling
>
>         >> finish, for example?
>
>         > 1) currently they are independent but parallel mechanisms.
>         With event time, they are basically the same.
>
>         > 2) it probably should for the sake of processing time windows.
>
>         >
>
>         > Here you are touching the bit of the current design that I
>         like the least. We basically have now three different ways of
>         conveying very similar things:
>
>         > a) sending `MAX_WATERMARK`, used by event time
>         WindowOperator (what about processing time?)
>
>         > b) endInput(), used for example by AsyncWaitOperator to
>         flush it's internal state
>
>         > c) finish(), used for example by ContinuousFileReaderOperator
>
>         >
>
>         > It's a bit messy and I'm not sure if this should be
>         strengthened out? Each one of those has a little bit different
>         semantic/meaning, but at the same time they are very similar.
>         For single input operators `endInput()` and `finish()` are
>         actually the very same thing.
>
>         >
>
>         > Piotrek
>
>         > czw., 15 lip 2021 o 16:47 Till Rohrmann napisał(a):
>
>         > Thanks for updating the FLIP. Based on the new section about
>
>         > stop-with-savepoint [--drain] I got two other questions:
>
>         >
>
>         > 1) Does endOfInput entail sending of the MAX_WATERMARK?
>
>         >
>
>         > 2) StreamOperator.finish says to flush all buffered events.
>         Would a
>
>         > WindowOperator close all windows and emit the results upon
>         calling
>
>         > finish, for example?
>
>         >
>
>         > Cheers,
>
>         > Till
>
>         >
>
>         > On Thu, Jul 15, 2021 at 10:15 AM Till Rohrmann wrote:
>
>         >
>
>         > > Thanks a lot for your answers and clarifications Yun.
>
>         > >
>
>         > > 1+2) Agreed, this can be a future improvement if this
>         becomes a problem.
>
>         > >
>
>         > > 3) Great, this will help a lot with understanding the FLIP.
>
>         > >
>
>         > > Cheers,
>
>         > > Till
>
>         > >
>
>         > > On Wed, Jul 14, 2021 at 5:41 PM Yun Gao
>
>         > > wrote:
>
>         > >
>
>         > >> Hi Till,
>
>         > >>
>
>         > >> Very thanks for the review and comments!
>
>         > >>
>
>         > >> 1) First I think in fact we could be able to do the
>         computation outside
>
>         > >> of the main thread,
>
>         > >> and the current implementation mainly due to the
>         computation is in
>
>         > >> general fast and we
>
>         > >> initially want to have a simplified first version.
>
>         > >>
>
>         > >> The main requirement here is to have a constant view of
>         the state of the
>
>         > >> tasks, otherwise
>
>         > >> for example if we have A -> B, if A is running when we
>         check if we need
>
>         > >> to trigger A, we will
>
>         > >> mark A as have to trigger, but if A gets to finished when
>         we check B, we
>
>         > >> will also mark B as
>
>         > >> have to trigger, then B will receive both rpc trigger and
>         checkpoint
>
>         > >> barrier, which would break
>
>         > >> some assumption on the task side and complicate the
>         implementation.
>
>         > >>
>
>         > >> But to cope this issue, we in fact could first have a
>         snapshot of the
>
>         > >> tasks' state and then do the
>
>         > >> computation, both the two step do not need to be in the
>         main thread.
>
>         > >>
>
>         > >> 2) For the computation logic, in fact currently we
>         benefit a lot from
>
>         > >> some shortcuts on all-to-all
>
>         > >> edges and job vertex with all tasks running, these
>         shortcuts could do
>
>         > >> checks on the job vertex level
>
>         > >> first and skip some job vertices as a whole. With this
>         optimization we
>
>         > >> have a O(V) algorithm, and the
>
>         > >> current running time of the worst case for a job with
>         320,000 tasks is
>
>         > >> less than 100ms. For
>
>         > >> daily graph sizes the time would be further reduced linearly.
>
>         > >>
>
>         > >> If we do the computation based on the last triggered
>         tasks, we may not
>
>         > >> easily encode this information
>
>         > >> into the shortcuts on the job vertex level. And since the
>         time seems to
>
>         > >> be short, perhaps it is enough
>
>         > >> to do re-computation from the scratch in consideration of
>         the tradeoff
>
>         > >> between the performance and
>
>         > >> the complexity ?
>
>         > >>
>
>         > >> 3) We are going to emit the EndOfInput event exactly
>         after the finish()
>
>         > >> method and before the last
>
>         > >> snapshotState() method so that we could shut down the
>         whole topology with
>
>         > >> a single final checkpoint.
>
>         > >> Very sorry for not include enough details for this part
>         and I'll
>
>         > >> complement the FLIP with the details on
>
>         > >> the process of the final checkpoint / savepoint.
>
>         > >>
>
>         > >> Best,
>
>         > >> Yun
>
>         > >>
>
>         > >>
>
>         > >>
>
>         > >>
>         ------------------------------------------------------------------
>
>         > >> From:Till Rohrmann
>
>         > >> Send Time:2021 Jul. 14 (Wed.) 22:05
>
>         > >> To:dev
>
>         > >> Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint
>         After Tasks
>
>         > >> Finished
>
>         > >>
>
>         > >> Hi everyone,
>
>         > >>
>
>         > >> I am a bit late to the voting party but let me ask three
>         questions:
>
>         > >>
>
>         > >> 1) Why do we execute the trigger plan computation in the
>         main thread if we
>
>         > >> cannot guarantee that all tasks are still running when
>         triggering the
>
>         > >> checkpoint? Couldn't we do the computation in a different
>         thread in order
>
>         > >> to relieve the main thread a bit.
>
>         > >>
>
>         > >> 2) The implementation of the
>         DefaultCheckpointPlanCalculator seems to go
>
>         > >> over the whole topology for every calculation. Wouldn't
>         it be more
>
>         > >> efficient to maintain the set of current tasks to trigger
>         and check
>
>         > >> whether
>
>         > >> anything has changed and if so check the succeeding tasks
>         until we have
>
>         > >> found the current checkpoint trigger frontier?
>
>         > >>
>
>         > >> 3) When are we going to send the endOfInput events to a
>         downstream task?
>
>         > >> If
>
>         > >> this happens after we call finish on the upstream
>         operator but before
>
>         > >> snapshotState then it would be possible to shut down the
>         whole topology
>
>         > >> with a single final checkpoint. I think this part could
>         benefit from a bit
>
>         > >> more detailed description in the FLIP.
>
>         > >>
>
>         > >> Cheers,
>
>         > >> Till
>
>         > >>
>
>         > >> On Fri, Jul 2, 2021 at 8:36 AM Yun Gao
>
>         > >> wrote:
>
>         > >>
>
>         > >> > Hi there,
>
>         > >> >
>
>         > >> > Since the voting time of FLIP-147[1] has passed, I'm
>         closing the vote
>
>         > >> now.
>
>         > >> >
>
>         > >> > There were seven +1 votes ( 6 / 7 are bindings) and no
>         -1 votes:
>
>         > >> >
>
>         > >> > - Dawid Wysakowicz (binding)
>
>         > >> > - Piotr Nowojski(binding)
>
>         > >> > - Jiangang Liu (binding)
>
>         > >> > - Arvid Heise (binding)
>
>         > >> > - Jing Zhang (binding)
>
>         > >> > - Leonard Xu (non-binding)
>
>         > >> > - Guowei Ma (binding)
>
>         > >> >
>
>         > >> > Thus I'm happy to announce that the update to the
>         FLIP-147 is accepted.
>
>         > >> >
>
>         > >> > Very thanks everyone!
>
>         > >> >
>
>         > >> > Best,
>
>         > >> > Yun
>
>         > >> >
>
>         > >> > [1] https://cwiki.apache.org/confluence/x/mw-ZCQ
>
>         > >>
>
>         > >>
>
>         >
>
>
>

Attachment: OpenPGP_signature
Description: OpenPGP digital signature

Reply via email to