Thank you Yanfei for addressing all the questions!

> I'm not sure if I understand your question. In Java, this
case (modifying a local variable) is not allowed[1], but there
are ways to get around this limitation of lambdas.
In this case, the modification to x may be concurrent, which needs to
be handled carefully.

I think you got my question; I did not realize that it is not even allowed to
modify an externally scoped local variable in a lambda.
I guess the point is that it is possible, but users would really need to go
out of their way to do it and "shoot themselves in the foot".
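
To check my own understanding, here is a minimal plain-Java sketch (no Flink API involved; `CaptureSketch` and the `IntConsumer` standing in for a `then()` callback are my own assumptions). Assigning to a captured local is rejected by the compiler, but a mutable holder gets around that, and anything reading the holder before the callback runs sees the old value:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

final class CaptureSketch {

    /** Returns {value seen before the callback ran, value seen after it ran}. */
    static int[] observe(int stateValue) {
        // int x = 0;
        // IntConsumer bad = val -> { x = val + 1; };
        // ^ rejected by javac: a captured local must be final or effectively final.

        // Workaround: capture a final reference to a mutable holder instead.
        AtomicInteger x = new AtomicInteger(0);
        IntConsumer callback = val -> x.set(val + 1); // stands in for a then() callback

        int before = x.get();        // read before the callback has run
        callback.accept(stateValue); // the deferred callback finally runs
        int after = x.get();         // read after the callback has run
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] seen = observe(41);
        System.out.println("before=" + seen[0] + ", after=" + seen[1]);
    }
}
```

In the sketch the callback runs on the same thread; if it ran on another thread or at an unknown later point, the "before" read is exactly the stale value I was worried about.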

> an implicit fact in sync
API is that "event timer fire" would execute before "the subsequent
records of watermark", but in out-of-order mode(async API), the
execution order between them is not guaranteed

Got it. What I still don't get is exactly what kind of inconsistency or issue
the user could face.
For example, if users can no longer be sure that elements end up in the
correct windows, I am afraid this would undermine the watermark concept as a
whole. What do you think?
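
To make the concern concrete, here is a toy sketch of the kind of difference I have in mind. It is plain Java, not the FLIP's API, and the interleaving in the `else` branch is only my assumption of what "not guaranteed" could mean. Two records arrive, then a watermark whose timer emits and resets a per-key count, then one more record:

```java
import java.util.ArrayList;
import java.util.List;

final class TimerOrderSketch {

    /** Returns the count emitted by the timer for the chosen interleaving. */
    static int emittedByTimer(boolean timerRunsBeforeNextRecord) {
        int[] count = {0};                         // toy per-key state
        List<Integer> emitted = new ArrayList<>(); // what the timer sends downstream

        Runnable record = () -> count[0]++;
        Runnable timerFire = () -> {
            emitted.add(count[0]);
            count[0] = 0;
        };

        record.run();
        record.run();
        if (timerRunsBeforeNextRecord) {
            timerFire.run(); // sync-like order: timer fires before the next record
            record.run();
        } else {
            record.run();    // hypothetical out-of-order interleaving
            timerFire.run();
        }
        return emitted.get(0);
    }

    public static void main(String[] args) {
        System.out.println("timer first:  " + emittedByTimer(true));
        System.out.println("record first: " + emittedByTimer(false));
    }
}
```

If the second interleaving is possible, the emitted count includes a record that arrived after the watermark, which is the sort of result I would like to understand better.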

Thank you.

On Mar 21, 2024 at 14:27 +0100, Yanfei Lei <fredia...@gmail.com>, wrote:
> Thanks for your reading and valuable comments!
>
> > 1) About locking VS reference counting: I would like to clear out which 
> > mechanism prevents what:
> The `KeyAccountingUnit` implements locking behavior on keys and
> ensures 2 state requests on the same key happen in order.
> Double-locking the same key does not result in deadlocks (thanks to
> the `previous == record` condition in your pseudo-code), so, the same
> callback chain can update/read multiple times the same piece of state.
> On the other side we have the reference counting mechanism that is
> used to understand whether a record has been fully processed, i.e.,
> all state invocations have been carried out.
> Here is the question: am I correct if we say that key accounting is
> needed for out-of-order while reference counting is needed for
> checkpointing and watermarking?
>
>
> Regarding the "deadlock" of `KeyAccountingUnit`: good catch, we will
> emphasize this in the FLIP. The `KeyAccountingUnit` is reentrant, so the
> state requests of the same record can update/read the same state multiple
> times without deadlock.
>
> Regarding the question: records, checkpoint barriers and watermarks
> can all be regarded as inputs, and this FLIP discusses the *order* between
> these inputs. In simple terms, the inputs of the same key that arrive first
> need to be executed first.
>
> The `KeyAccountingUnit` and reference counting work together to
> preserve this order: when the reference counting mechanism recognizes that a
> record has been fully processed, the record is removed from the
> `KeyAccountingUnit`. A checkpoint or watermark starts only once the
> reference counts of all previously arrived inputs reach zero.
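
Inline note: this is how I picture the two mechanisms working together, as a toy sketch only. `KeyAccountingSketch` and its methods are hypothetical names of mine, not the FLIP's classes; only the `previous == record` check is taken from the pseudo-code mentioned above.

```java
import java.util.HashMap;
import java.util.Map;

final class KeyAccountingSketch {
    // key -> record currently occupying that key
    private final Map<String, Object> occupied = new HashMap<>();
    // record -> number of in-flight operations (input processing or callbacks)
    private final Map<Object, Integer> refCounts = new HashMap<>();

    /** True if the key is free or already held by the same record (reentrant). */
    boolean tryOccupy(String key, Object record) {
        Object previous = occupied.putIfAbsent(key, record);
        return previous == null || previous == record;
    }

    /** Registers one more in-flight operation for the record. */
    void retain(Object record) {
        refCounts.merge(record, 1, Integer::sum);
    }

    /** Drops one reference; at zero the record is done and its key is freed. */
    void release(String key, Object record) {
        int remaining = refCounts.merge(record, -1, Integer::sum);
        if (remaining <= 0) {
            refCounts.remove(record);
            occupied.remove(key, record);
        }
    }

    /** A checkpoint or watermark would wait until this returns true. */
    boolean allDrained() {
        return refCounts.isEmpty();
    }

    public static void main(String[] args) {
        KeyAccountingSketch unit = new KeyAccountingSketch();
        Object recordA = new Object();
        Object recordB = new Object();
        unit.retain(recordA);
        System.out.println("A occupies k:   " + unit.tryOccupy("k", recordA));
        System.out.println("A re-enters k:  " + unit.tryOccupy("k", recordA));
        System.out.println("B blocked on k: " + !unit.tryOccupy("k", recordB));
        unit.release("k", recordA);
        System.out.println("drained:        " + unit.allDrained());
    }
}
```

So in this reading the key accounting orders requests on the same key, and the reference count tells the framework when a record is finished and when barriers or watermarks may proceed.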
>
>
> > 2) Number of mails:
> Do you end up having two mails?
>
>
> Yes, there are two mails in this case.
>
> > 3) Would this change something on the consistency guarantees provided?
> I guess not, as the lock is held in any case until the value in the
> state has been updated.
> Could lead to an inconsistency (most probably the state would be updated to
> 0).
>
>
> Yes, the results of the two cases you mentioned are as you analyzed.
> The result of the first case is 1, and the result of the second case
> is 0.
> No matter which case it is, the next `processElement` with the same
> key will be executed after the code in this `processElement` is
> completely executed.
>
> Therefore it wouldn't lead to inconsistency.
>
> > 4) On the local variables/attributes:
>
> I'm not sure if I understand your question. In Java, this
> case (modifying a local variable) is not allowed[1], but there
> are ways to get around this limitation of lambdas.
> In this case, the modification to x may be concurrent, which needs to
> be handled carefully.
>
> 5) On watermarks:
> It seems that, in order to achieve a good throughput, out-of-order
> mode should be used.
> In the FLIP I could not understand well how many things could go wrong
> if that one is used.
> Could you please clarify that?
>
> A typical example is the order between "event timer fire" and "the
> subsequent records of watermark".
> Although the semantics of watermarks do not define the sequence
> between a watermark and subsequent records, an implicit fact in sync
> API is that "event timer fire" would execute before "the subsequent
> records of watermark", but in out-of-order mode(async API), the
> execution order between them is not guaranteed.
> There also are some related discussions in FLIP-423[2,3] proposed by
> Yunfeng Zhou and Xintong Song.
>
> [1] 
> https://stackoverflow.com/questions/30026824/modifying-local-variable-from-inside-lambda
> [2] https://lists.apache.org/thread/djsnybs9whzrt137z3qmxdwn031o93gn
> [3] https://lists.apache.org/thread/986zxq1k9rv3vkbk39yw16g24o6h83mz
>
> <lorenzo.affe...@ververica.com> wrote on Thu, Mar 21, 2024 at 19:29:
> >
> > Thank you everybody for the questions and answers (especially Yanfei Lei), 
> > it was very instructive to go over the discussion.
> > I am gonna add some questions on top of what happened and add some thoughts 
> > as well below.
> >
> > 1) About locking VS reference counting:
> > I would like to clear out which mechanism prevents what:
> > The `KeyAccountingUnit` implements locking behavior on keys and ensures 2 
> > state requests on the same key happen in order. Double-locking the same key 
> > does not result in deadlocks (thanks to the `previous == record` condition 
> > in your pseudo-code), so, the same callback chain can update/read multiple 
> > times the same piece of state.
> > On the other side we have the reference counting mechanism that is used to 
> > understand whether a record has been fully processed, i.e., all state 
> > invocations have been carried out.
> > Here is the question: am I correct if we say that key accounting is needed 
> > for out-of-order while reference counting is needed for checkpointing and 
> > watermarking?
> >
> > 2) Number of mails:
> > To expand on what Jing Ge already asked, in the example code in the FLIP:
> >
> > ```
> > state.value().then(
> >   val -> {
> >     return state.update(val + 1).then(
> >       empty -> {
> >         out.collect(val + 1);
> >       });
> >   });
> > ```
> > Do you end up having two mails?:
> >
> > first wrapping `val -> {...}`
> > second wrapping `empty -> {...}`
> >
> >
> > Did I get it correctly?
> >
> > 3) On the guarantees provided by the async execution framework:
> > Always referring to your example, say that when `val -> {...}` gets called 
> > the state value is 0.
> > The callback will update it to 1 and register another callback to collect
> > while still holding the lock on that piece of state, preventing
> > anybody else from reading the value during the entire process (implementing
> > atomicity for a transaction).
> > Now, say the code changes to:
> >
> > ```
> > state.value().then(
> >   val -> {
> >     state.update(val + 1);
> >     out.collect(val + 1);
> >   });
> > ```
> >
> > Would this change anything in the consistency guarantees provided?
> > I guess not, as the lock is held in any case until the value in the state
> > has been updated.
> >
> > This, instead (similar to your example):
> >
> > ```
> > int x = 0;
> >
> > state.value().then( val -> { x = val + 1; } );
> > state.update(x);
> > out.collect(x);
> > ```
> >
> > Could lead to an inconsistency (most probably the state would be updated
> > to 0).
> >
> > 4) On the local variables/attributes:
> > The last example above exemplifies one of my concerns: what about the 
> > values enclosed in the callbacks?
> > That seems a bit counter-intuitive and brittle from a user perspective.
> > In the example we have a function-level variable, but what about fields:
> >
> > ```
> > int x = 0;
> >
> > void processElement(...) {
> > state.value().then( val -> { x += val; } );
> > ...
> > out.collect(x);
> > }
> > ```
> >
> > What could happen here?
> > Every callback would enclose the current value of the field?
> > Don't know exactly where I am heading, but it seems quite 
> > complex/convoluted :)
> >
> > 5) On watermarks:
> > It seems that, in order to achieve a good throughput, out-of-order mode 
> > should be used.
> > In the FLIP I could not understand well how many things could go wrong if 
> > that one is used.
> > Could you please clarify that?
> >
> > Thank you for your availability and your great work!
> >
> > On Mar 19, 2024 at 10:51 +0100, Yanfei Lei <fredia...@gmail.com>, wrote:
> >
> > Hi everyone,
> >
> > Thanks for your valuable discussion and feedback!
> >
> > Our discussions have been going on for a while and there have been no
> > new comments for several days. So I would like to start a vote after
> > 72 hours.
> >
> > Please let me know if you have any concerns, thanks!
> >
> > Yanfei Lei <fredia...@gmail.com> wrote on Wed, Mar 13, 2024 at 12:54:
> >
> >
> > Hi Jing,
> > Thanks for the reply and follow up.
> >
> > > > What is the benefit for users to build a chain of mails instead of just 
> > > > one mail(it is still async)?
> >
> >
> > Just to make sure we're on the same page, let me paraphrase your question:
> > a `then()` call will be encapsulated as a callback mail, and you are asking
> > whether we can call `then()` as little as possible to reduce the
> > overhead of encapsulating it into a mail.
> >
> > In general, whether to call `then()` depends on the user's data
> > dependencies. The operations in a chain of `then()` are strictly
> > ordered.
> >
> >
> >
> > The following is an example without data dependencies, if written in
> > the form of a `then` chain:
> > stateA.update(1).then(stateB.update(2).then(stateC.update(3)));
> >
> > The execution order is:
> > ```
> > stateA update 1 -> stateB update 2-> stateC update 3
> > ```
> >
> > If written in the form without `then()` call, they will be placed in a
> > "mail/mailboxDefaultAction", and each state request will still be
> > executed asynchronously:
> > ```
> > stateA.update(1);
> > stateB.update(2);
> > stateC.update(3);
> > ```
> >
> > The order in which they are executed is undefined and may be:
> > ```
> > - stateA update 1 -> stateB update 2-> stateC update 3
> > - stateB update 2 -> stateC update 3-> stateA update 1
> > - stateC update 3 -> stateA update 1-> stateB update 2
> > ...
> > ```
> > And the final results are "stateA = 1, stateB = 2, stateC = 3". In
> > this case, the two ways of writing are equivalent.
> >
> >
> >
> > If there are data dependencies, for example:
> > ```
> > stateA.update(1).then(stateA.update(2))
> > ```
> >
> > Then the execution order is:
> > ```
> > stateA update 1 -> stateA update 2
> > ```
> >
> > If written in the form without `then()` call:
> > ```
> > stateA.update(1);
> > stateA.update(2);
> > ```
> >
> > The order in which they are executed is undefined and may be:
> > ```
> > - stateA update 1 -> stateA update 2
> > - stateA update 2-> stateA update 1
> > ```
> > The final result may be "stateA = 1" *OR* "stateA = 2". In this case,
> > without a `then()` chain nothing constrains the execution order, so the
> > result may be wrong.
> >
> > In summary, how many mails are encapsulated depends on how the user
> > writes the code, and how the user writes the code depends on their
> > data dependencies. [1][2] may be helpful for asynchronous programming
> > practice.
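
Inline note: since [1] points at `CompletableFuture`, here is a small sketch of the same data-dependency point using plain `CompletableFuture` as a stand-in. It is not the `StateFuture` API, and `ThenChainSketch` and the `AtomicInteger` playing the role of `stateA` are my own assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class ThenChainSketch {

    /** Chained: the second update starts only after the first has completed. */
    static int chained() {
        AtomicInteger stateA = new AtomicInteger();
        CompletableFuture
                .runAsync(() -> stateA.set(1))
                .thenRun(() -> stateA.set(2))
                .join();
        return stateA.get();
    }

    /** Unchained: both updates are submitted independently, in no defined order. */
    static int unchained() {
        AtomicInteger stateA = new AtomicInteger();
        CompletableFuture<Void> first = CompletableFuture.runAsync(() -> stateA.set(1));
        CompletableFuture<Void> second = CompletableFuture.runAsync(() -> stateA.set(2));
        CompletableFuture.allOf(first, second).join();
        return stateA.get(); // may be 1 or 2 depending on scheduling
    }

    public static void main(String[] args) {
        System.out.println("chained:   " + chained());
        System.out.println("unchained: " + unchained());
    }
}
```

The chained variant always ends with the value written by the second update, while the unchained one depends on scheduling, which matches the "stateA = 1 OR stateA = 2" outcome described above.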
> >
> >
> > > > I was wondering if exceptions in the mail chain would have an impact on 
> > > > the reference counting?
> >
> >
> > We will catch the exceptions that can be handled; they have no impact
> > on the reference counting.
> > For exceptions that cannot be handled, we will directly fail the job.
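
Inline note: one way I could imagine the "-1" never being missed is to put the decrement in a `finally` block. This is only a sketch under my own assumptions (`RefCountSketch` is a hypothetical name, and I do not know how the PoC actually does it):

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RefCountSketch {
    private final AtomicInteger inFlight = new AtomicInteger();

    /** Runs one callback; the decrement happens even if the callback throws. */
    void runCallback(Runnable callback) {
        inFlight.incrementAndGet();
        try {
            callback.run();
        } finally {
            inFlight.decrementAndGet();
        }
    }

    int inFlight() {
        return inFlight.get();
    }

    public static void main(String[] args) {
        RefCountSketch counter = new RefCountSketch();
        try {
            counter.runCallback(() -> {
                throw new IllegalStateException("state access failed");
            });
        } catch (IllegalStateException e) {
            // In the FLIP an unhandled exception would fail the job instead.
        }
        System.out.println("in flight after failure: " + counter.inFlight());
    }
}
```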
> >
> > > > Maybe a UT to cover all kinds of cases, i.e. happy paths and unhappy 
> > > > paths, would make sense.
> >
> >
> > Nice suggestions, we will add a UT to cover those cases.
> >
> >
> > [1] 
> > https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html
> > [2] https://www.codingjunkie.net/completable-futures-part1/
> >
> > Jing Ge <j...@ververica.com.invalid> wrote on Wed, Mar 13, 2024 at 07:05:
> >
> > > >
> > > > Hi Yanfei,
> > > >
> > > > Thanks for your clarification! Now I have a much clearer picture, and I
> > > > am still trying to understand your thoughts on some of those questions:
> > > >
> > > >
> >
> > > > > > How many mails are encapsulated depends on how the user writes the
> > > > > > code. The statements in a `then()` will be wrapped into a mail.
> > > > > > StateFuture is a restricted version of CompletableFuture, their
> > > > > > basic semantics are consistent.
> > > > > >
> >
> > > >
> > > > Conceptually, users can write a chain of many async calls, i.e. many
> > > > then() calls. And all these calls for Record A must be executed in
> > > > order, while Record B should stay at the Blocking buffer. What is the
> > > > benefit for users to build a chain of mails instead of just one mail
> > > > (it is still async)? Are there any best practices or guidelines to
> > > > teach/tell users when and how many async calls in a chain could/should
> > > > be built?
> > > >
> >
> > > > > > The challenge arises in determining when all the processing logic
> > > > > > associated with Record A is fully executed. To address this, we have
> > > > > > adopted a reference counting mechanism that tracks ongoing operations
> > > > > > (either processing input or executing a callback) related to a single
> > > > > > record.
> > > >
> >
> > > > > > We describe this in the "Error handling"[2] section. This FLIP also
> > > > > > adopts the design from FLIP-368, ensuring that all state interfaces
> > > > > > throw unchecked exceptions and, consequently, do not declare any
> > > > > > exceptions in their signatures. In cases where an exception occurs
> > > > > > while accessing the state, the job should fail.
> > > > > >
> >
> > > >
> > > > My question was not about how exceptions will be defined. I am not sure
> > > > how unchecked exception handling will be implemented. I was wondering
> > > > if exceptions in the mail chain would have an impact on the reference
> > > > counting? E.g. in Fig 5, if an exception happened in value(),
> > > > update(int), or the function within then(), could a -1 be missed?
> > > > Maybe a UT to cover all kinds of cases, i.e. happy paths and unhappy
> > > > paths, would make sense.
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > > On Mon, Mar 11, 2024 at 3:58 AM Yanfei Lei <fredia...@gmail.com> wrote:
> > > >
> >
> > > > > > Hi Jing,
> > > > > >
> > > > > > Thanks for your thoughtful feedback!
> > > > > >
> >
> > > > > > > > does it mean that there will be three mails for Read, Update,
> > > > > > > > and Output?
> > > > > >
> > > > > > With this example, there are two mails. The Read is processed by
> > > > > > `mailboxDefaultAction`[1], and the Update and Output are
> > > > > > encapsulated as a mail.
> > > > > >
> >
> > > > > > > > does it make sense to encapsulate one mail instead of 3 mails
> > > > > > > > with more overhead?
> > > > > >
> > > > > >
> >
> > > >
> >
> > > > > > How many mails are encapsulated depends on how the user writes the
> > > > > > code. The statements in a `then()` will be wrapped into a mail.
> > > > > > StateFuture is a restricted version of CompletableFuture, their
> > > > > > basic semantics are consistent.
> > > > > >
> > > > > >
> >
> > > >
> >
> > > > > > > > Would you like to add more description for cases when exceptions
> > > > > > > > happened? E.g. when reading or/and updating State throws
> > > > > > > > IOExceptions.
> > > > > >
> > > > > >
> >
> > > >
> >
> > > > > > We describe this in the "Error handling"[2] section. This FLIP also
> > > > > > adopts the design from FLIP-368, ensuring that all state interfaces
> > > > > > throw unchecked exceptions and, consequently, do not declare any
> > > > > > exceptions in their signatures. In cases where an exception occurs
> > > > > > while accessing the state, the job should fail.
> > > > > >
> >
> > > >
> > > >
> > > >
> >
> > > > > > > > Is it correct to understand that AEC is stateless?
> >
> > > > > >
> > > > > > Great perspective, yes, it can be understood that way.
> > > > > > AEC is a task-level component. When the job fails or is restarted,
> > > > > > the runtime status in AEC will be reset.
> > > > > > In fact, we have considered taking a snapshot of the status in AEC
> > > > > > and storing it in a checkpoint, like an "unaligned checkpoint", but
> > > > > > since callbacks cannot be serialized, this idea is not feasible for
> > > > > > the time being.
> > > > > >
> >
> > > > > > > > would you like to add Pseudo-code for the inFilghtReocordNum
> > > > > > > > decrement to help us understand the logic better?
> > > > > >
> > > > > > This part of the code is a bit scattered; we will try to abstract it
> > > > > > into pseudo-code. In the meantime, you can refer to the
> > > > > > RecordContext-related code [3] in the PoC to understand it.
> > > > > >
> > > > > > [1]
> > > > > > https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java#L81
> > > > > > [2]
> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ErrorHandling
> > > > > > [3]
> > > > > > https://github.com/ververica/flink-poc/blob/disagg-poc-2/flink-runtime/src/main/java/org/apache/flink/runtime/state/async/RecordContext.java#L77
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > > Yanfei
> > > > > >
> > > > > > Jing Ge <j...@ververica.com.invalid> wrote on Sun, Mar 10, 2024 at 23:47:
> >
> > > > > > > >
> > > > > > > > Hi Yanfei,
> > > > > > > >
> > > > > > > > Thanks for your proposal! The FLIP contains a lot of great new
> > > > > > > > ideas. I'd like to ask some questions to make sure we are on the
> > > > > > > > same page.
> > > > > > > >
> >
> > > > > > > > > > For the asynchronous interface, Record A should run with
> > > > > > > > > > Read, Update and Output, while Record B should stay at the
> > > > > > > > > > Blocking buffer.
> > > > > > > >
> > > > > > > > 1. With this example, does it mean that there will be three
> > > > > > > > mails for Read, Update, and Output?
> > > > > > > > 2. If yes, since the Read, Update, and Output have to be
> > > > > > > > executed before Record B, does it make sense to encapsulate one
> > > > > > > > mail instead of 3 mails with more overhead? There must be some
> > > > > > > > thoughts behind the design. Looking forward to them.
> > > > > > > >
> >
> > > > > > > > > > The challenge arises in determining when all the processing
> > > > > > > > > > logic associated with Record A is fully executed. To address
> > > > > > > > > > this, we have adopted a reference counting mechanism that
> > > > > > > > > > tracks ongoing operations (either processing input or
> > > > > > > > > > executing a callback) related to a single record.
> > > > > > > >
> > > > > > > > The design reminds me of the JVM reference counting for GC.
> > > > > > > > Would you like to add more description for cases when exceptions
> > > > > > > > happened? E.g. when reading or/and updating State throws
> > > > > > > > IOExceptions.
> > > > > > > >
> >
> > > > > > > > > > In more detail, AEC uses a inFilghtReocordNum variable to
> > > > > > > > > > trace the current number of records in progress. Every time
> > > > > > > > > > the AEC receives a new record, the inFilghtReocordNum
> > > > > > > > > > increases by 1; when all processing and callback for this
> > > > > > > > > > record have completed, the inFilghtReocordNum decreases by 1.
> > > > > > > > > > When processing one checkpoint mail, the current task thread
> > > > > > > > > > will give up the time slice through the yield() method of the
> > > > > > > > > > mailbox executor, so that the ongoing state request’s callback
> > > > > > > > > > and the blocking state requests will be drained first until
> > > > > > > > > > inFlightRecordNum reduces to 0.
> > > > > > > >
> > > > > > > > 1. Speaking of draining, is it correct to understand that AEC is
> > > > > > > > stateless? E.g. AEC could be easily scaled out if it became a
> > > > > > > > bottleneck.
> > > > > > > > 2. There is pseudo-code for the inFilghtReocordNum increment;
> > > > > > > > would you like to add pseudo-code for the inFilghtReocordNum
> > > > > > > > decrement to help us understand the logic better?
> > > > > > > >
> > > > > > > > Overall, the FLIP shows a great design! +1 for it! Looking
> > > > > > > > forward to your thoughts, thanks!
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > > Jing
> > > > > > > >
> > > > > > > > On Thu, Mar 7, 2024 at 10:05 AM Yanfei Lei 
> > > > > > > > <fredia...@gmail.com> wrote:
> > > > > > > >
> >
> > > > > > > > > > Hi devs,
> > > > > > > > > >
> > > > > > > > > > I'd like to start a discussion on FLIP-425: Asynchronous
> > > > > > > > > > Execution Model[1], which is a sub-FLIP of FLIP-423:
> > > > > > > > > > Disaggregated State Storage and Management[2].
> > > > > > > > > >
> > > > > > > > > > FLIP-425 introduces a non-blocking execution model
> > > > > > > > > > leveraging the asynchronous APIs introduced in FLIP-424[3].
> > > > > > > > > > For the whole story please read the FLIP-423[2], and this
> > > > > > > > > > thread is aimed to discuss the details of "FLIP-425:
> > > > > > > > > > Asynchronous Execution Model".
> > > > > > > > > >
> > > > > > > > > > Regarding the details of this FLIP, there have been some
> > > > > > > > > > discussions here[4], mainly focusing on framework overhead
> > > > > > > > > > profiling, watermark processing, etc. Please see link[4] for
> > > > > > > > > > the context.
> > > > > > > > > >
> > > > > > > > > > Looking forward to hearing from you!
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > [1] https://cwiki.apache.org/confluence/x/S4p3EQ
> > > > > > > > > > [2] https://cwiki.apache.org/confluence/x/R4p3EQ
> > > > > > > > > > [3] https://cwiki.apache.org/confluence/x/SYp3EQ
> > > > > > > > > > [4] 
> > > > > > > > > > https://lists.apache.org/thread/ct8smn6g9y0b8730z7rp9zfpnwmj8vf0
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Yanfei
> > > > > > > > > >
> >
> > > > > >
> >
> >
> >
> >
> > --
> > Best,
> > Yanfei
> >
> >
> >
> >
> > --
> > Best,
> > Yanfei
>
>
>
> --
> Best,
> Yanfei
