Hi again,

Thanks for the further explanation!

> I briefly revisited all SQL operator implementations, and it seems feasible
> for all scenarios. I'm not sure if this answers your question.

I think you misunderstood what I meant. Initially I was wondering whether we
should allow those async callbacks to be declared and used in mailbox
callbacks. However, as I wrote in my second email:

> I realised that AsyncWaitOperator is using only java heap
> variables in the callbacks, that are only copied to Flink's state in the
> `snapshotState` call. So a need for declarative/async mailbox callbacks
> looks to be entirely theoretical at the moment.

In other words, I don't think we need to worry about mailbox callbacks, at
least not right now. Maybe in the future, once we find some good motivation
for them?
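
To illustrate the pattern I mean, here is a minimal sketch. It is not the
actual AsyncWaitOperator code: the class and method names are made up for
illustration, and a plain list stands in for Flink's operator state. The
callbacks only touch a heap variable, and the state is written in one place,
the snapshot call:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for an operator; NOT the real AsyncWaitOperator.
public class HeapBufferedOperatorSketch {

    // Plain Java heap variable: the only thing the callbacks touch.
    private final Queue<String> inFlight = new ArrayDeque<>();

    // Assumption: a simple list standing in for Flink's operator state.
    private final List<String> operatorState = new ArrayList<>();

    // Called when an element arrives and its async request is started.
    public void onElement(String element) {
        inFlight.add(element);
    }

    // Called from a mailbox callback when the async request completes.
    public void onAsyncCompletion(String element) {
        inFlight.remove(element);
    }

    // The only place where the heap contents are copied into the state.
    public List<String> snapshotState() {
        operatorState.clear();
        operatorState.addAll(inFlight);
        return new ArrayList<>(operatorState);
    }
}
```

Since no state access happens inside the callbacks in such a pattern, there
is nothing in them that would need to be declared or made asynchronous.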

> Leaving aside specific examples, I'm thinking of allowing users to access
> the `DeclarationContext` in `open` or `setup` of the operator, which means
> they can declare any callback or function they want.

All in all, I would be hesitant to allow users to declare callbacks
everywhere, as it might lead to unforeseen problems or readability issues.

I don't see a good motivation for them in `open`, `setup` or `snapshotState`
either. Those methods are called without a keyed context. Additionally, the
only heavy logic that I have seen in both `open` and `setup` was a one-off
state migration (when the way the operator defines its state changes between
Flink versions), which is rare, and I'm fine with it being a synchronous
process.

So all in all, I'm fine with the current proposal.

Best,
Piotrek


On Tue, 6 Aug 2024 at 10:55, Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi Piotr,
>
> Thanks for your reply!
>
>
> > Because the remote state backend/async state accesses are only supported
> > on the keyed state?
>
>
> Yes. And since we only have a heap implementation of Operator State,
> offloading them to async threads seems pointless. Currently we keep the
> interface in sync-style as it is.
>
> > No, I didn't mean the function accessing the operator's state, but if the
> > operator itself accessed its own state in a mailbox callback. However now
> > that I tried searching for an example, I realised that AsyncWaitOperator
> > is using only java heap variables in the callbacks, that are only copied
> > to Flink's state in the `snapshotState` call. So a need for
> > declarative/async mailbox callbacks looks to be entirely theoretical at
> > the moment.
>
>
> Leaving aside specific examples, I'm thinking of allowing users to access
> the `DeclarationContext` in `open` or `setup` of the operator, which means
> they can declare any callback or function they want. And those callbacks
> can be invoked theoretically everywhere. For example, they can do:
> ```
> class CustomOperator extends AbstractStreamOperator {
>
>   private NamedCallback callback;
>
>   public void open() {
>     callback = getDeclareContext().declare(
>         (e) -> { /* some consumer logic */ }
>     );
>   }
>
>   public void someFunction() {
>     valueState.value(callback);
>   }
> }
> ```
>
> Apart from that, there will be another internal API that supports
> processing something under a keyed context:
> ```
> asyncProcessWithKey(key, () -> { /* some logic */ });
> // the function will be processed in the mailbox;
> // equivalent to calling
> // keyedStateBackend.setCurrentKey(key) and then running the logic
> ```
>
> I briefly revisited all SQL operator implementations, and it seems feasible
> for all scenarios. I'm not sure if this answers your question.
>
>
> Best,
> Zakelly
>
> On Tue, Aug 6, 2024 at 4:12 PM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
> > Hi Zakelly,
> >
> > Thanks for your responses!
> >
> > > IIUC, the AsyncWaitOperator and AsyncScalarFunction are all non-keyed,
> > > so we don't need to worry too much
> >
> > Because the remote state backend/async state accesses are only supported
> > on the keyed state?
> >
> > > Furthermore, as I mentioned, we currently
> > > do not permit any asynchronous threads to access the state.
> >
> > No, I didn't mean the function accessing the operator's state, but if the
> > operator itself accessed its own state in a mailbox callback. However now
> > that I tried searching for an example, I realised that AsyncWaitOperator
> > is using only java heap variables in the callbacks, that are only copied
> > to Flink's state in the `snapshotState` call. So a need for
> > declarative/async mailbox callbacks looks to be entirely theoretical at
> > the moment.
> >
> > Best,
> > Piotrek
> >
> > On Tue, 6 Aug 2024 at 07:33, Zakelly Lan <zakelly....@gmail.com> wrote:
> >
> > > Hi Piotr,
> > >
> > > Thanks for your comments! PTAL at my answers:
> > >
> > > > AFAIU it means async state requests can be used basically everywhere,
> > > > but unless they are coming from declaring process in process
> > > > elements/timers, they won't be interruptible and will have to be
> > > > drained before checkpoint commences? Right?
> > >
> > >
> > > Yes, they should be declared via the APIs provided by the current FLIP
> > > before they can be snapshotted and interrupted. However, the state APIs
> > > can only be used by logic running on the task thread or, equivalently,
> > > the same thread.
> > >
> > > > I'm not worried about `snapshotState`, I don't think there are any/many
> > > > operators doing some heavy logic there. I'm more thinking about mailbox
> > > > actions, for example used in AsyncWaitOperator (and hence
> > > > AsyncScalarFunction).
> > >
> > >
> > > IIUC, the AsyncWaitOperator and AsyncScalarFunction are all non-keyed,
> > > so we don't need to worry too much. Furthermore, as I mentioned, we
> > > currently do not permit any asynchronous threads to access the state.
> > > Perhaps we can address how async user-defined functions could access the
> > > state later.
> > >
> > > > Why do the callbacks have to be named? I presume we could have some
> > > > deterministic name generator (`(i++).toString()`) and support
> > > > checkpointing anonymous callbacks. Have you considered and rejected
> > > > that for some reason?
> > >
> > >
> > > Let me clarify this. I think you are talking about declaring a callback
> > > without a name provided? This is actually supported. All the explicit
> > > `withName` could be omitted, and a name generator, as you mentioned,
> > > will generate a name for a callback only if the user utilizes the APIs
> > > of this FLIP. These callbacks are still considered 'named' because the
> > > auto-assigned names are accounted for.
> > >
> > > > As far as I understand, if the user is accessing non declared local
> > > > variables, that won't throw any exception/error (because how could
> > > > that be done?), it would just behave strangely, with shared local
> > > > variables having seemingly random values?
> > > >
> > >
> > > Yes, after recovery, the shared local variables would lose the value
> > > they had at the time of the checkpoint. I have implemented a detection
> > > that automatically falls back to draining before the checkpoint if a
> > > callback captures an anonymous shared variable. Additionally, if users
> > > are certain that the variable is safe (e.g. it is read-only or just for
> > > statistics), they could annotate the variable and pass the check, thus
> > > the fallback won't take effect. PoC of the detection is here [1].
> > >
> > > > > > The request's callback should depend on a single request rather
> > > > > > than being associated with multiple requests (for example, not
> > > > > > designed for operations like thenCombine() or
> > > > > > StateFutureUtils.combineAll() or any iteration).
> > >
> > >
> > > > Could you elaborate a bit more on what's the problem?
> > >
> > >
> > > Well this is a matter of implementation complexity. Currently, we only
> > > note down the request and its callback in a checkpoint. If we want to
> > > support multiple-path combinations, we will need to record the
> > > relationships between requests and track the progress of each path, as
> > > there may be multiple stages within each path.
> > >
> > >
> > > [1]
> > > https://github.com/apache/flink/pull/24719/files#diff-eae23736b1bb0d4a654c04669105ad3c9bc9c39a8e265693e0134b7787aa2a09R53-R69
> > >
> > > Best,
> > > Zakelly
> > >
> >
>
