Hi everyone,

Thanks for your valuable discussion and feedback!
Our discussions have been going on for a while, and there have been no new comments for several days, so I would like to start a vote in 72 hours. Please let me know if you have any concerns, thanks!

Yanfei Lei <fredia...@gmail.com> wrote on Wed, Mar 13, 2024 at 12:54:
>
> Hi Jing,
> Thanks for the reply and follow-up.
>
> > What is the benefit for users to build a chain of mails instead of just one
> > mail (it is still async)?
>
> Just to make sure we're on the same page, let me paraphrase your question:
> A `then()` call will be encapsulated as a callback mail. Your question
> is whether we can call `then()` as little as possible to reduce the
> overhead of encapsulating it into a mail.
>
> In general, whether to call `then()` depends on the user's data
> dependencies. The operations in a chain of `then()` are strictly
> ordered.
>
> The following is an example without data dependencies, written in
> the form of a `then()` chain:
> stateA.update(1).then(stateB.update(2).then(stateC.update(3)));
>
> The execution order is:
> ```
> stateA update 1 -> stateB update 2 -> stateC update 3
> ```
>
> If written without `then()` calls, the three requests will be placed in one
> "mail/mailboxDefaultAction", and each state request will still be
> executed asynchronously:
> ```
> stateA.update(1);
> stateB.update(2);
> stateC.update(3);
> ```
>
> The order in which they are executed is undefined and may be:
> ```
> - stateA update 1 -> stateB update 2 -> stateC update 3
> - stateB update 2 -> stateC update 3 -> stateA update 1
> - stateC update 3 -> stateA update 1 -> stateB update 2
> ...
> ```
> And the final results are "stateA = 1, stateB = 2, stateC = 3". In
> this case, the two ways of writing are equivalent.
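A minimal sketch of the two orderings above, using `java.util.concurrent.CompletableFuture` as a stand-in for StateFuture (the thread describes StateFuture as a restricted CompletableFuture, but its exact method names may differ). The `update` helper, the thread pool, and the log list are hypothetical. The chained version passes lambdas to `thenCompose`, so each update is only issued after the previous one completes; a nested chain gives the same ordering as this linear one.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Illustrative only: CompletableFuture stands in for StateFuture. */
class ThenChainDemo {
    // Hypothetical async "state update": records the write on a background thread.
    static CompletableFuture<Void> update(ExecutorService io, List<String> log, String state, int value) {
        return CompletableFuture.runAsync(() -> log.add(state + "=" + value), io);
    }

    /** Chained: each update is issued only after the previous one has completed. */
    static List<String> chainedOrder() {
        ExecutorService io = Executors.newFixedThreadPool(3);
        List<String> log = new CopyOnWriteArrayList<>();
        try {
            update(io, log, "stateA", 1)
                .thenCompose(v -> update(io, log, "stateB", 2))
                .thenCompose(v -> update(io, log, "stateC", 3))
                .join(); // wait for the whole chain
        } finally {
            io.shutdown();
        }
        return log;
    }

    /** Unchained: all three requests are in flight at once; completion order is unspecified. */
    static List<String> unchainedOrder() {
        ExecutorService io = Executors.newFixedThreadPool(3);
        List<String> log = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture.allOf(
                update(io, log, "stateA", 1),
                update(io, log, "stateB", 2),
                update(io, log, "stateC", 3)).join();
        } finally {
            io.shutdown();
        }
        return log;
    }
}
```

`chainedOrder()` always returns `[stateA=1, stateB=2, stateC=3]`, while `unchainedOrder()` contains the same three writes in an unspecified order. Since the three states are independent, both end in the same final values.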
>
> If there are data dependencies, for example:
> ```
> stateA.update(1).then(stateA.update(2))
> ```
>
> Then the execution order is:
> ```
> stateA update 1 -> stateA update 2
> ```
>
> If written without a `then()` call:
> ```
> stateA.update(1);
> stateA.update(2);
> ```
>
> The order in which they are executed is undefined and may be:
> ```
> - stateA update 1 -> stateA update 2
> - stateA update 2 -> stateA update 1
> ```
> The final result may be "stateA = 1" *OR* "stateA = 2". In this case,
> there is no `then()` chain to constrain the execution order, so the
> result may be wrong.
>
> In summary, how many mails are encapsulated depends on how the user
> writes the code, and how the user writes the code depends on their
> data dependencies. [1][2] may be helpful for asynchronous programming
> practice.
>
> > I was wondering if exceptions in the mail chain would have an impact on the
> > reference counting?
>
> We will catch exceptions that can be handled; they don't have an impact
> on the reference counting.
> For exceptions that cannot be handled, we will fail the job directly.
>
> > Maybe a UT to cover all kinds of cases, i.e. happy paths and unhappy paths,
> > would make sense.
>
> Nice suggestion, we will add a UT to cover those cases.
>
> [1] https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html
> [2] https://www.codingjunkie.net/completable-futures-part1/
>
> Jing Ge <j...@ververica.com.invalid> wrote on Wed, Mar 13, 2024 at 07:05:
> >
> > Hi Yanfei,
> >
> > Thanks for your clarification! Now I have a much clearer picture, and I am
> > still trying to understand your thoughts on some of those questions:
> >
> > > How many mails are encapsulated depends on how the user writes the
> > > code. The statements in a `then()` will be wrapped into a mail.
> > > StateFuture is a restricted version of CompletableFuture; their basic
> > > semantics are consistent.
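The data-dependency case can be made deterministic with a hypothetical executor that runs pending requests last-in-first-out, modelling the worst case of the "undefined" order described above. As before, `CompletableFuture` stands in for StateFuture, and `LifoExecutor`, `ValueState`, and `update` are illustrative names, not Flink APIs.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Illustrative model: pending requests may complete in any order; here, worst case (LIFO). */
class DataDependencyDemo {
    /** Hypothetical executor that queues tasks and runs them last-in-first-out when drained. */
    static final class LifoExecutor implements Executor {
        private final Deque<Runnable> tasks = new ArrayDeque<>();

        @Override
        public void execute(Runnable task) {
            tasks.push(task);
        }

        void drain() {
            while (!tasks.isEmpty()) {
                tasks.pop().run();
            }
        }
    }

    /** Hypothetical single-value state. */
    static final class ValueState {
        int value;
    }

    static CompletableFuture<Void> update(LifoExecutor ex, ValueState s, int v) {
        return CompletableFuture.runAsync(() -> s.value = v, ex);
    }

    /** Without a chain: both writes are pending at once, so they can be applied out of order. */
    static int unchained() {
        LifoExecutor ex = new LifoExecutor();
        ValueState stateA = new ValueState();
        update(ex, stateA, 1);
        update(ex, stateA, 2);
        ex.drain();
        return stateA.value;
    }

    /** With a chain: the second write is only submitted after the first one completes. */
    static int chained() {
        LifoExecutor ex = new LifoExecutor();
        ValueState stateA = new ValueState();
        CompletableFuture<Void> done = update(ex, stateA, 1).thenCompose(v -> update(ex, stateA, 2));
        ex.drain();
        done.join();
        return stateA.value;
    }
}
```

`unchained()` returns 1 (the write of 2 ran first and was then overwritten by the write of 1), while `chained()` returns 2, because the dependent write is only submitted once the first one has completed.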
> >
> > Conceptually, users can write a chain of many async calls, i.e. many then()
> > calls. And all these calls for Record A must be executed in order, while
> > Record B should stay in the Blocking buffer. What is the benefit for users
> > to build a chain of mails instead of just one mail (it is still async)? Are
> > there any best practices or guidelines to teach/tell users when and how
> > many async calls in a chain could/should be built?
> >
> > > The challenge arises in determining when all the processing logic
> > > associated with Record A is fully executed. To address this, we have
> > > adopted a reference counting mechanism that tracks ongoing operations
> > > (either processing input or executing a callback) related to a single
> > > record.
> >
> > > We describe this in the "Error handling"[2] section. This FLIP also
> > > adopts the design from FLIP-368, ensuring that all state interfaces
> > > throw unchecked exceptions and, consequently, do not declare any
> > > exceptions in their signatures. In cases where an exception occurs
> > > while accessing the state, the job should fail.
> >
> > My question was not about how exceptions will be defined. I am not sure how
> > unchecked exception handling will be implemented. I was wondering if
> > exceptions in the mail chain would have an impact on the reference
> > counting? E.g. in Fig 5, if an exception happened in value(),
> > update(int), or the function within then(), could a -1 decrement be missed?
> > Maybe a UT to cover all kinds of cases, i.e. happy paths and unhappy paths,
> > would make sense.
> >
> > Best regards,
> > Jing
> >
> > On Mon, Mar 11, 2024 at 3:58 AM Yanfei Lei <fredia...@gmail.com> wrote:
> > >
> > > Hi Jing,
> > >
> > > Thanks for your thoughtful feedback!
> > >
> > > > does it mean that there will be three mails for Read, Update, and Output?
> > >
> > > With this example, there are two mails.
> > > The Read is processed by
> > > `mailboxDefaultAction`[1], and the Update and Output are encapsulated
> > > as mails.
> > >
> > > > does it make sense to encapsulate one mail instead of 3 mails with more
> > > > overhead?
> > >
> > > How many mails are encapsulated depends on how the user writes the
> > > code. The statements in a `then()` will be wrapped into a mail.
> > > StateFuture is a restricted version of CompletableFuture; their basic
> > > semantics are consistent.
> > >
> > > > Would you like to add more description for cases when exceptions
> > > > happened? E.g. when reading and/or updating State throws IOExceptions.
> > >
> > > We describe this in the "Error handling"[2] section. This FLIP also
> > > adopts the design from FLIP-368, ensuring that all state interfaces
> > > throw unchecked exceptions and, consequently, do not declare any
> > > exceptions in their signatures. In cases where an exception occurs
> > > while accessing the state, the job should fail.
> > >
> > > > Is it correct to understand that AEC is stateless?
> > >
> > > Great perspective; yes, it can be understood that way.
> > > AEC is a task-level component. When the job fails or is restarted, the
> > > runtime status in AEC will be reset.
> > > In fact, we have considered taking a snapshot of the status in AEC and
> > > storing it in a checkpoint like "unaligned checkpoint", but since
> > > callbacks cannot be serialized, this idea is not feasible for the time
> > > being.
> > >
> > > > would you like to add pseudo-code for the inFlightRecordNum decrement
> > > > to help us understand the logic better?
> > >
> > > This part of the code is a bit scattered; we will try to write it up as
> > > pseudo-code. You can first refer to the RecordContext-related code [3]
> > > in the PoC to understand it.
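One way to make sure an exception cannot skip a decrement, sketched under assumptions: each pending operation for a record (processing the input or running a callback) takes a reference when it is scheduled and releases it in a `finally` block when it runs. `RefCountedRecord`, `guard`, and `onFinished` are hypothetical names, not the PoC's RecordContext API; `onFinished` marks the point where an in-flight record counter would be decremented.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/**
 * Hypothetical per-record reference counter (not the PoC's RecordContext API).
 * Every pending operation for a record holds one reference; the record is
 * finished when the count drops to 0.
 */
class RefCountedRecord {
    private final AtomicInteger refs = new AtomicInteger();
    private final Runnable onFinished; // e.g. decrement inFlightRecordNum

    RefCountedRecord(Runnable onFinished) {
        this.onFinished = onFinished;
    }

    int refCount() {
        return refs.get();
    }

    /** Takes a reference when the operation is scheduled and releases it after it runs. */
    Runnable guard(Runnable op, Consumer<RuntimeException> onError) {
        refs.incrementAndGet();
        return () -> {
            try {
                op.run();
            } catch (RuntimeException e) {
                onError.accept(e); // handle it, or rethrow to fail the job
            } finally {
                // Runs on success, on a handled error, and on a rethrown one,
                // so an exception can never skip the -1.
                if (refs.decrementAndGet() == 0) {
                    onFinished.run();
                }
            }
        };
    }
}
```

As long as a callback is guarded from inside the operation that registers it, the count goes up before the registering operation releases its own reference, so it cannot reach 0 between processing the input and scheduling the callback.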
> > >
> > > [1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java#L81
> > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ErrorHandling
> > > [3] https://github.com/ververica/flink-poc/blob/disagg-poc-2/flink-runtime/src/main/java/org/apache/flink/runtime/state/async/RecordContext.java#L77
> > >
> > > Best,
> > > Yanfei
> > >
> > > Jing Ge <j...@ververica.com.invalid> wrote on Sun, Mar 10, 2024 at 23:47:
> > > >
> > > > Hi Yanfei,
> > > >
> > > > Thanks for your proposal! The FLIP contains a lot of great new ideas.
> > > > I'd like to ask some questions to make sure we are on the same page.
> > > >
> > > > > For the asynchronous interface, Record A should run with Read, Update
> > > > > and Output, while Record B should stay at the Blocking buffer.
> > > >
> > > > 1. With this example, does it mean that there will be three mails for
> > > > Read, Update, and Output?
> > > > 2. If yes, since the Read, Update, and Output have to be executed before
> > > > Record B, does it make sense to encapsulate one mail instead of 3 mails
> > > > with more overhead? There must be some thoughts behind the design. I look
> > > > forward to it.
> > > >
> > > > > The challenge arises in determining when all the processing logic
> > > > > associated with Record A is fully executed. To address this, we have
> > > > > adopted a reference counting mechanism that tracks ongoing operations
> > > > > (either processing input or executing a callback) related to a single
> > > > > record.
> > > >
> > > > The design reminds me of the JVM reference counting for GC. Would you like
> > > > to add more description for cases when exceptions happened? E.g. when
> > > > reading and/or updating State throws IOExceptions.
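To illustrate the mail count discussed in this thread (the Read handled in the default action, the Update and Output each wrapped into a mail), here is a toy single-threaded mailbox. It is a sketch under assumptions: `ToyMailbox` is not Flink's MailboxProcessor, the read completes immediately, and CompletableFuture's `*Async` methods, given the mailbox as their `Executor`, stand in for StateFuture callbacks.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Toy single-threaded mailbox (illustrative only, not Flink's MailboxProcessor). */
class ToyMailbox implements Executor {
    private final Queue<Runnable> mails = new ArrayDeque<>();
    private int mailCount = 0;

    /** Enqueuing a callback: this is what "wrapping a then() into a mail" means in this model. */
    @Override
    public void execute(Runnable mail) {
        mails.add(mail);
        mailCount++;
    }

    int mailCount() {
        return mailCount;
    }

    /** Runs mails on the calling ("task") thread until the mailbox is empty. */
    void runAllMails() {
        Runnable mail;
        while ((mail = mails.poll()) != null) {
            mail.run();
        }
    }

    /** Read in the default action, then Update and Output as two chained mails. */
    static List<String> processRecord(ToyMailbox mailbox, int storedValue) {
        List<String> output = new ArrayList<>();
        // Default action: issue the read; in this toy model it completes immediately.
        CompletableFuture<Integer> read = CompletableFuture.completedFuture(storedValue);
        read.thenApplyAsync(v -> v + 1, mailbox)                    // mail 1: Update
            .thenAcceptAsync(v -> output.add("out=" + v), mailbox); // mail 2: Output
        mailbox.runAllMails();
        return output;
    }
}
```

`processRecord(mailbox, 41)` returns `[out=42]`, and `mailCount()` then reports 2: one mail per `then`-step, none for the read itself.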
> > > >
> > > > > In more detail, AEC uses an inFlightRecordNum variable to track the
> > > > > current number of records in progress. Every time the AEC receives a new
> > > > > record, inFlightRecordNum increases by 1; when all processing and
> > > > > callbacks for this record have completed, inFlightRecordNum decreases
> > > > > by 1. When processing a checkpoint mail, the current task thread will
> > > > > give up its time slice through the yield() method of the mailbox
> > > > > executor, so that the ongoing state requests' callbacks and the blocking
> > > > > state requests are drained first, until inFlightRecordNum reaches 0.
> > > >
> > > > 1. Speaking of draining, is it correct to understand that AEC is
> > > > stateless? E.g. AEC could easily be scaled out if it became a bottleneck.
> > > > 2. There is pseudo-code for the inFlightRecordNum increment; would you
> > > > like to add pseudo-code for the inFlightRecordNum decrement to help us
> > > > understand the logic better?
> > > >
> > > > Overall, the FLIP shows a great design! +1 for it! Looking forward to
> > > > your thoughts, thanks!
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > > On Thu, Mar 7, 2024 at 10:05 AM Yanfei Lei <fredia...@gmail.com> wrote:
> > > > >
> > > > > Hi devs,
> > > > >
> > > > > I'd like to start a discussion on FLIP-425: Asynchronous Execution
> > > > > Model[1], which is a sub-FLIP of FLIP-423: Disaggregated State Storage
> > > > > and Management[2].
> > > > >
> > > > > FLIP-425 introduces a non-blocking execution model leveraging the
> > > > > asynchronous APIs introduced in FLIP-424[3].
> > > > > For the whole story, please read FLIP-423[2]; this thread is
> > > > > aimed at discussing the details of "FLIP-425: Asynchronous Execution
> > > > > Model".
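The draining behaviour described in the quoted FLIP text (increment on a new record, decrement once its callbacks finish, yield until inFlightRecordNum reaches 0 on a checkpoint) can be sketched as follows, under simplifying assumptions: each record has exactly one callback, its response has already arrived as a pending mail, and polling one mail stands in for one `yield()` step of the mailbox executor. `ToyAsyncExecutionController` and its methods are hypothetical names, not the FLIP's AEC interface.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Toy model of draining in-flight records before a checkpoint; names are hypothetical. */
class ToyAsyncExecutionController {
    private final Queue<Runnable> mailbox = new ArrayDeque<>(); // pending callbacks, run on the task thread
    private int inFlightRecordNum = 0;

    int inFlightRecordNum() {
        return inFlightRecordNum;
    }

    /** A new record enters: +1. Its callback is queued as a mail that finishes the record. */
    void processRecord(Runnable callback) {
        inFlightRecordNum++;
        mailbox.add(() -> {
            try {
                callback.run();
            } finally {
                inFlightRecordNum--; // all processing and callbacks for this record are done: -1
            }
        });
    }

    /** Checkpoint mail: keep yielding to pending mails until no record is in flight. */
    void drainForCheckpoint() {
        while (inFlightRecordNum > 0) {
            Runnable mail = mailbox.poll(); // stands in for one mailboxExecutor.yield() step
            if (mail == null) {
                // A real task thread would block here waiting for the next state response.
                throw new IllegalStateException("records in flight but no pending mail");
            }
            mail.run();
        }
    }
}
```

With callback chains, the single `try`/`finally` here would be replaced by per-record reference counting, so the decrement happens only after the last callback of the record has run.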
> > > > >
> > > > > Regarding the details of this FLIP, there have been some discussions
> > > > > here[4], mainly focusing on framework overhead profiling, watermark
> > > > > processing, etc. Please see link [4] for the context.
> > > > >
> > > > > Looking forward to hearing from you!
> > > > >
> > > > > [1] https://cwiki.apache.org/confluence/x/S4p3EQ
> > > > > [2] https://cwiki.apache.org/confluence/x/R4p3EQ
> > > > > [3] https://cwiki.apache.org/confluence/x/SYp3EQ
> > > > > [4] https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0
> > > > >
> > > > > Best,
> > > > > Yanfei
>
> --
> Best,
> Yanfei

--
Best,
Yanfei