Hi Piotrek,

It looks good to me. Thanks for the update!

Best,
Zakelly

On Thu, May 23, 2024 at 7:04 PM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:

Hi Zakelly,

I've thought about it a bit more, and I think only the `#execute()` methods make sense for implementing operators (and interruptible mails), so I will add the `MailOptions` parameter only to them. If necessary, we can add more in the future.

I have updated the FLIP. If it looks good to you, I will start a voting thread today or tomorrow.

Best,
Piotrek

On Thu, May 23, 2024 at 09:00, Zakelly Lan <zakelly....@gmail.com> wrote:

Hi Piotrek,

Well, compared to this plan, I prefer your previous one, which calls `execute` directly and is more in line with the intuition of an executor's API. Before the number of variants grows too large, I'd suggest making only the minimum change needed for "interruptible".

My original concern was that doubling each method could leave some of the new methods without callers. But, as you said, for the sake of completeness I can accept the plan of doubling the methods.

Thanks & Best,
Zakelly

On Wed, May 22, 2024 at 5:05 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

Hi Zakelly,

> I suggest not doubling the existing methods. Only providing the following one is enough

In that case, I would prefer to have a complete set of methods, for the sake of completeness. If the number of variants is (or would be) getting too large, we could convert the class into a builder?

    mailboxExecutor.execute(myThrowingRunnable).setInterruptible().description("bla %d").arg(42).submit();

It could be done either in the future, if we ever need to add even more methods, or now. WDYT?
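The builder alternative in the one-liner above can be sketched in plain Java as follows. Only the chained method names come from that one-liner; `SketchMailboxExecutor`, `MailBuilder`, `Mail`, and this `ThrowingRunnable` are hypothetical stand-ins, not Flink's `MailboxExecutor` API, and the "mailbox" is just an `ArrayDeque` drained on a single thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for Flink's ThrowingRunnable (illustration only).
interface ThrowingRunnable<E extends Exception> {
    void run() throws E;
}

// Hypothetical mail: the command plus its options and formatted description.
final class Mail {
    final ThrowingRunnable<? extends Exception> command;
    final boolean interruptible;
    final String description;

    Mail(ThrowingRunnable<? extends Exception> command, boolean interruptible, String description) {
        this.command = command;
        this.interruptible = interruptible;
        this.description = description;
    }
}

// Hypothetical executor: execute(...) returns a builder instead of enqueueing right away.
final class SketchMailboxExecutor {
    final Deque<Mail> queue = new ArrayDeque<>();

    MailBuilder execute(ThrowingRunnable<? extends Exception> command) {
        return new MailBuilder(this, command);
    }

    // Runs queued mails in FIFO order on the calling thread.
    void runAll() throws Exception {
        Mail mail;
        while ((mail = queue.pollFirst()) != null) {
            mail.command.run();
        }
    }
}

// Hypothetical builder: optional settings first, then submit() enqueues the mail.
final class MailBuilder {
    private final SketchMailboxExecutor executor;
    private final ThrowingRunnable<? extends Exception> command;
    private final List<Object> args = new ArrayList<>();
    private boolean interruptible = false;
    private String descriptionFormat = "";

    MailBuilder(SketchMailboxExecutor executor, ThrowingRunnable<? extends Exception> command) {
        this.executor = executor;
        this.command = command;
    }

    MailBuilder setInterruptible() {
        interruptible = true;
        return this;
    }

    MailBuilder description(String format) {
        descriptionFormat = format;
        return this;
    }

    MailBuilder arg(Object arg) {
        args.add(arg);
        return this;
    }

    // Formats the description from the collected args and enqueues the mail.
    Mail submit() {
        String description = String.format(descriptionFormat, args.toArray());
        Mail mail = new Mail(command, interruptible, description);
        executor.queue.addLast(mail);
        return mail;
    }
}
```

For example, with `int[] counter = {0};`, the call `executor.execute(() -> counter[0]++).setInterruptible().description("bla %d").arg(42).submit()` enqueues a single interruptible mail described as "bla 42". Compared with doubling the overloads, a builder keeps one entry point as options grow, at the cost of an extra object per mail and the risk of forgetting the terminal `submit()`.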

Best,
Piotrek

On Wed, May 22, 2024 at 08:48, Zakelly Lan <zakelly....@gmail.com> wrote:

Hi Piotrek,

`MailOptions` looks good to me. I suggest not doubling the existing methods. Only providing the following one is enough:

    void execute(
            MailOptions mailOptions,
            ThrowingRunnable<? extends Exception> command,
            String descriptionFormat,
            Object... descriptionArgs);

WDYT?

Best,
Zakelly

On Wed, May 22, 2024 at 12:53 AM Piotr Nowojski <pnowoj...@apache.org> wrote:

Hi Zakelly and others,

> 1. I'd suggest also providing `isInterruptable()` in `Mail`, and the continuation mail will return true. FLIP-425 will leverage this queue to execute some state requests, and when the checkpoint (cp) arrives, the operator may call `yield()` to drain. It may happen that the continuation mail is called again in `yield()`. By checking `isInterruptable()`, we can skip this mail and re-enqueue it.

Do you have any suggestions on how `isInterruptible` should be defined? Do we have to double the number of methods in `MailboxExecutor` to provide versions of the existing methods that would enqueue "interruptible" versions of mails? Something like:

    default void execute(ThrowingRunnable<? extends Exception> command, String description) {
        execute(DEFAULT_OPTIONS, command, description);
    }

    default void execute(
            MailOptions options, ThrowingRunnable<? extends Exception> command, String description) {
        execute(options, command, description, EMPTY_ARGS);
    }

    default void execute(
            ThrowingRunnable<? extends Exception> command,
            String descriptionFormat,
            Object... descriptionArgs) {
        execute(DEFAULT_OPTIONS, command, descriptionFormat, descriptionArgs);
    }

    void execute(
            MailOptions options,
            ThrowingRunnable<? extends Exception> command,
            String descriptionFormat,
            Object... descriptionArgs);

    public static class MailOptions {
        (...)
        public MailOptions() {
        }

        MailOptions setIsInterruptible() {
            this.isInterruptible = true;
            return this;
        }
    }

And usage would be like this:

    mailboxExecutor.execute(new MailOptions().setIsInterruptible(), () -> { foo(); }, "foo");

?

Best,
Piotrek

On Thu, May 16, 2024 at 11:26, Rui Fan <1996fan...@gmail.com> wrote:

Hi Piotr,

> we checked the firing timers benchmark [1] and didn't observe any performance regression.

Thanks for the feedback; it's good to hear that. I hadn't noticed that we already have the fireProcessingTimers benchmark.

If so, we can keep monitoring it after this FLIP is merged.

+1 for this FLIP.

Best,
Rui

On Thu, May 16, 2024 at 5:13 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

Hi Zakelly,

> I'm suggesting skipping the continuation mail during draining of async state access.

I see. That makes sense to me now. I will update the FLIP later.

> the code path will become more complex after this FLIP due to the addition of shouldInterrupt() checks, right?

Yes, that's correct.

> If so, it's better to add a benchmark to check whether the job performance regresses when one job has a lot of timers. If the performance regresses too much, we need to reconsider it. Of course, I hope the performance is fine.

I had the same concerns when David Moravek initially proposed this solution, but we checked the firing timers benchmark [1] and didn't observe any performance regression.

Best,
Piotrek

[1] http://flink-speed.xyz/timeline/?ben=fireProcessingTimers&env=3

On Tue, May 7, 2024 at 09:47, Rui Fan <1996fan...@gmail.com> wrote:

Hi Piotr,

Overall, this FLIP looks fine to me. I have a minor concern: IIUC, the code path will become more complex after this FLIP due to the addition of shouldInterrupt() checks, right?

If so, it's better to add a benchmark to check whether the job performance regresses when one job has a lot of timers.
If the performance regresses too much, we need to reconsider it. Of course, I hope the performance is fine.

Best,
Rui

On Mon, May 6, 2024 at 6:30 PM Zakelly Lan <zakelly....@gmail.com> wrote:

Hi Piotr,

I mean the scenario where things happen in the following order:
1. The watermark advances and timers are processed.
2. The checkpoint (cp) arrives and interrupts the timer processing; after this, the continuation mail is in the mailbox queue.
3. `snapshotState` is called, where the async state access responses will be drained by calling `tryYield()` [1]. What if the continuation mail is triggered by `tryYield()`?

I'm suggesting skipping the continuation mail during draining of async state access.

[1] https://github.com/apache/flink/blob/1904b215e36e4fd48e48ece7ffdf2f1470653130/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java#L305

Best,
Zakelly

On Mon, May 6, 2024 at 6:00 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

Hi Zakelly,

Can you elaborate a bit more on what you have in mind?
How does marking mails as interruptible help here? If an incoming async state access response arrives, it could simply request to interrupt any currently ongoing computation, regardless of whether the currently executed mail is interruptible or not.

Best,
Piotrek

On Mon, May 6, 2024 at 06:33, Zakelly Lan <zakelly....@gmail.com> wrote:

Hi Piotr,

Thanks for the improvement, overall +1 for this. I'd leave a minor comment:

1. I'd suggest also providing `isInterruptable()` in `Mail`, and the continuation mail will return true. FLIP-425 will leverage this queue to execute some state requests, and when the checkpoint (cp) arrives, the operator may call `yield()` to drain. It may happen that the continuation mail is called again in `yield()`. By checking `isInterruptable()`, we can skip this mail and re-enqueue it.

Best,
Zakelly

On Wed, May 1, 2024 at 4:35 PM Yanfei Lei <fredia...@gmail.com> wrote:

Thanks for your answers, Piotrek. I got it now. +1 for this improvement.
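Zakelly's point 1 (skip the interruptible continuation mail while draining, then re-enqueue it) can be sketched with a toy single-threaded mailbox. `ToyMail`, `ToyMailbox`, and `drainNonInterruptible` are hypothetical names, and this is far simpler than Flink's actual mailbox; it only illustrates the skip-and-re-enqueue idea.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical mail with an "interruptible" flag (the proposed isInterruptable()).
final class ToyMail {
    final String name;
    final boolean interruptible;
    final Runnable action;

    ToyMail(String name, boolean interruptible, Runnable action) {
        this.name = name;
        this.interruptible = interruptible;
        this.action = action;
    }
}

// Hypothetical single-threaded mailbox (illustration only, not Flink's TaskMailbox).
final class ToyMailbox {
    private final Deque<ToyMail> queue = new ArrayDeque<>();

    void put(ToyMail mail) {
        queue.addLast(mail);
    }

    // Runs the mails queued at call time, except interruptible ones
    // (e.g. a timer-firing continuation), which are kept for later.
    void drainNonInterruptible() {
        List<ToyMail> skipped = new ArrayList<>();
        int pending = queue.size(); // mails enqueued during the drain are not processed here
        for (int i = 0; i < pending; i++) {
            ToyMail mail = queue.pollFirst();
            if (mail.interruptible) {
                skipped.add(mail);
            } else {
                mail.action.run();
            }
        }
        // Re-enqueue skipped mails at the head, preserving their relative order.
        for (int i = skipped.size() - 1; i >= 0; i--) {
            queue.addFirst(skipped.get(i));
        }
    }

    List<String> queuedNames() {
        List<String> names = new ArrayList<>();
        for (ToyMail mail : queue) {
            names.add(mail.name);
        }
        return names;
    }
}
```

In the May 6 scenario, a drain like this during `snapshotState` would process the async state responses while the timer-firing continuation stays at the head of the queue until regular mailbox processing resumes.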

Best,
Yanfei

On Tue, Apr 30, 2024 at 21:30, Stefan Richter <srich...@confluent.io.invalid> wrote:

Thanks for the improvement proposal, I’m +1 for the change!

Best,
Stefan

On 30. Apr 2024, at 15:23, Roman Khachatryan <ro...@apache.org> wrote:

Thanks for the proposal, I definitely see the need for this improvement, +1.

Regards,
Roman

On Tue, Apr 30, 2024 at 3:11 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

Hi Yanfei,

Thanks for the feedback!

> 1. Currently, when AbstractStreamOperator or AbstractStreamOperatorV2 processes a watermark, the watermark is sent downstream. If `InternalTimerServiceImpl#advanceWatermark` is interrupted, when is the watermark sent downstream?

The watermark would be output by an operator only once all relevant timers have fired. In other words, if the firing of timers is interrupted, a continuation mail is created to continue firing those interrupted timers. The watermark will be emitted downstream at the end of that continuation mail.

> 2. IIUC, processing-time timer firing is also encapsulated into a mail and executed in the mailbox. Is processing-time timer firing allowed to be interrupted?

Yes, firing processing-time and event-time timers share the same code, and both will support interruptions in the same way. Actually, I've renamed the FLIP from

> Interruptible watermarks processing

to:

> Interruptible timers firing

to make this more clear.
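Piotr's description of interrupted timer firing can be sketched with a toy event-time timer service. `ToyTimerService`, its `shouldInterrupt` supplier, and the string log are hypothetical simplifications, not `InternalTimerServiceImpl`: timers up to the watermark fire in timestamp order; when interruption is requested, the remaining work is appended to the mailbox as a continuation mail, and the watermark is emitted only after the last relevant timer has fired.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.BooleanSupplier;

// Hypothetical, simplified event-time timer service (not Flink's InternalTimerServiceImpl).
final class ToyTimerService {
    final PriorityQueue<Long> timers = new PriorityQueue<>(); // pending timer timestamps
    final Deque<Runnable> mailbox = new ArrayDeque<>();        // continuation mails go here
    final List<String> output = new ArrayList<>();             // fired timers and watermarks, in order
    private final BooleanSupplier shouldInterrupt;             // hypothetical "yield now?" hook

    ToyTimerService(BooleanSupplier shouldInterrupt) {
        this.shouldInterrupt = shouldInterrupt;
    }

    // Fires all timers <= watermark, but may stop early and enqueue a continuation mail.
    void advanceWatermark(long watermark) {
        while (hasDueTimer(watermark)) {
            output.add("timer@" + timers.poll()); // always fire at least one timer per call
            if (hasDueTimer(watermark) && shouldInterrupt.getAsBoolean()) {
                // Defer the remaining timers; the watermark is NOT emitted yet.
                mailbox.addLast(() -> advanceWatermark(watermark));
                return;
            }
        }
        // All relevant timers have fired, so the watermark can go downstream.
        output.add("watermark@" + watermark);
    }

    private boolean hasDueTimer(long watermark) {
        return !timers.isEmpty() && timers.peek() <= watermark;
    }
}
```

Because the continuation is appended at the tail of `mailbox`, any mail already waiting there (for example, a checkpoint trigger) runs first. Each call fires at least one timer before consulting `shouldInterrupt`, so repeated interruptions still make progress.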

Best,
Piotrek

On Tue, Apr 30, 2024 at 06:08, Yanfei Lei <fredia...@gmail.com> wrote:

Hi Piotrek,

Thanks for this proposal. It looks like it will shorten the checkpoint duration, especially in the case of backpressure. +1 for it! I'd like to ask some questions to understand your thoughts more precisely.

1. Currently, when AbstractStreamOperator or AbstractStreamOperatorV2 processes a watermark, the watermark is sent downstream. If `InternalTimerServiceImpl#advanceWatermark` is interrupted, when is the watermark sent downstream?
2. IIUC, processing-time timer firing is also encapsulated into a mail and executed in the mailbox. Is processing-time timer firing allowed to be interrupted?

Best regards,
Yanfei

On Mon, Apr 29, 2024 at 21:57, Piotr Nowojski <pnowoj...@apache.org> wrote:

Hi all,

I would like to start a discussion on FLIP-443: Interruptible watermark processing.

https://cwiki.apache.org/confluence/x/qgn9EQ

This proposal tries to make Flink's subtask thread more responsive when processing watermarks/firing timers, and to make those operations interruptible, i.e. break them apart into smaller steps. At the same time, the proposed solution could potentially be adopted in other places in the code base as well, to solve similar problems with other flatMap-like operators (non-windowed joins, aggregations, CepOperator, ...).

I'm looking forward to your thoughts.

Best,
Piotrek