Hi Zakelly,

Thanks for driving this! I also have a couple of questions. But overall the
proposal LGTM.

>> 2. Besides `processElement`, there are other methods that access state,
>> such as `snapshotState`. Are these methods also not applicable under this
>> FLIP?
>
> Well the async state APIs are available, including the additional ones in
> part1. However the APIs for declaring processing are not.

AFAIU this means async state requests can be used basically everywhere, but
unless they come from the declared processing in `processElement`/timers, they
won't be interruptible and will have to be drained before the checkpoint
commences. Right?

I'm not worried about `snapshotState`; I don't think there are many (if any)
operators doing heavy logic there. I'm thinking more about mailbox actions,
for example those used in AsyncWaitOperator (and hence AsyncScalarFunction).

A couple of other points that I wanted to understand better.

> Try to use named callbacks everywhere. -- If there are some anonymous
> callbacks, the checkpoint will not proceed until all anonymous callbacks and
> corresponding requests finish. See next section for more details.

Why do the callbacks have to be named? I presume we could have some
deterministic name generator (`(i++).toString()`) and support checkpointing
anonymous callbacks. Have you considered and rejected that for some reason?

Or maybe I'm confusing something. By anonymous callbacks, do you mean using
the declarative async API without explicitly specifying the callback name? Or
do you mean the older, already existing/voted async state API accessed from
`processElement`?
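
To make the name-generator idea concrete, here is a minimal sketch in plain
Java. Everything in it is hypothetical: `CallbackNameGenerator`, `nameFor` and
`declareAll` are illustrative names, not part of Flink or the FLIP. It only
shows that a counter-based name is reproducible when callbacks are declared in
the same order on every run, which is the assumption such a scheme would
depend on.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not a Flink or FLIP-455 class.
final class CallbackNameGenerator {
    private int next = 0;

    /** Returns the explicit name if one is given, else a name based on declaration order. */
    String nameFor(String explicitName) {
        int index = next++; // advance for every callback so positions stay stable
        return explicitName != null ? explicitName : "anonymous-callback-" + index;
    }

    /** Simulates one declaration pass over a fixed sequence of callbacks. */
    static List<String> declareAll(String... explicitNames) {
        CallbackNameGenerator generator = new CallbackNameGenerator();
        List<String> names = new ArrayList<>();
        for (String explicit : explicitNames) {
            names.add(generator.nameFor(explicit));
        }
        return names;
    }

    public static void main(String[] args) {
        // Two passes stand in for "before checkpoint" and "after restore".
        List<String> beforeRestore = declareAll(null, "update-count", null);
        List<String> afterRestore = declareAll(null, "update-count", null);
        System.out.println(beforeRestore);
        System.out.println(beforeRestore.equals(afterRestore));
    }
}
```

The obvious weakness of this sketch is that adding, removing or reordering a
callback changes every generated name after it, so a checkpoint taken with the
old code would no longer line up with the new declarations.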

> Try to use declared variables. -- If other local variables captured in any
> named callbacks, they cannot be properly serialized into the checkpoint.

As far as I understand, if the user accesses non-declared local variables,
that won't throw any exception/error (because how could that be detected?); it
would just behave strangely, with shared local variables having seemingly
random values?
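
As an analogy for the kind of silent misbehaviour meant here, the following
plain-Java sketch queues callbacks that all capture one mutable holder and
runs them only later. It uses no Flink classes, `SharedCaptureDemo` and
`runDeferred` are made-up names, and whether FLIP-455 behaves this way is
exactly the open question above. It only shows that nothing in the language
flags the capture.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntSupplier;

// Plain-Java analogy only; no Flink classes are involved.
final class SharedCaptureDemo {

    /** Queues one callback per input and runs them all only after the loop ends. */
    static List<Integer> runDeferred(int... inputs) {
        // Stand-in for a non-declared local variable captured by the callbacks.
        int[] shared = new int[1];
        List<IntSupplier> pending = new ArrayList<>();
        for (int input : inputs) {
            shared[0] = input;                 // overwritten for every record
            pending.add(() -> shared[0] * 10); // reads `shared` when it runs, not now
        }
        List<Integer> results = new ArrayList<>();
        for (IntSupplier callback : pending) {
            results.add(callback.getAsInt());
        }
        return results;
    }

    public static void main(String[] args) {
        // Every callback sees the last value written, not the value of "its" record.
        System.out.println(runDeferred(1, 2, 3)); // prints [30, 30, 30]
    }
}
```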

> The request's callback should depend on a single request rather than being
> associated with multiple requests (for example, not designed for operations
> like thenCombine() or StateFutureUtils.combineAll() or any iteration).

Could you elaborate a bit more on what the problem is?

Best,
Piotrek

On Fri, Aug 2, 2024 at 11:58, Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi Xuyang,
>
> Thanks for your reply!
>
> > 1. Since declareProcess is introduced in `Input` and
> > `TwoInputStreamOperator` to align with `processElement`, do we also need
> > to consider modifying other classes such as `KeyedProcessFunction` to
> > introduce similar methods?
>
>
> Yes, these APIs will be exposed to the `Function` layer. But since we want
> to keep this internal, some new internal classes will be introduced.
>
> > 2. Besides `processElement`, there are other methods that access state,
> > such as `snapshotState`. Are these methods also not applicable under this
> > FLIP?
>
>
> Well the async state APIs are available, including the additional ones in
> part1. However the APIs for declaring processing are not.
>
>
> > 3. For chain-style declarations, can they be nested? For example, can
> > another finished declared chain1 be used within a declared chain2?
>
> Lambdas cannot be declared in a nested fashion, since we need to actually
> declare a lambda before the real processing. So all the lambdas are required
> to be declared directly under `declareProcess`. But once the chains are
> declared, they can be freely combined. Reusing one chain in another, or
> referencing one from another, is also allowed.
>
> Hope this resolves your question.
>
> Best,
> Zakelly
>
> On Fri, Aug 2, 2024 at 11:44 AM Xuyang <xyzhong...@163.com> wrote:
>
> > Hi, Zakelly.
> >
> > This FLIP LGTM overall, and I think it makes sense to speed up
> > checkpointing.
> >
> > After reading the entire FLIP, I have the following questions.
> >
> > 1. Since declareProcess is introduced in `Input` and
> > `TwoInputStreamOperator` to align with `processElement`, do we also need
> > to consider modifying other classes such as `KeyedProcessFunction` to
> > introduce similar methods?
> >
> > 2. Besides `processElement`, there are other methods that access state,
> > such as `snapshotState`. Are these methods also not applicable under this
> > FLIP?
> >
> > 3. For chain-style declarations, can they be nested? For example, can
> > another finished declared chain1 be used within a declared chain2?
> >
> > --
> >
> >     Best!
> >     Xuyang
> >
> > At 2024-07-30 14:06:08, "Zakelly Lan" <zakelly....@gmail.com> wrote:
> > >Hi devs,
> > >
> > >I would like to initiate a discussion about FLIP-455: Declare async state
> > >processing and checkpoint the in-flight requests[1].
> > >
> > >FLIP-423[2] and the related sub-FLIPs introduced the disaggregated state
> > >and async accessing model of state. However, the in-flight state requests
> > >(or records) should be drained at the checkpoint, which leads to an
> > >increased checkpoint synchronization delay. The main goal of this FLIP is
> > >to snapshot the in-flight requests as part of a checkpoint. This will
> > >accelerate the draining process and decouple the mutual impact between
> > >checkpoints and data processing.
> > >
> > >For your ease of understanding, the FLIP covers three key aspects:
> > >1. (Public API) Additional APIs for conditional branchings under
> > >`StateFuture`
> > >2. (Internal API) An internal API set for declaring and defining the
> > >record processing
> > >3. (Implementation) Method to snapshot the in-flight state requests
> > >For the first two parts, there is also a PoC branch[3] provided.
> > >
> > >Looking forward to hearing from you.
> > >
> > >[1] https://cwiki.apache.org/confluence/x/C4owEg
> > >[2] https://cwiki.apache.org/confluence/x/R4p3EQ
> > >[3] https://github.com/apache/flink/pull/24719
> > >
> > >
> > >Best,
> > >Zakelly
> >
>
