Thank you Yanfei for addressing all the questions!

> I'm not sure if I understand your question. In Java, this case (modifying
> the local variable) is not allowed [1], but there are ways to get around
> the limitation of lambda. In this case, the modification to x may be
> concurrent, which needs to be handled carefully.

I think you got my question; I did not realize that modifying an externally
scoped variable in a lambda is not even allowed. I guess the point is that it
is possible, but the user would really need to go out of their way to do it
and "shoot themselves in the foot".

> an implicit fact in sync API is that "event timer fire" would execute
> before "the subsequent records of watermark", but in out-of-order mode
> (async API), the execution order between them is not guaranteed

Got it. What I still don't get is exactly what type of inconsistency or issue
the user could face. For example, if the user could no longer be sure that
elements end up in the correct windows, I am afraid this would undermine the
watermark concept as a whole. What do you think?

Thank you.

On Mar 21, 2024 at 14:27 +0100, Yanfei Lei <fredia...@gmail.com>, wrote:
> Thanks for your reading and valuable comments!
>
> > 1) About locking VS reference counting:
> > I would like to clarify which mechanism prevents what:
> > The `KeyAccountingUnit` implements locking behavior on keys and
> > ensures 2 state requests on the same key happen in order.
> > Double-locking the same key does not result in deadlocks (thanks to
> > the `previous == record` condition in your pseudo-code), so the same
> > callback chain can update/read the same piece of state multiple times.
> > On the other side, we have the reference counting mechanism that is
> > used to understand whether a record has been fully processed, i.e.,
> > all state invocations have been carried out.
> > Here is the question: am I correct in saying that key accounting is
> > needed for out-of-order while reference counting is needed for
> > checkpointing and watermarking?
>
> Regarding the "deadlock" of `KeyAccountingUnit`: good catch, we will
> emphasize this in the FLIP. The `KeyAccountingUnit` is reentrant, so the
> state requests of the same record can update/read multiple times
> without deadlock.
>
> Regarding the question: records, checkpoint barriers and watermarks
> can all be regarded as inputs, and this FLIP discusses the *order*
> between all inputs. In simple terms, the inputs of the same key that
> arrive first need to be executed first.
>
> The `KeyAccountingUnit` and reference counting work together to
> preserve the order: when the reference counting mechanism recognizes
> that a record has been fully processed, the record is removed from the
> `KeyAccountingUnit`. The checkpoint or watermark would start only once
> the reference counts of all arrived inputs reach zero.
>
> > 2) Number of mails:
> > Do you end up having two mails?
>
> Yes, there are two mails in this case.
>
> > 3) Would this change something on the consistency guarantees provided?
> > I guess not, as the lock is held in any case until the value of the
> > state has been updated.
> > Could lead to an inconsistency (most probably the state would be
> > updated to 0).
>
> Yes, the results of the two cases you mentioned are as you analyzed.
> The result of the first case is 1, and the result of the second case
> is 0.
> No matter which case it is, the next `processElement` with the same
> key will be executed after the code in this `processElement` is
> completely executed.
>
> Therefore it wouldn't lead to inconsistency.
>
> > 4) On the local variables/attributes:
>
> I'm not sure if I understand your question. In Java, this case
> (modifying the local variable) is not allowed [1], but there are ways
> to get around the limitation of lambda.
> In this case, the modification to x may be concurrent, which needs to
> be handled carefully.
>
> > 5) On watermarks:
> > It seems that, in order to achieve a good throughput, out-of-order
> > mode should be used.
> > In the FLIP I could not understand well how many things could go wrong
> > if that one is used.
> > Could you please clarify that?
>
> A typical example is the order between "event timer fire" and "the
> subsequent records of watermark".
> Although the semantics of watermarks do not define the sequence
> between a watermark and subsequent records, an implicit fact in sync
> API is that "event timer fire" would execute before "the subsequent
> records of watermark", but in out-of-order mode (async API), the
> execution order between them is not guaranteed.
> There are also some related discussions in FLIP-423 [2,3] proposed by
> Yunfeng Zhou and Xintong Song.
>
> [1] https://stackoverflow.com/questions/30026824/modifying-local-variable-from-inside-lambda
> [2] https://lists.apache.org/thread/djsnybs9whzrt137z3qmxdwn031o93gn
> [3] https://lists.apache.org/thread/986zxq1k9rv3vkbk39yw16g24o6h83mz
>
> <lorenzo.affe...@ververica.com> wrote on Thu, Mar 21, 2024 at 19:29:
> >
> > Thank you everybody for the questions and answers (especially Yanfei
> > Lei), it was very instructive to go over the discussion.
> > I am going to add some questions on top of what has been discussed,
> > plus some thoughts, below.
> >
> > 1) About locking VS reference counting:
> > I would like to clarify which mechanism prevents what:
> > The `KeyAccountingUnit` implements locking behavior on keys and ensures
> > 2 state requests on the same key happen in order. Double-locking the
> > same key does not result in deadlocks (thanks to the
> > `previous == record` condition in your pseudo-code), so the same
> > callback chain can update/read the same piece of state multiple times.
> > On the other side, we have the reference counting mechanism that is
> > used to understand whether a record has been fully processed, i.e., all
> > state invocations have been carried out.
> > Here is the question: am I correct in saying that key accounting is
> > needed for out-of-order while reference counting is needed for
> > checkpointing and watermarking?
> >
> > 2) Number of mails:
> > To expand on what Jing Ge already asked, in the example code in the FLIP:
> >
> > ```
> > state.value().then(
> >     val -> {
> >         return state.update(val + 1).then(
> >             empty -> {
> >                 out.collect(val + 1);
> >             });
> >     });
> > ```
> > Do you end up having two mails?
> >
> > first wrapping `val -> {...}`
> > second wrapping `empty -> {...}`
> >
> > Did I get it correctly?
> >
> > 3) On the guarantees provided by the async execution framework:
> > Still referring to your example, say that when `val -> {...}` gets
> > called the state value is 0.
> > The callback will update it to 1 and register another callback to
> > collect while still holding the lock on that piece of state, preventing
> > anybody else from reading the value during the entire process
> > (implementing atomicity for a transaction).
> > Now, say the code changes to:
> >
> > ```
> > state.value().then(
> >     val -> {
> >         state.update(val + 1);
> >         out.collect(val + 1);
> >     });
> > ```
> >
> > Would this change something on the consistency guarantees provided?
> > I guess not, as the lock is held in any case until the value of the
> > state has been updated.
> >
> > This, instead (similar to your example):
> >
> > ```
> > int x = 0;
> >
> > state.value().then( val -> { x = val + 1; } );
> > state.update(x);
> > out.collect(x);
> > ```
> >
> > Could lead to an inconsistency (most probably the state would be
> > updated to 0).
> >
> > 4) On the local variables/attributes:
> > The last example above exemplifies one of my concerns: what about the
> > values enclosed in the callbacks?
> > That seems a bit counter-intuitive and brittle from a user perspective.
> > In the example we have a function-level variable, but what about fields:
> >
> > ```
> > int x = 0;
> >
> > void processElement(...) {
> >     state.value().then( val -> { x += val; } );
> >     ...
> >     out.collect(x);
> > }
> > ```
> >
> > What could happen here?
> > Would every callback enclose the current value of the field?
> > I don't know exactly where I am heading with this, but it seems quite
> > complex/convoluted :)
> >
> > 5) On watermarks:
> > It seems that, in order to achieve a good throughput, out-of-order mode
> > should be used.
> > In the FLIP I could not understand well how many things could go wrong
> > if that one is used.
> > Could you please clarify that?
> >
> > Thank you for your availability and your great work!
> >
> > On Mar 19, 2024 at 10:51 +0100, Yanfei Lei <fredia...@gmail.com>, wrote:
> >
> > Hi everyone,
> >
> > Thanks for your valuable discussion and feedback!
> >
> > Our discussions have been going on for a while and there have been no
> > new comments for several days. So I would like to start a vote after
> > 72 hours.
> >
> > Please let me know if you have any concerns, thanks!
> >
> > Yanfei Lei <fredia...@gmail.com> wrote on Wed, Mar 13, 2024 at 12:54:
> >
> > Hi Jing,
> > Thanks for the reply and follow up.
> >
> > > > What is the benefit for users to build a chain of mails instead of
> > > > just one mail (it is still async)?
> >
> > Just to make sure we're on the same page, let me try to paraphrase your
> > question:
> > A `then()` call will be encapsulated as a callback mail. Your question
> > is whether we can call `then()` as little as possible to reduce the
> > overhead of encapsulating it into a mail.
> >
> > In general, whether to call `then()` depends on the user's data
> > dependencies. The operations in a chain of `then()` are strictly
> > ordered.
> >
> > The following is an example without data dependencies. If written in
> > the form of a `then` chain:
> >
> >     stateA.update(1).then(stateB.update(2).then(stateC.update(3)));
> >
> > The execution order is:
> > ```
> > stateA update 1 -> stateB update 2 -> stateC update 3
> > ```
> >
> > If written in the form without a `then()` call, they will be placed in
> > a "mail/mailboxDefaultAction", and each state request will still be
> > executed asynchronously:
> > ```
> > stateA.update(1);
> > stateB.update(2);
> > stateC.update(3);
> > ```
> >
> > The order in which they are executed is undefined and may be:
> > ```
> > - stateA update 1 -> stateB update 2 -> stateC update 3
> > - stateB update 2 -> stateC update 3 -> stateA update 1
> > - stateC update 3 -> stateA update 1 -> stateB update 2
> > ...
> > ```
> > And the final results are "stateA = 1, stateB = 2, stateC = 3". In
> > this case, the two ways of writing are equivalent.
> >
> > If there are data dependencies, for example:
> > ```
> > stateA.update(1).then(stateA.update(2))
> > ```
> >
> > Then the execution order is:
> > ```
> > stateA update 1 -> stateA update 2
> > ```
> >
> > If written in the form without a `then()` call:
> > ```
> > stateA.update(1);
> > stateA.update(2);
> > ```
> >
> > The order in which they are executed is undefined and may be:
> > ```
> > - stateA update 1 -> stateA update 2
> > - stateA update 2 -> stateA update 1
> > ```
> > The final result may be "stateA = 1" *OR* "stateA = 2". In this case,
> > the version without a `then()` chain does not constrain the execution
> > order, so the results may be wrong.
> >
> > In summary, how many mails are encapsulated depends on how the user
> > writes the code, and how the user writes the code depends on their
> > data dependencies. [1][2] may be helpful for asynchronous programming
> > practice.
> >
> > > > I was wondering if exceptions in the mail chain would have an
> > > > impact on the reference counting?
> >
> > We will catch exceptions that can be handled; they have no impact on
> > the reference counting.
> > For exceptions that cannot be handled, we will directly fail the job.
> >
> > > > Maybe a UT to cover all kinds of cases, i.e. happy paths and
> > > > unhappy paths, would make sense.
> >
> > Nice suggestions, we will add a UT to cover those cases.
> >
> > [1] https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html
> > [2] https://www.codingjunkie.net/completable-futures-part1/
> >
> > Jing Ge <j...@ververica.com.invalid> wrote on Wed, Mar 13, 2024 at 07:05:
> > > >
> > > > Hi Yanfei,
> > > >
> > > > Thanks for your clarification! Now I have a much clearer picture,
> > > > and I am still trying to understand your thoughts on some of those
> > > > questions:
> > > >
> > > > > > How many mails are encapsulated depends on how the user writes
> > > > > > the code. The statements in a `then()` will be wrapped into a
> > > > > > mail. StateFuture is a restricted version of CompletableFuture,
> > > > > > their basic semantics are consistent.
> > > >
> > > > Conceptually, users can write a chain of many async calls, i.e.
> > > > many then() calls. And all these calls for Record A must be
> > > > executed in order, while Record B should stay at the Blocking
> > > > buffer. What is the benefit for users to build a chain of mails
> > > > instead of just one mail (it is still async)? Are there any best
> > > > practices or guidelines to teach/tell users when and how many async
> > > > calls in a chain could/should be built?
> > > >
> > > > > > The challenge arises in determining when all the processing
> > > > > > logic associated with Record A is fully executed. To address
> > > > > > this, we have adopted a reference counting mechanism that
> > > > > > tracks ongoing operations (either processing input or executing
> > > > > > a callback) related to a single record.
> > > >
> > > > > > We describe this in the "Error handling"[2] section. This FLIP
> > > > > > also adopts the design from FLIP-368, ensuring that all state
> > > > > > interfaces throw unchecked exceptions and, consequently, do not
> > > > > > declare any exceptions in their signatures. In cases where an
> > > > > > exception occurs while accessing the state, the job should fail.
> > > >
> > > > My question was not about how exceptions will be defined. I am not
> > > > sure how unchecked exceptions handling will be implemented. I was
> > > > wondering if exceptions in the mail chain would have an impact on
> > > > the reference counting? E.g. in Fig 5, if an exception happened in
> > > > the value(), update(int), or function within then(), any -1
> > > > counting might be missed? Maybe a UT to cover all kinds of cases,
> > > > i.e. happy paths and unhappy paths, would make sense.
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > > On Mon, Mar 11, 2024 at 3:58 AM Yanfei Lei <fredia...@gmail.com> wrote:
> > > >
> > > > > > Hi Jing,
> > > > > >
> > > > > > Thanks for your thoughtful feedback!
> > > > > >
> > > > > > > > does it mean that there will be three mails for Read,
> > > > > > > > Update, and Output ?
> > > > > >
> > > > > > With this example, there are two mails. The Read is processed by
> > > > > > `mailboxDefaultAction`[1], and the Update and Output are
> > > > > > encapsulated as mail.
> > > > > >
> > > > > > > > does it make sense to encapsulate one mail instead of 3
> > > > > > > > mails with more overhead?
> > > > > >
> > > > > > How many mails are encapsulated depends on how the user writes
> > > > > > the code. The statements in a `then()` will be wrapped into a
> > > > > > mail. StateFuture is a restricted version of CompletableFuture,
> > > > > > their basic semantics are consistent.
> > > > > >
> > > > > > > > Would you like to add more description for cases when
> > > > > > > > exceptions happened? E.g. when reading or/and updating
> > > > > > > > State throws IOExceptions.
> > > > > >
> > > > > > We describe this in the "Error handling"[2] section. This FLIP
> > > > > > also adopts the design from FLIP-368, ensuring that all state
> > > > > > interfaces throw unchecked exceptions and, consequently, do not
> > > > > > declare any exceptions in their signatures. In cases where an
> > > > > > exception occurs while accessing the state, the job should fail.
> > > > > >
> > > > > > > > Is it correct to understand that AEC is stateless?
> > > > > >
> > > > > > Great perspective, yes, it can be understood that way.
> > > > > > AEC is a task-level component. When the job fails or is
> > > > > > restarted, the runtime status in AEC will be reset.
> > > > > > In fact, we have considered taking a snapshot of the status in
> > > > > > AEC and storing it in a checkpoint like "unaligned checkpoint",
> > > > > > but since callbacks cannot be serialized, this idea is not
> > > > > > feasible for the time being.
> > > > > >
> > > > > > > > would you like to add Pseudo-code for the
> > > > > > > > inFilghtReocordNum decrement to help us understand the
> > > > > > > > logic better?
> > > > > >
> > > > > > This part of the code is a bit scattered; we will try to distill
> > > > > > it into pseudo-code. In the meantime, you can refer to the
> > > > > > RecordContext-related code [3] in the PoC to understand it.
> > > > > >
> > > > > > [1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java#L81
> > > > > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ErrorHandling
> > > > > > [3] https://github.com/ververica/flink-poc/blob/disagg-poc-2/flink-runtime/src/main/java/org/apache/flink/runtime/state/async/RecordContext.java#L77
> > > > > >
> > > > > > Best,
> > > > > > Yanfei
> > > > > >
> > > > > > Jing Ge <j...@ververica.com.invalid> wrote on Sun, Mar 10, 2024 at 23:47:
> > > > > > > >
> > > > > > > > Hi Yanfei,
> > > > > > > >
> > > > > > > > Thanks for your proposal! The FLIP contains a lot of great
> > > > > > > > new ideas. I'd like to ask some questions to make sure we
> > > > > > > > are on the same page.
> > > > > > > >
> > > > > > > > > > For the asynchronous interface, Record A should run
> > > > > > > > > > with Read, Update and Output, while Record B should
> > > > > > > > > > stay at the Blocking buffer.
> > > > > > > >
> > > > > > > > 1. With this example, does it mean that there will be three
> > > > > > > > mails for Read, Update, and Output?
> > > > > > > > 2. If yes, since the Read, Update, and Output have to be
> > > > > > > > executed before Record B, does it make sense to encapsulate
> > > > > > > > one mail instead of 3 mails with more overhead? There must
> > > > > > > > be some thoughts behind the design. Look forward to it.
> > > > > > > >
> > > > > > > > > > The challenge arises in determining when all the
> > > > > > > > > > processing logic associated with Record A is fully
> > > > > > > > > > executed.
> > > > > > > > > > To address this, we have adopted a reference counting
> > > > > > > > > > mechanism that tracks ongoing operations (either
> > > > > > > > > > processing input or executing a callback) related to a
> > > > > > > > > > single record.
> > > > > > > >
> > > > > > > > The design reminds me of the JVM reference counting for GC.
> > > > > > > > Would you like to add more description for cases when
> > > > > > > > exceptions happened? E.g. when reading or/and updating
> > > > > > > > State throws IOExceptions.
> > > > > > > >
> > > > > > > > > > In more detail, AEC uses a inFilghtReocordNum variable
> > > > > > > > > > to trace the current number of records in progress.
> > > > > > > > > > Every time the AEC receives a new record, the
> > > > > > > > > > inFilghtReocordNum increases by 1; when all processing
> > > > > > > > > > and callback for this record have completed, the
> > > > > > > > > > inFilghtReocordNum decreases by 1. When processing one
> > > > > > > > > > checkpoint mail, the current task thread will give up
> > > > > > > > > > the time slice through the yield() method of the
> > > > > > > > > > mailbox executor, so that the ongoing state request’s
> > > > > > > > > > callback and the blocking state requests will be
> > > > > > > > > > drained first until inFlightRecordNum reduces to 0.
> > > > > > > >
> > > > > > > > 1. Speaking of draining, is it correct to understand that
> > > > > > > > AEC is stateless? E.g. AEC could be easily scaled out if it
> > > > > > > > became a bottleneck.
> > > > > > > > 2. There is pseudo-code for the inFilghtReocordNum
> > > > > > > > increment; would you like to add pseudo-code for the
> > > > > > > > inFilghtReocordNum decrement to help us understand the
> > > > > > > > logic better?
> > > > > > > >
> > > > > > > > The FLIP shows overall a great design! +1 for it!
> > > > > > > > Looking forward to your thoughts, thanks!
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > > Jing
> > > > > > > >
> > > > > > > > On Thu, Mar 7, 2024 at 10:05 AM Yanfei Lei <fredia...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > > Hi devs,
> > > > > > > > > >
> > > > > > > > > > I'd like to start a discussion on FLIP-425:
> > > > > > > > > > Asynchronous Execution Model[1], which is a sub-FLIP of
> > > > > > > > > > FLIP-423: Disaggregated State Storage and Management[2].
> > > > > > > > > >
> > > > > > > > > > FLIP-425 introduces a non-blocking execution model
> > > > > > > > > > leveraging the asynchronous APIs introduced in
> > > > > > > > > > FLIP-424[3].
> > > > > > > > > > For the whole story please read the FLIP-423[2], and
> > > > > > > > > > this thread is aimed to discuss the details of
> > > > > > > > > > "FLIP-425: Asynchronous Execution Model".
> > > > > > > > > >
> > > > > > > > > > Regarding the details of this FLIP, there have been
> > > > > > > > > > some discussions here[4], mainly focusing on framework
> > > > > > > > > > overhead profiling, watermark processing, etc. Please
> > > > > > > > > > see link[4] for the context.
> > > > > > > > > >
> > > > > > > > > > Looking forward to hearing from you!
> > > > > > > > > >
> > > > > > > > > > [1] https://cwiki.apache.org/confluence/x/S4p3EQ
> > > > > > > > > > [2] https://cwiki.apache.org/confluence/x/R4p3EQ
> > > > > > > > > > [3] https://cwiki.apache.org/confluence/x/SYp3EQ
> > > > > > > > > > [4] https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Yanfei
> >
> > --
> > Best,
> > Yanfei
> >
> > --
> > Best,
> > Yanfei
>
> --
> Best,
> Yanfei
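
Illustration of the `then()` ordering point discussed in this thread (chained
versus independent updates to the same state). This is only a minimal sketch
and not code from the FLIP or from Flink: it assumes that plain
`java.util.concurrent.CompletableFuture` is a fair stand-in for `StateFuture`
(the thread describes their basic semantics as consistent), it uses an
`AtomicInteger` as a hypothetical substitute for `stateA`, and the class name
`ThenChainOrdering` and helper `updateAsync` are made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class ThenChainOrdering {

    // Hypothetical stand-in for an asynchronous state update; NOT Flink's API.
    static CompletableFuture<Void> updateAsync(AtomicInteger state, int value) {
        return CompletableFuture.runAsync(() -> state.set(value));
    }

    // Chained form: the second update is only issued after the first one has
    // completed, so the final value is always 2.
    static int chainedUpdates() {
        AtomicInteger stateA = new AtomicInteger(0);
        updateAsync(stateA, 1)
                .thenCompose(ignored -> updateAsync(stateA, 2))
                .join();
        return stateA.get();
    }

    // Unchained form: both updates are issued right away, so their relative
    // order is not defined and the final value may be 1 or 2.
    static int unchainedUpdates() {
        AtomicInteger stateA = new AtomicInteger(0);
        CompletableFuture<Void> first = updateAsync(stateA, 1);
        CompletableFuture<Void> second = updateAsync(stateA, 2);
        CompletableFuture.allOf(first, second).join();
        return stateA.get();
    }

    public static void main(String[] args) {
        System.out.println("chained   -> stateA = " + chainedUpdates());
        System.out.println("unchained -> stateA = " + unchainedUpdates());
    }
}
```

In this sketch the lambda passed to `thenCompose` is what defers the second
update; the chained form expresses the data dependency, while the unchained
form leaves the ordering to the executor.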