Hi Piotr,

Thanks for your reply!


> Because the remote state backend/async state accesses are only supported
> on the keyed state?


Yes. And since we only have a heap implementation of Operator State,
offloading its accesses to async threads seems pointless, so for now we
keep that interface synchronous, as it is.

> No, I didn't mean the function accessing the operator's state, but rather
> the operator itself accessing its own state in a mailbox callback. However,
> now that I tried searching for an example, I realised that
> AsyncWaitOperator uses only Java heap variables in the callbacks, which
> are only copied to Flink's state in the `snapshotState` call. So a need
> for declarative/async mailbox callbacks looks to be entirely theoretical
> at the moment.


Leaving aside specific examples, I'm thinking of allowing users to access
the `DeclarationContext` in the operator's `open` or `setup`, which means
they can declare any callback or function they want, and those callbacks
can then, in theory, be invoked anywhere. For example, they can do:
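```
class CustomOperator<OUT> extends AbstractStreamOperator<OUT> {

  private NamedCallback callback;

  @Override
  public void open() throws Exception {
    super.open();
    // Declare the callback once, while the operator is being opened.
    callback = getDeclareContext().declare(
        (e) -> { /* some consumer logic */ }
    );
  }

  public void someFunction() {
    // The declared callback can then be passed to any state access,
    // e.g. on a ValueState field (valueState) defined elsewhere.
    valueState.value(callback);
  }
}
```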
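To make the "declare once in `open`, use anywhere" idea concrete, here is a minimal, self-contained Java sketch. `DeclarationContextSketch`, `NamedCallback`, and `FakeValueState` are hypothetical stand-ins, not the FLIP's actual classes; the fake state completes every request synchronously and simply rejects callbacks that were never declared.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class DeclareAnywhereSketch {

    /** Hypothetical handle returned by declare(): the callback plus a stable name. */
    static final class NamedCallback<T> {
        final String name;
        final Consumer<T> fn;
        NamedCallback(String name, Consumer<T> fn) { this.name = name; this.fn = fn; }
    }

    /** Hypothetical declaration context: remembers every declared callback by name. */
    static final class DeclarationContextSketch {
        private final Map<String, NamedCallback<?>> declared = new HashMap<>();
        private int counter = 0;

        <T> NamedCallback<T> declare(Consumer<T> fn) {
            NamedCallback<T> cb = new NamedCallback<>("callback-" + counter++, fn);
            declared.put(cb.name, cb);
            return cb;
        }

        boolean isDeclared(NamedCallback<?> cb) { return declared.get(cb.name) == cb; }
    }

    /** Fake value state: accepts only declared callbacks and completes immediately. */
    static final class FakeValueState<T> {
        private final DeclarationContextSketch ctx;
        private final T current;
        FakeValueState(DeclarationContextSketch ctx, T initial) { this.ctx = ctx; this.current = initial; }

        void value(NamedCallback<T> cb) {
            if (!ctx.isDeclared(cb)) {
                throw new IllegalArgumentException("callback was not declared: " + cb.name);
            }
            cb.fn.accept(current);
        }
    }

    /** Mirrors the shape of CustomOperator in the example above. */
    static final class CustomOperator {
        final List<String> seen = new ArrayList<>();
        private final DeclarationContextSketch ctx = new DeclarationContextSketch();
        private final FakeValueState<String> valueState = new FakeValueState<>(ctx, "v0");
        private NamedCallback<String> callback;

        void open() {
            // Declared once, while the operator is opened.
            callback = ctx.declare(e -> seen.add(e));
        }

        void someFunction() {
            // Reused later from an arbitrary method running on the task thread.
            valueState.value(callback);
        }
    }

    public static void main(String[] args) {
        CustomOperator op = new CustomOperator();
        op.open();
        op.someFunction();
        op.someFunction();
        System.out.println(op.seen + " via " + op.callback.name); // [v0, v0] via callback-0
    }
}
```

Rejecting undeclared callbacks is a design choice of this sketch: it is one simple way to guarantee that every in-flight request refers to something that can be looked up again by name.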

Apart from that, there will be another internal API that supports
processing something under a keyed context:
```
asyncProcessWithKey(key, () -> { /* some logic */ });
// The function will be processed in the mailbox; this is equivalent to
// keyedStateBackend.setCurrentKey(key) followed by running the logic.
```
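The described semantics of `asyncProcessWithKey` can be approximated with a plain mailbox queue. In this hypothetical Java sketch, `MailboxSketch`, its `currentKey` field, and `runMailbox()` are illustrative stand-ins, not Flink classes: the action is only enqueued, and when the mailbox later runs it, the current key is switched first, mirroring `keyedStateBackend.setCurrentKey(key)`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class MailboxSketch {

    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private Object currentKey; // stands in for the keyed state backend's current key

    Object getCurrentKey() { return currentKey; }

    /** Enqueues an action that will run later, in the mailbox, under the given key. */
    void asyncProcessWithKey(Object key, Runnable action) {
        mailbox.add(() -> {
            currentKey = key; // like keyedStateBackend.setCurrentKey(key)
            action.run();
        });
    }

    /** Processes all pending mail on the calling ("task") thread, in FIFO order. */
    void runMailbox() {
        Runnable next;
        while ((next = mailbox.poll()) != null) {
            next.run();
        }
    }

    public static void main(String[] args) {
        MailboxSketch task = new MailboxSketch();
        List<Object> observed = new ArrayList<>();
        task.asyncProcessWithKey("a", () -> observed.add(task.getCurrentKey()));
        task.asyncProcessWithKey("b", () -> observed.add(task.getCurrentKey()));
        System.out.println(observed); // [] : nothing runs before the mailbox is processed
        task.runMailbox();
        System.out.println(observed); // [a, b]
    }
}
```

Note that this sketch leaves the last key set after the action finishes; whether the real API restores the previous key afterwards is not stated in this thread.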

I briefly revisited all SQL operator implementations, and this seems
feasible for all scenarios. I'm not sure if this answers your question.


Best,
Zakelly

On Tue, Aug 6, 2024 at 4:12 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi Zakelly,
>
> Thanks for your responses!
>
> > IIUC, the AsyncWaitOperator and AsyncScalarFunction are all non-keyed, so
> > we don't need to worry too much
>
> Because the remote state backend/async state accesses are only supported
> on the keyed state?
>
> > Furthermore, as I mentioned, we currently
> > do not permit any asynchronous threads to access the state.
>
> No, I didn't mean the function accessing the operator's state, but rather
> the operator itself accessing its own state in a mailbox callback. However,
> now that I tried searching for an example, I realised that
> AsyncWaitOperator uses only Java heap variables in the callbacks, which
> are only copied to Flink's state in the `snapshotState` call. So a need
> for declarative/async mailbox callbacks looks to be entirely theoretical
> at the moment.
>
> Best,
> Piotrek
>
> On Tue, Aug 6, 2024 at 7:33 AM Zakelly Lan <zakelly....@gmail.com> wrote:
>
> > Hi Piotr,
> >
> > Thanks for your comments! PTAL at my answers:
> >
> > > AFAIU it means async state requests can be used basically everywhere,
> > > but unless they are coming from declaring process in process
> > > elements/timers, they won't be interruptible and will have to be
> > > drained before checkpoint commences? Right?
> >
> >
> > Yes, they must be declared via the APIs provided by the current FLIP
> > before they can be snapshotted and interrupted. However, the state APIs
> > can only be used by logic running on the task thread (or, equivalently,
> > the same thread).
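To illustrate the thread-confinement rule, here is a hypothetical owner-thread check in Java. `TaskThreadGuardSketch` and `checkTaskThread()` are illustrative names only; the thread does not say how, or whether, Flink enforces this rule at runtime. The guard remembers the thread that created it and rejects any access from another thread.

```java
import java.util.concurrent.atomic.AtomicReference;

public class TaskThreadGuardSketch {

    private final Thread taskThread;

    /** Binds the guard to the thread that creates it, standing in for the task thread. */
    TaskThreadGuardSketch() {
        this.taskThread = Thread.currentThread();
    }

    /** Meant to be called at the start of every state access. */
    void checkTaskThread() {
        if (Thread.currentThread() != taskThread) {
            throw new IllegalStateException("State accessed from thread '"
                    + Thread.currentThread().getName() + "', not the task thread '"
                    + taskThread.getName() + "'");
        }
    }

    /** Performs one access from a separate worker thread; returns the error it hit, or null. */
    static Throwable accessFromOtherThread(TaskThreadGuardSketch guard) {
        AtomicReference<Throwable> error = new AtomicReference<>();
        Thread worker = new Thread(() -> {
            try {
                guard.checkTaskThread();
            } catch (IllegalStateException e) {
                error.set(e);
            }
        }, "async-worker");
        worker.start();
        try {
            worker.join(); // join() also makes the worker's write to error visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted while waiting for the worker", e);
        }
        return error.get();
    }

    public static void main(String[] args) {
        TaskThreadGuardSketch guard = new TaskThreadGuardSketch();
        guard.checkTaskThread(); // same thread: passes silently
        System.out.println(accessFromOtherThread(guard).getMessage());
    }
}
```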
> >
> > > I'm not worried about `snapshotState`, I don't think there are any/many
> > > operators doing some heavy logic there. I'm more thinking about mailbox
> > > actions, for example used in AsyncWaitOperator (and hence
> > > AsyncScalarFunction).
> >
> >
> > IIUC, the AsyncWaitOperator and AsyncScalarFunction are all non-keyed, so
> > we don't need to worry too much. Furthermore, as I mentioned, we currently
> > do not permit any asynchronous threads to access the state. Perhaps we can
> > address later how async user-defined functions could access the state.
> >
> > > Why do the callbacks have to be named? I presume we could have some
> > > deterministic name generator (`(i++).toString()`) and support
> > > checkpointing anonymous callbacks. Have you considered and rejected that
> > > for some reason?
> >
> >
> > Let me clarify this. I think you are talking about declaring a callback
> > without providing a name? This is actually supported. All explicit
> > `withName` calls can be omitted, and a name generator, as you mentioned,
> > will then generate a name for the callback, but only if the user declares
> > it through the APIs of this FLIP. These callbacks are still considered
> > 'named', because the auto-assigned names are accounted for.
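Assuming auto-assigned names are derived from declaration order (the `(i++).toString()` idea above, written as `Integer.toString(i++)` in Java), a name stays stable across a restart as long as `open()` declares callbacks in the same order each time. A minimal, hypothetical sketch: `Registry` and its two `declare` overloads are illustrative, with the explicit-name overload standing in for `withName`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class CallbackNamingSketch {

    /** Hypothetical per-operator registry; a fresh one is built on every (re)start. */
    static final class Registry {
        private final Map<String, Consumer<Object>> byName = new LinkedHashMap<>();
        private int nextId = 0;

        /** Explicit name, playing the role of a withName(...) call. */
        String declare(String name, Consumer<Object> fn) {
            if (byName.putIfAbsent(name, fn) != null) {
                throw new IllegalArgumentException("duplicate callback name: " + name);
            }
            return name;
        }

        /** No name given: derive one from declaration order. */
        String declare(Consumer<Object> fn) {
            return declare(Integer.toString(nextId++), fn);
        }

        List<String> names() { return new ArrayList<>(byName.keySet()); }
    }

    /** Simulates an operator's open(): the same code runs before a failure and after recovery. */
    static Registry openOperator() {
        Registry r = new Registry();
        r.declare(v -> {});               // auto-named "0"
        r.declare("sumUpdater", v -> {}); // explicitly named
        r.declare(v -> {});               // auto-named "1"
        return r;
    }

    public static void main(String[] args) {
        List<String> beforeFailure = openOperator().names();
        List<String> afterRecovery = openOperator().names();
        System.out.println(beforeFailure);                       // [0, sumUpdater, 1]
        System.out.println(beforeFailure.equals(afterRecovery)); // true
    }
}
```

The weak spot of order-based naming is visible here: declaring callbacks conditionally or in a different order after recovery would silently shift the generated names, which is why rejecting duplicates (and offering explicit names) is useful.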
> >
> > > As far as I understand, if the user is accessing non-declared local
> > > variables, that won't throw any exception/error (because how could that
> > > be done?), it would just behave strangely, with shared local variables
> > > having seemingly random values?
> > >
> >
> > Yes, after recovery, the shared local variables would lose the values
> > they had at the time of the checkpoint. I have implemented a detection
> > that automatically falls back to draining before the checkpoint if a
> > callback captures an anonymous shared variable. Additionally, if users
> > are certain that the variable is safe (e.g. it is read-only or only used
> > for statistics), they can annotate the variable to pass the check, so the
> > fallback won't take effect. A PoC of the detection is here[1].
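The PoC linked as [1] is the actual detection; what follows is only an independent, simplified illustration of the same decision rule, not a reproduction of it. Because lambda captures are compiler-generated fields, this sketch models callbacks as explicit classes and inspects their instance fields via reflection. `DeclaredVariable` (a variable declared through the FLIP's APIs, hence checkpointed) and `@SafeShared` (the user's opt-out annotation) are hypothetical names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.function.Consumer;

public class DrainCheckSketch {

    /** Hypothetical opt-out: the user asserts that losing this value on recovery is harmless. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface SafeShared {}

    /** Hypothetical stand-in for a variable declared through the FLIP's APIs (checkpointed). */
    static final class DeclaredVariable<T> {
        T value;
    }

    /** True if the callback holds undeclared, unannotated state, so the operator must drain. */
    static boolean requiresDrain(Object callback) {
        for (Field f : callback.getClass().getDeclaredFields()) {
            if (Modifier.isStatic(f.getModifiers())) {
                continue; // not per-callback state
            }
            if (f.getType() == DeclaredVariable.class || f.isAnnotationPresent(SafeShared.class)) {
                continue; // checkpointed, or explicitly marked as safe by the user
            }
            return true; // e.g. a shared mutable holder the checkpoint would not capture
        }
        return false;
    }

    static final class Stateless implements Consumer<Integer> {
        public void accept(Integer e) {}
    }

    static final class UsesDeclared implements Consumer<Integer> {
        final DeclaredVariable<Long> sum = new DeclaredVariable<>();
        public void accept(Integer e) { sum.value = (sum.value == null ? 0L : sum.value) + e; }
    }

    static final class SharesCounter implements Consumer<Integer> {
        final long[] counter = new long[1];
        public void accept(Integer e) { counter[0] += e; }
    }

    static final class AnnotatedStats implements Consumer<Integer> {
        @SafeShared final long[] seen = new long[1];
        public void accept(Integer e) { seen[0]++; }
    }

    public static void main(String[] args) {
        System.out.println(requiresDrain(new Stateless()));      // false
        System.out.println(requiresDrain(new UsesDeclared()));   // false
        System.out.println(requiresDrain(new SharesCounter()));  // true
        System.out.println(requiresDrain(new AnnotatedStats())); // false
    }
}
```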
> >
> > > > The request's callback should depend on a single request rather than
> > > > being associated with multiple requests (for example, not designed for
> > > > operations like thenCombine() or StateFutureUtils.combineAll() or any
> > > > iteration).
> >
> >
> > > Could you elaborate a bit more on what the problem is?
> >
> >
> > Well, this is a matter of implementation complexity. Currently, we only
> > record the request and its callback in a checkpoint. If we want to
> > support combining multiple paths, we will need to record the
> > relationships between requests and track the progress of each path, as
> > there may be multiple stages within each path.
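Under the single-request restriction, what a checkpoint needs per in-flight request is small: the key and the name of the one callback to run. A minimal, hypothetical Java sketch of recording and re-issuing such requests; `PendingRequest`, `Operator`, and the `"logValue"` callback are illustrative names, and keyed state is faked with a map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public class PendingRequestSketch {

    /** What the checkpoint keeps per in-flight request: the key and its callback's name. */
    static final class PendingRequest {
        final String key;
        final String callbackName;

        PendingRequest(String key, String callbackName) {
            this.key = key;
            this.callbackName = callbackName;
        }
    }

    /** Hypothetical operator whose callbacks are declared by name in the constructor. */
    static final class Operator {
        final Map<String, BiConsumer<String, Integer>> callbacks = new HashMap<>();
        final Map<String, Integer> store = new HashMap<>(); // stands in for keyed state
        final List<PendingRequest> inFlight = new ArrayList<>();
        final List<String> log = new ArrayList<>();

        Operator() {
            callbacks.put("logValue", (key, value) -> log.add(key + "=" + value));
        }

        /** Issues a read whose single callback runs when the request completes. */
        void read(String key, String callbackName) {
            inFlight.add(new PendingRequest(key, callbackName));
        }

        /** Snapshot: just (key, callback name) pairs, no graph of dependent requests. */
        List<PendingRequest> snapshot() {
            return new ArrayList<>(inFlight);
        }

        /** Re-issues the recorded requests against the freshly declared callbacks. */
        void restore(List<PendingRequest> checkpoint) {
            for (PendingRequest r : checkpoint) {
                read(r.key, r.callbackName);
            }
        }

        /** Simulates the backend answering every in-flight read. */
        void completeAll() {
            for (PendingRequest r : inFlight) {
                callbacks.get(r.callbackName).accept(r.key, store.getOrDefault(r.key, 0));
            }
            inFlight.clear();
        }
    }

    public static void main(String[] args) {
        Operator before = new Operator();
        before.store.put("a", 7);
        before.read("a", "logValue");
        List<PendingRequest> checkpoint = before.snapshot(); // taken while "a" is in flight

        Operator after = new Operator(); // after a failure: new instance, callbacks re-declared
        after.store.put("a", 7);         // keyed state restored from the same checkpoint
        after.restore(checkpoint);
        after.completeAll();
        System.out.println(after.log);   // [a=7]
    }
}
```

A combined continuation (thenCombine, combineAll, or an iteration) would not fit into a flat `PendingRequest` list: the snapshot would also have to say which requests feed which continuation and how far each path had progressed, which is the extra complexity described above.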
> >
> >
> > [1]
> > https://github.com/apache/flink/pull/24719/files#diff-eae23736b1bb0d4a654c04669105ad3c9bc9c39a8e265693e0134b7787aa2a09R53-R69
> >
> > Best,
> > Zakelly
> >
>
