Hi everyone,

Thanks for your valuable discussion and feedback!

Our discussions have been going on for a while and there have been no
new comments for several days. So I would like to start a vote after
72 hours.

Please let me know if you have any concerns, thanks!

Yanfei Lei <fredia...@gmail.com> 于2024年3月13日周三 12:54写道:

>
> Hi Jing,
> Thanks for the reply and follow up.
>
> > What is the benefit for users to build a chain of mails instead of just one 
> > mail(it is still async)?
>
> Just to make sure we're on the same page, I try to paraphrase your question:
> A `then()` call will be encapsulated as a callback mail. Your question
> is whether we can call then() as little as possible to reduce the
> overhead of encapsulating it into a mail.
>
> In general, whether to call `then()` depends on the user's data
> dependencies. The operations in a chain of `then()` are strictly
> ordered.
>
>
>
> The following is an example without data dependencies, if written in
> the form of a `then` chain:
> stateA.update(1).then(stateB.update(2).then(stateC.update(3)));
>
> The execution order is:
> ```
> stateA update 1 -> stateB update 2-> stateC update 3
> ```
>
> If written in the form without `then()` call, they will be placed in a
> "mail/mailboxDefaultAction", and each state request will still be
> executed asynchronously:
> ```
> stateA.update(1);
> stateB.update(2);
> stateC.update(3);
> ```
>
> The order in which they are executed is undefined and may be:
> ```
> - stateA update 1 -> stateB update 2-> stateC update 3
> - stateB update 2 -> stateC update 3-> stateA update 1
> - stateC update 3 -> stateA update 1-> stateB update 2
> ...
> ```
> And the final results are "stateA = 1, stateB = 2, stateC = 3". In
> this case, the two ways of writing are equivalent.
>
>
>
> If there are data dependencies, for example:
> ```
> stateA.update(1).then(stateA.update(2))
> ```
>
> Then the execution order is:
> ```
> stateA update 1 -> stateA update 2
> ```
>
> If written in the form without `then()` call:
> ```
> stateA.update(1);
> stateA.update(2);
> ```
>
> The order in which they are executed is undefined and may be:
> ```
> - stateA update 1 -> stateA update 2
> - stateA update 2-> stateA update 1
> ```
> The final result may be "stateA = 1" *OR* "stateA = 2". In this case,
> the way without `then()` chain to limit the execution order, and the
> results may be wrong.
>
> In summary, how many mails are encapsulated depends on how the user
> writes the code, and how the user writes the code depends on their
> data dependencies. [1][2] may be helpful for asynchronous programming
> practice.
>
>
> > I was wondering if exceptions in the mail chain would have an impact on the 
> > reference counting?
>
> We will catch exceptions that can be handled, they don't have impacts
> on the reference counting.
> For exceptions that cannot be handled, we will directly fail the job.
>
> > Maybe a UT to cover all kinds of cases, i.e. happy paths and unhappy paths, 
> > would make sense.
>
> Nice suggestions, we will add a UT to cover those cases.
>
>
> [1] 
> https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html
> [2] https://www.codingjunkie.net/completable-futures-part1/
>
> Jing Ge <j...@ververica.com.invalid> 于2024年3月13日周三 07:05写道:
> >
> > Hi Yanfei,
> >
> > Thanks for your clarification! Now I got a much clear picture and I am
> > still trying to understand your thoughts for some of those questions:
> >
> >
> > > How many mails are encapsulated depends on how the user writes the
> > > code. The statements in a `then()` will be wrapped into a mail.
> > > StateFuture is a restricted version of CompletableFuture, their basic
> > > semantics are consistent.
> > >
> >
> > Conceptually, users can write a chain of many async calls, i.e. many then()
> > calls. And all these calls for Record A must be executed in order, while
> > Record B should stay at the Blocking buffer. What is the benefit for users
> > to build a chain of mails instead of just one mail(it is still async)? Is
> > there any best practices or guidelines to teach/tell users when and how
> > many async calls in a chain could/should be built?
> >
> > > The challenge arises in determining when all the processing logic
> > associated with Record A is fully executed. To address this, we have
> > adopted a reference counting mechanism that tracks ongoing operations
> > (either processing input or executing a callback) related to a single
> > record.
> >
> > > We describe this in the "Error handling"[2] section. This FLIP also
> > > adopts the design from FLIP-368, ensuring that all state interfaces
> > > throw unchecked exceptions and, consequently, do not declare any
> > > exceptions in their signatures. In cases where an exception occurs
> > > while accessing the state, the job should fail.
> > >
> >
> > My question was not about how exceptions will be defined. I am not sure how
> > unchecked exceptions handling will be implemented. I was wondering if
> > exceptions in the mail chain would have an impact on the reference
> > counting? E.g. in Fig 5, if an exception happened in the value(),
> > update(int), or function within then(), any -1 counting might be missed?
> > Maybe a UT to cover all kinds of cases, i.e. happy paths and unhappy paths,
> > would make sense.
> >
> > Best regards,
> > Jing
> >
> > On Mon, Mar 11, 2024 at 3:58 AM Yanfei Lei <fredia...@gmail.com> wrote:
> >
> > > Hi Jing,
> > >
> > > Thanks for your thoughtful feedback!
> > >
> > > > does it mean that there will be three mails for Read, Update, and Output
> > > ?
> > >
> > > With this example, there are two mails. The Read is processed by
> > > `mailboxDefaultAction`[1], and the Update and Output are encapsulated
> > > as mail.
> > >
> > > > does it make sense to encapsulate one mail instead of 3 mails with more
> > > overhead?
> > >
> > >
> >
> > > How many mails are encapsulated depends on how the user writes the
> > > code. The statements in a `then()` will be wrapped into a mail.
> > > StateFuture is a restricted version of CompletableFuture, their basic
> > > semantics are consistent.
> > >
> > >
> >
> > > > Would you like to add more description for cases when exceptions
> > > happened? E.g. when reading or/and updating State throws IOExceptions.
> > >
> > >
> >
> > > We describe this in the "Error handling"[2] section. This FLIP also
> > > adopts the design from FLIP-368, ensuring that all state interfaces
> > > throw unchecked exceptions and, consequently, do not declare any
> > > exceptions in their signatures. In cases where an exception occurs
> > > while accessing the state, the job should fail.
> > >
> >
> >
> >
> > > > Is it correct to understand that AEC is stateless?
> > >
> > > Great perspective, yes, it can be understood that way.
> > > AEC is a task-level component. When the job fails or is restarted, the
> > > runtime status in AEC will be reset.
> > > In fact, we have considered taking a snapshot of the status in AEC and
> > > storing it in a checkpoint like "unaligned checkpoint", but since
> > > callback cannot be serialized, this idea is not feasible for the time
> > > being.
> > >
> > > > would you like to add Pseudo-code for the inFilghtReocordNum decrement
> > > to help us understand the logic better?
> > >
> > > This part of the code is a bit scattered, we will try to abstract a
> > > pseudo-code. You can first refer to the RecordContext-related code [3]
> > > in the PoC to understand it.
> > >
> > > [1]
> > > https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java#L81
> > > [2]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ErrorHandling
> > > [3]
> > > https://github.com/ververica/flink-poc/blob/disagg-poc-2/flink-runtime/src/main/java/org/apache/flink/runtime/state/async/RecordContext.java#L77
> > >
> > >
> > > Best,
> > > Yanfei
> > >
> > > Jing Ge <j...@ververica.com.invalid> 于2024年3月10日周日 23:47写道:
> > > >
> > > > Hi Yanfei,
> > > >
> > > > Thanks for your proposal! The FLIP contains a lot of great new ideas. 
> > > > I'd
> > > > like to ask some questions to make sure we are on the same page.
> > > >
> > > > > For the asynchronous interface, Record A should run with Read, Update
> > > and
> > > > Output, while Record B should stay at the Blocking buffer.
> > > >
> > > > 1. With this example, does it mean that there will be three mails for
> > > Read,
> > > > Update, and Output ?
> > > > 2. If yes, since the Read, Update, and Output have to be executed before
> > > > Record B, does it make sense to encapsulate one mail instead of 3 mails
> > > > with more overhead? There must be some thoughts behind the design. Look
> > > > forward to it.
> > > >
> > > > > The challenge arises in determining when all the processing logic
> > > > associated with Record A is fully executed. To address this, we have
> > > > adopted a reference counting mechanism that tracks ongoing operations
> > > > (either processing input or executing a callback) related to a single
> > > > record.
> > > >
> > > > The design reminds me of the JVM reference counting for GC. Would you
> > > like
> > > > to add more description for cases when exceptions happened? E.g. when
> > > > reading or/and updating State throws IOExceptions.
> > > >
> > > > > In more detail, AEC uses a inFilghtReocordNum  variable to trace the
> > > > current number of records in progress. Every time the AEC receives a new
> > > > record, the inFilghtReocordNum  increases by 1; when all processing and
> > > > callback for this record have completed, the inFilghtReocordNum
> > > decreases
> > > > by 1. When processing one checkpoint mail, the current task thread will
> > > > give up the time slice through the yield() method of the mailbox
> > > executor,
> > > > so that the ongoing state request’s callback and the blocking state
> > > > requests will be drained first until inFlightRecordNum reduces to 0.
> > > >
> > > > 1. Speaking of draining, is it correct to understand that AEC is
> > > stateless?
> > > > E.g. AEC could be easily scaled out if it became a bottleneck.
> > > > 2. There are Pseudo-code for the inFilghtReocordNum increment, would you
> > > > like to add Pseudo-code for the inFilghtReocordNum decrement to help us
> > > > understand the logic better?
> > > >
> > > > The FLIP shows overall a great design! +1 for it! Looking forward to 
> > > > your
> > > > thoughts, thanks!
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > > On Thu, Mar 7, 2024 at 10:05 AM Yanfei Lei <fredia...@gmail.com> wrote:
> > > >
> > > > > Hi devs,
> > > > >
> > > > > I'd like to start a discussion on FLIP-425: Asynchronous Execution
> > > > > Model[1], which is a sub-FLIP of FLIP-423: Disaggregated State Storage
> > > > > and Management[2].
> > > > >
> > > > > FLIP-425 introduces a non-blocking execution model leveraging the
> > > > > asynchronous APIs introduced in FLIP-424[3].
> > > > > For the whole story please read the FLIP-423[2], and this thread is
> > > > > aimed to discuss the details of "FLIP-425: Asynchronous Execution
> > > > > Model".
> > > > >
> > > > > Regarding the details of this FLIP, there have been some discussions
> > > > > here[4], mainly focusing on framework overhead profiling, watermark
> > > > > processing, etc. Please see link[4] for the context.
> > > > >
> > > > > Looking forward to hearing from you!
> > > > >
> > > > >
> > > > > [1] https://cwiki.apache.org/confluence/x/S4p3EQ
> > > > > [2] https://cwiki.apache.org/confluence/x/R4p3EQ
> > > > > [3] https://cwiki.apache.org/confluence/x/SYp3EQ
> > > > > [4] https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0
> > > > >
> > > > > Best,
> > > > > Yanfei
> > > > >
> > >
>
>
>
> --
> Best,
> Yanfei



--
Best,
Yanfei

Reply via email to