Hi everyone,

Since there seems to be no further discussion about the current proposal, I
will start a vote next Monday.


Best,
Zakelly

On Tue, Aug 6, 2024 at 6:22 PM Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi Piotr,
>
> Thanks for your explanation! Now I roughly get your point.
>
>> All in all, I would be hesitant to allow users to declare callbacks
>> everywhere, as it might lead to some unforeseen problems or readability
>> issues? I don't see a good motivation for them in `open`, `setup` or
>> `snapshotState`. Those methods are called without keyed context.
>> Additionally, the only heavy logic that I have seen in both `open` and
>> `setup` was one-off state migration (when the way the operator defines
>> its state changes between Flink versions), which is rare and which I am
>> fine with being a synchronous process.
>
>
> Note that declaring a callback does not have to happen under a keyed
> context, but invoking it does. Anyway, allowing users to declare callbacks
> in `open` and invoke them in other places is only a fledgling idea, and I'm
> not going to introduce it for now.
> Currently, we strictly restrict the use of callbacks to a keyed context. We
> will refrain from introducing interfaces until we have a clear
> understanding of their use cases. I think we have a consensus to keep
> `snapshotState` synchronous at present.
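
To illustrate the distinction above (declaring a callback needs no key,
invoking it does), here is a minimal sketch. `KeyedContextSketch`, `declare`
and `runWithKey` are hypothetical names made up for this example; they are
not Flink classes and not the API proposed in the FLIP.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for an operator's keyed context; not a Flink class.
final class KeyedContextSketch {
    // null means "no keyed context is active"
    private Object currentKey;

    /** Declaring only wraps and stores the callback, so no current key is needed. */
    <T> Consumer<T> declare(Consumer<T> body) {
        return value -> {
            // The check happens at invocation time, not at declaration time.
            if (currentKey == null) {
                throw new IllegalStateException("Callback invoked outside a keyed context");
            }
            body.accept(value);
        };
    }

    /** Runs an action with the given key set, then restores the previous key. */
    void runWithKey(Object key, Runnable action) {
        Object previous = currentKey;
        currentKey = key;
        try {
            action.run();
        } finally {
            currentKey = previous;
        }
    }
}
```

In this sketch a callback declared in something like `open` is harmless on
its own; it only fails if it is invoked while no key is set, and it works
when invoked inside `runWithKey`.
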
>
>
> Best,
> Zakelly
>
> On Tue, Aug 6, 2024 at 5:16 PM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
>> Hi again,
>>
>> Thanks for further explanation!
>>
>> > I briefly revisited all SQL operator implementations, and it seems
>> > feasible for all scenarios. I'm not sure if this answers your question.
>>
>> I think you misunderstood what I meant. Initially I was wondering if we
>> should allow for those async callbacks to be declared and used in mailbox
>> callbacks. However, in my second email:
>>
>> > I realised that AsyncWaitOperator is using only java heap
>> > variables in the callbacks, that are only copied to Flink's state in the
>> > `snapshotState` call. So a need for declarative/async mailbox callbacks
>> > looks to be entirely theoretical at the moment.
>>
>> In other words, I don't think we need to worry about mailbox callbacks, at
>> least not right now. Maybe in the future, once we find some good
>> motivation for them?
>>
>> > Leaving aside specific examples, I'm thinking of allowing users to
>> > access the `DeclarationContext` in `open` or `setup` of the operator,
>> > which means they can declare any callback or function they want.
>>
>> All in all, I would be hesitant to allow users to declare callbacks
>> everywhere, as it might lead to some unforeseen problems or readability
>> issues?
>>
>> I don't see a good motivation for them in `open`, `setup` or
>> `snapshotState`. Those methods are called without keyed context.
>> Additionally, the only heavy logic that I have seen in both `open` and
>> `setup` was one-off state migration (when the way the operator defines
>> its state changes between Flink versions), which is rare and which I am
>> fine with being a synchronous process.
>>
>> So all in all, I'm fine with the current proposal.
>>
>> Best,
>> Piotrek
>>
>>
>> On Tue, Aug 6, 2024 at 10:55 AM Zakelly Lan <zakelly....@gmail.com> wrote:
>>
>> > Hi Piotr,
>> >
>> > Thanks for your reply!
>> >
>> >
>> > > Because the remote state backend/async state accesses are only
>> > > supported on the keyed state?
>> >
>> >
>> > Yes. And since we only have a heap implementation of Operator State,
>> > offloading its accesses to async threads seems pointless. For now, we
>> > keep that interface synchronous, as it is.
>> >
>> > > No, I didn't mean the function accessing the operator's state, but if
>> > > the operator itself accessed its own state in a mailbox callback.
>> > > However, now that I tried searching for an example, I realised that
>> > > AsyncWaitOperator is using only java heap variables in the callbacks,
>> > > that are only copied to Flink's state in the `snapshotState` call. So
>> > > a need for declarative/async mailbox callbacks looks to be entirely
>> > > theoretical at the moment.
>> >
>> >
>> > Leaving aside specific examples, I'm thinking of allowing users to
>> > access the `DeclarationContext` in `open` or `setup` of the operator,
>> > which means they can declare any callback or function they want. And
>> > those callbacks can theoretically be invoked everywhere. For example,
>> > they can do:
>> > ```
>> > class CustomOperator extends AbstractStreamOperator {
>> >
>> >   private NamedCallback callback;
>> >
>> >   public void open() {
>> >     callback = getDeclareContext().declare(
>> >       (e) -> {
>> >         // some consumer logic
>> >       }
>> >     );
>> >   }
>> >
>> >   public void someFunction() {
>> >     valueState.value(callback);
>> >   }
>> > }
>> > ```
>> >
>> > Apart from that, there will be another internal API that supports
>> > processing something under a keyed context:
>> > ```
>> > asyncProcessWithKey(key, () -> { /* some logic */ });
>> > // the function will be processed in the mailbox;
>> > // equivalent to calling
>> > // keyedStateBackend.setCurrentKey(key) and then doing something
>> > ```
>> >
>> > I briefly revisited all SQL operator implementations, and it seems
>> > feasible for all scenarios. I'm not sure if this answers your question.
>> >
>> >
>> > Best,
>> > Zakelly
>> >
>> > On Tue, Aug 6, 2024 at 4:12 PM Piotr Nowojski <pnowoj...@apache.org>
>> > wrote:
>> >
>> > > Hi Zakelly,
>> > >
>> > > Thanks for your responses!
>> > >
>> > > > IIUC, the AsyncWaitOperator and AsyncScalarFunction are all
>> > > > non-keyed, so we don't need to worry too much
>> > >
>> > > Because the remote state backend/async state accesses are only
>> > > supported on the keyed state?
>> > >
>> > > > Furthermore, as I mentioned, we currently
>> > > > do not permit any asynchronous threads to access the state.
>> > >
>> > > No, I didn't mean the function accessing the operator's state, but if
>> > > the operator itself accessed its own state in a mailbox callback.
>> > > However, now that I tried searching for an example, I realised that
>> > > AsyncWaitOperator is using only java heap variables in the callbacks,
>> > > that are only copied to Flink's state in the `snapshotState` call. So
>> > > a need for declarative/async mailbox callbacks looks to be entirely
>> > > theoretical at the moment.
>> > >
>> > > Best,
>> > > Piotrek
>> > >
>> > > On Tue, Aug 6, 2024 at 7:33 AM Zakelly Lan <zakelly....@gmail.com> wrote:
>> > >
>> > > > Hi Piotr,
>> > > >
>> > > > Thanks for your comments! PTAL at my answers:
>> > > >
>> > > > > AFAIU it means async state requests can be used basically
>> > > > > everywhere, but unless they are coming from declaring process in
>> > > > > process elements/timers, they won't be interruptible and will have
>> > > > > to be drained before checkpoint commences? Right?
>> > > >
>> > > >
>> > > > Yes, they must be declared through the APIs provided by the current
>> > > > FLIP before they can be snapshotted and interrupted. However, the
>> > > > state APIs can only be used by logic running on the task thread or,
>> > > > equivalently, on the same thread.
>> > > >
>> > > > > I'm not worried about `snapshotState`, I don't think there are
>> > > > > any/many operators doing some heavy logic there. I'm more thinking
>> > > > > about mailbox actions, for example used in AsyncWaitOperator (and
>> > > > > hence AsyncScalarFunction).
>> > > >
>> > > >
>> > > > IIUC, the AsyncWaitOperator and AsyncScalarFunction are all
>> > > > non-keyed, so we don't need to worry too much. Furthermore, as I
>> > > > mentioned, we currently do not permit any asynchronous threads to
>> > > > access the state. Perhaps we can address how async user-defined
>> > > > functions could access the state later.
>> > > >
>> > > > > Why do the callbacks have to be named? I presume we could have some
>> > > > > deterministic name generator (`(i++).toString()`) and support
>> > > > > checkpointing anonymous callbacks. Have you considered and rejected
>> > > > > that for some reason?
>> > > >
>> > > >
>> > > > Let me clarify this. I think you are talking about declaring a
>> > > > callback without providing a name? This is actually supported. All
>> > > > the explicit `withName` calls can be omitted, and a name generator, as
>> > > > you mentioned, will generate a name for a callback, but only if the
>> > > > user uses the APIs of this FLIP. These callbacks are still considered
>> > > > 'named' because the auto-assigned names are accounted for.
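
As a rough illustration of the name generator described above, here is a
minimal sketch in which anonymous callbacks receive names derived purely
from declaration order, so that declaring the same callbacks in the same
order after a restore yields the same names. `CallbackRegistry`, the `auto-`
prefix and `lookup` are assumptions made up for this example; the FLIP's
actual classes and naming scheme may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical registry; illustrative only, not the FLIP's actual API.
final class CallbackRegistry {
    private final Map<String, Consumer<Object>> callbacks = new LinkedHashMap<>();
    private int nextId = 0;

    /** Declares a callback under an explicit, user-provided name. */
    String declare(String name, Consumer<Object> callback) {
        if (callbacks.putIfAbsent(name, callback) != null) {
            throw new IllegalArgumentException("Duplicate callback name: " + name);
        }
        return name;
    }

    /** Declares a callback without a name; the name depends only on declaration order. */
    String declare(Consumer<Object> callback) {
        return declare("auto-" + nextId++, callback);
    }

    /** Resolves a name (for example, one read back from a checkpoint) to its callback. */
    Consumer<Object> lookup(String name) {
        Consumer<Object> callback = callbacks.get(name);
        if (callback == null) {
            throw new IllegalArgumentException("Unknown callback: " + name);
        }
        return callback;
    }
}
```

The point of the sketch is only that an auto-assigned name is still a name:
a checkpoint could record the name next to the pending request, and the
restored job could resolve it through `lookup` once the callbacks have been
declared again in the same order.
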
>> > > >
>> > > > > As far as I understand, if the user is accessing non declared local
>> > > > > variables, that won't throw any exception/error (because how could
>> > > > > that be done?), it would just behave strangely, with shared local
>> > > > > variables having seemingly random values?
>> > > > >
>> > > >
>> > > > Yes, after recovery, the shared local variables would lose the value
>> > > > they had at the time of the checkpoint. I have implemented a
>> > > > detection that automatically falls back to draining before the
>> > > > checkpoint if a callback captures an anonymous shared variable.
>> > > > Additionally, if users are certain that the variable is safe (e.g. it
>> > > > is read-only or just for statistics), they can annotate the variable
>> > > > to pass the check, so the fallback won't take effect. A PoC of the
>> > > > detection is here[1].
>> > > >
>> > > > > > The request's callback should depend on a single request rather
>> > > > > > than being associated with multiple requests (for example, not
>> > > > > > designed for operations like thenCombine() or
>> > > > > > StateFutureUtils.combineAll() or any iteration).
>> > > >
>> > > >
>> > > > > Could you elaborate a bit more on what's the problem?
>> > > >
>> > > >
>> > > > Well, this is a matter of implementation complexity. Currently, we
>> > > > only note down the request and its callback in a checkpoint. If we
>> > > > want to support multiple-path combinations, we will need to record
>> > > > the relationships between requests and track the progress of each
>> > > > path, as there may be multiple stages within each path.
>> > > >
>> > > >
>> > > > [1]
>> > > > https://github.com/apache/flink/pull/24719/files#diff-eae23736b1bb0d4a654c04669105ad3c9bc9c39a8e265693e0134b7787aa2a09R53-R69
>> > > >
>> > > > Best,
>> > > > Zakelly
>> > > >
>> > >
>> >
>>
>
