Hi Piotr,

Overall this FLIP is fine for me. I have a minor concern: IIUC, the code path will become more complex after this FLIP due to the addition of shouldInterrupt() checks, right?
If so, it would be better to add a benchmark to check whether job performance regresses when a job has a lot of timers. If the performance regresses too much, we need to reconsider it. Of course, I hope the performance is fine.

Best,
Rui

On Mon, May 6, 2024 at 6:30 PM Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi Piotr,
>
> I'm describing the scenario where things happen in the following order:
> 1. The watermark advances and timers are processed.
> 2. The checkpoint arrives and interrupts the timer processing; after
>    this, the continuation mail is in the mailbox queue.
> 3. `snapshotState` is called, where the async state access responses
>    will be drained by calling `tryYield()` [1]. What if the continuation
>    mail is triggered by `tryYield()`?
>
> I'm suggesting skipping the continuation mail while draining async
> state access.
>
> [1] https://github.com/apache/flink/blob/1904b215e36e4fd48e48ece7ffdf2f1470653130/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java#L305
>
> Best,
> Zakelly
>
> On Mon, May 6, 2024 at 6:00 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>
> > Hi Zakelly,
> >
> > Can you elaborate a bit more on what you have in mind? How does marking
> > mails as interruptible help? If an async state access response comes
> > in, it could just request to interrupt any currently ongoing
> > computation, regardless of whether the currently executed mail is
> > interruptible.
> >
> > Best,
> > Piotrek
> >
> > On Mon, 6 May 2024 at 06:33, Zakelly Lan <zakelly....@gmail.com> wrote:
> >
> > > Hi Piotr,
> > >
> > > Thanks for the improvement, overall +1 for this. I'd leave a minor
> > > comment:
> > >
> > > 1. I'd suggest also providing `isInterruptable()` in `Mail`, and the
> > > continuation mail will return true. FLIP-425 will leverage this queue
> > > to execute some state requests, and when the checkpoint arrives, the
> > > operator may call `yield()` to drain.
> > > It may happen that the continuation mail is called again in
> > > `yield()`. By checking `isInterruptable()`, we can skip this mail
> > > and re-enqueue it.
> > >
> > > Best,
> > > Zakelly
> > >
> > > On Wed, May 1, 2024 at 4:35 PM Yanfei Lei <fredia...@gmail.com> wrote:
> > >
> > > > Thanks for your answers, Piotrek. I got it now. +1 for this
> > > > improvement.
> > > >
> > > > Best,
> > > > Yanfei
> > > >
> > > > On Tue, Apr 30, 2024 at 21:30, Stefan Richter <srich...@confluent.io.invalid> wrote:
> > > >
> > > > > Thanks for the improvement proposal, I’m +1 for the change!
> > > > >
> > > > > Best,
> > > > > Stefan
> > > > >
> > > > > On 30. Apr 2024, at 15:23, Roman Khachatryan <ro...@apache.org> wrote:
> > > > >
> > > > > > Thanks for the proposal, I definitely see the need for this
> > > > > > improvement, +1.
> > > > > >
> > > > > > Regards,
> > > > > > Roman
> > > > > >
> > > > > > On Tue, Apr 30, 2024 at 3:11 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > > > > >
> > > > > > > Hi Yanfei,
> > > > > > >
> > > > > > > Thanks for the feedback!
> > > > > > >
> > > > > > > > 1. Currently, when AbstractStreamOperator or
> > > > > > > > AbstractStreamOperatorV2 processes a watermark, the
> > > > > > > > watermark is sent downstream. If
> > > > > > > > `InternalTimerServiceImpl#advanceWatermark` is interrupted,
> > > > > > > > when is the watermark sent downstream?
> > > > > > >
> > > > > > > The watermark would be emitted by an operator only once all
> > > > > > > relevant timers are fired. In other words, if the firing of
> > > > > > > timers is interrupted, a continuation mail is created to
> > > > > > > continue firing those interrupted timers. The watermark will
> > > > > > > be emitted downstream at the end of that continuation mail.
> > > > > > >
> > > > > > > > 2. IIUC, processing-timer's firing is also encapsulated
> > > > > > > > into mail and executed in mailbox. Is processing-timer
> > > > > > > > allowed to be interrupted?
> > > > > > >
> > > > > > > Yes, the firing of both processing time and event time timers
> > > > > > > shares the same code, and both will support interruptions in
> > > > > > > the same way. Actually, I've renamed the FLIP from
> > > > > > >
> > > > > > > > Interruptible watermarks processing
> > > > > > >
> > > > > > > to:
> > > > > > >
> > > > > > > > Interruptible timers firing
> > > > > > >
> > > > > > > to make this clearer.
> > > > > > >
> > > > > > > Best,
> > > > > > > Piotrek
> > > > > > >
> > > > > > > On Tue, 30 Apr 2024 at 06:08, Yanfei Lei <fredia...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Piotrek,
> > > > > > > >
> > > > > > > > Thanks for this proposal. It looks like it will shorten the
> > > > > > > > checkpoint duration, especially in the case of back
> > > > > > > > pressure. +1 for it! I'd like to ask some questions to
> > > > > > > > understand your thoughts more precisely.
> > > > > > > >
> > > > > > > > 1. Currently, when AbstractStreamOperator or
> > > > > > > > AbstractStreamOperatorV2 processes a watermark, the
> > > > > > > > watermark is sent downstream. If
> > > > > > > > `InternalTimerServiceImpl#advanceWatermark` is interrupted,
> > > > > > > > when is the watermark sent downstream?
> > > > > > > > 2. IIUC, processing-timer's firing is also encapsulated
> > > > > > > > into mail and executed in mailbox. Is processing-timer
> > > > > > > > allowed to be interrupted?
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > > Yanfei
> > > > > > > >
> > > > > > > > On Mon, Apr 29, 2024 at 21:57, Piotr Nowojski <pnowoj...@apache.org> wrote:
> > > > > > > >
> > > > > > > > > Hi all,
> > > > > > > > >
> > > > > > > > > I would like to start a discussion on FLIP-443:
> > > > > > > > > Interruptible watermark processing.
> > > > > > > > >
> > > > > > > > > https://cwiki.apache.org/confluence/x/qgn9EQ
> > > > > > > > >
> > > > > > > > > This proposal tries to make Flink's subtask thread more
> > > > > > > > > responsive when processing watermarks/firing timers, and
> > > > > > > > > to make those operations interruptible by breaking them
> > > > > > > > > apart into smaller steps. At the same time, the proposed
> > > > > > > > > solution could potentially be adopted in other places in
> > > > > > > > > the code base as well, to solve similar problems with
> > > > > > > > > other flatMap-like operators (non-windowed joins,
> > > > > > > > > aggregations, CepOperator, ...).
> > > > > > > > >
> > > > > > > > > I'm looking forward to your thoughts.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Piotrek
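
To make the continuation-mail behaviour discussed in this thread concrete, here is a minimal Java sketch. It is a toy model, not Flink code: the class `InterruptibleTimersSketch`, its fields, and `runMailbox()` are hypothetical, and `shouldInterrupt()` is assumed here to mean simply "another mail is waiting in the mailbox". It shows timer firing stopping when interrupted, a continuation mail being enqueued, and the watermark being emitted only at the end of that continuation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model of interruptible timer firing; all names are illustrative. */
class InterruptibleTimersSketch {
    final Deque<Runnable> mailbox = new ArrayDeque<>();        // pending mails
    final PriorityQueue<Long> timers = new PriorityQueue<>();  // timer timestamps
    final List<String> output = new ArrayList<>();             // what the operator did

    /** Assumption: interrupt whenever some other mail is waiting. */
    boolean shouldInterrupt() {
        return !mailbox.isEmpty();
    }

    /** Fires timers up to the watermark; on interrupt, enqueues a continuation mail. */
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            output.add("timer@" + timers.poll());
            if (shouldInterrupt()) {
                // Resume later; the watermark is not emitted yet.
                mailbox.addLast(() -> advanceWatermark(watermark));
                return;
            }
        }
        // All relevant timers have fired, so the watermark can go downstream.
        output.add("watermark@" + watermark);
    }

    /** Runs mails in FIFO order until the mailbox is empty. */
    void runMailbox() {
        while (!mailbox.isEmpty()) {
            mailbox.pollFirst().run();
        }
    }

    public static void main(String[] args) {
        InterruptibleTimersSketch op = new InterruptibleTimersSketch();
        op.timers.add(1L);
        op.timers.add(2L);
        op.timers.add(3L);
        // A waiting "checkpoint" mail causes the first interrupt check to succeed.
        op.mailbox.addLast(() -> op.output.add("checkpoint"));
        op.advanceWatermark(3L);
        op.runMailbox();
        System.out.println(op.output);
        // prints [timer@1, checkpoint, timer@2, timer@3, watermark@3]
    }
}
```

In this sketch the per-timer cost of interruptibility is a single `shouldInterrupt()` call inside the loop, which is the kind of overhead a benchmark with many timers would measure.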