Hi Piotr,

Thanks for your explanation! Now I roughly get your point.
> All in all, I would be hesitant to allow users to declare callbacks everywhere,
> as it might lead to some unforeseen problems or readability issues?
>
> I don't see a good motivation for them in `open`, `setup`, or `snapshotState`.
> Those methods are called without keyed context. Additionally, the only heavy logic
> that I have seen in both `open` and `setup` was one-off state migration (when the way
> the operator defines its state changes between Flink versions), which is rare and
> is fine for me to be a synchronous process.

Note that a callback does not have to be declared under a keyed context, but it does have to be invoked under one. Anyway, allowing users to declare callbacks in `open` and invoke them in other places is only a fledgling idea, and I'm not going to introduce it for now.

Currently, we strictly restrict the use of callbacks to a keyed context. We will refrain from introducing interfaces until we have a clear understanding of their use cases.

I think we have a consensus to keep `snapshotState` synchronous at present.

Best,
Zakelly

On Tue, Aug 6, 2024 at 5:16 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi again,
>
> Thanks for further explanation!
>
> > I briefly revisited all SQL operator implementations, and it seems feasible
> > for all scenarios. I'm not sure if this answers your question.
>
> I think you misunderstood what I meant. Initially I was wondering if we should allow
> those async callbacks to be declared and used in mailbox callbacks. However, in my
> second email:
>
> > I realised that AsyncWaitOperator is using only Java heap
> > variables in the callbacks, that are only copied to Flink's state in the
> > `snapshotState` call. So a need for declarative/async mailbox callbacks looks to
> > be entirely theoretical at the moment.
>
> In other words, I don't think we need to worry about mailbox callbacks, at least
> not right now.
> Maybe in the future, once we find some good motivation for them?
>
> > Leaving aside specific examples, I'm thinking of allowing users to access
> > the `DeclarationContext` in `open` or `setup` of the operator, which means
> > they can declare any callback or function they want.
>
> All in all, I would be hesitant to allow users to declare callbacks everywhere,
> as it might lead to some unforeseen problems or readability issues?
>
> I don't see a good motivation for them in `open`, `setup`, or `snapshotState`.
> Those methods are called without keyed context. Additionally, the only heavy logic
> that I have seen in both `open` and `setup` was one-off state migration (when the way
> the operator defines its state changes between Flink versions), which is rare and
> is fine for me to be a synchronous process.
>
> So all in all, I'm fine with the current proposal.
>
> Best,
> Piotrek
>
> On Tue, Aug 6, 2024 at 10:55, Zakelly Lan <zakelly....@gmail.com> wrote:
>
> > Hi Piotr,
> >
> > Thanks for your reply!
> >
> > > Because the remote state backend/async state accesses are only supported
> > > on the keyed state?
> >
> > Yes. And since we only have a heap implementation of Operator State,
> > offloading them to async threads seems pointless. Currently we keep the
> > interface in sync-style as it is.
> >
> > > No, I didn't mean the function accessing the operator's state, but if the operator
> > > itself accessed its own state in a mailbox callback. However, now that I tried
> > > searching for an example, I realised that AsyncWaitOperator is using only Java heap
> > > variables in the callbacks, that are only copied to Flink's state in the
> > > `snapshotState` call. So a need for declarative/async mailbox callbacks looks to
> > > be entirely theoretical at the moment.
> >
> > Leaving aside specific examples, I'm thinking of allowing users to access
> > the `DeclarationContext` in `open` or `setup` of the operator, which means
> > they can declare any callback or function they want. And those callbacks
> > can theoretically be invoked everywhere. For example, they can do:
> > ```
> > class CustomOperator extends AbstractStreamOperator {
> >
> >     private NamedCallback callback;
> >
> >     public void open() {
> >         // declare the callback once, outside any keyed context
> >         callback = getDeclareContext().declare(
> >             (e) -> { /* some consumer logic */ }
> >         );
> >     }
> >
> >     public void someFunction() {
> >         // invoke the previously declared callback on a state access
> >         valueState.value(callback);
> >     }
> > }
> > ```
> >
> > Apart from that, there will be another internal API that supports processing
> > something under a keyed context:
> > ```
> > asyncProcessWithKey(key, () -> { /* some logic */ });
> > // the function will be processed in mailbox;
> > // equivalent to
> > // keyedStateBackend.setCurrentKey(key) and do something
> > ```
> >
> > I briefly revisited all SQL operator implementations, and it seems feasible
> > for all scenarios. I'm not sure if this answers your question.
> >
> > Best,
> > Zakelly
> >
> > On Tue, Aug 6, 2024 at 4:12 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> >
> > > Hi Zakelly,
> > >
> > > Thanks for your responses!
> > >
> > > > IIUC, the AsyncWaitOperator and AsyncScalarFunction are all non-keyed, so
> > > > we don't need to worry too much
> > >
> > > Because the remote state backend/async state accesses are only supported
> > > on the keyed state?
> > >
> > > > Furthermore, as I mentioned, we currently
> > > > do not permit any asynchronous threads to access the state.
> > >
> > > No, I didn't mean the function accessing the operator's state, but if the operator
> > > itself accessed its own state in a mailbox callback.
> > > However, now that I tried searching for an example, I realised that
> > > AsyncWaitOperator is using only Java heap variables in the callbacks, that are
> > > only copied to Flink's state in the `snapshotState` call. So a need for
> > > declarative/async mailbox callbacks looks to be entirely theoretical at the moment.
> > >
> > > Best,
> > > Piotrek
> > >
> > > On Tue, Aug 6, 2024 at 07:33, Zakelly Lan <zakelly....@gmail.com> wrote:
> > >
> > > > Hi Piotr,
> > > >
> > > > Thanks for your comments! PTAL at my answers:
> > > >
> > > > > AFAIU it means async state requests can be used basically everywhere, but unless
> > > > > they are coming from declaring process in process elements/timers, they won't be
> > > > > interruptible and will have to be drained before checkpoint commences?
> > > > > Right?
> > > >
> > > > Yes, they should be declared by APIs provided by the current FLIP before they
> > > > can be snapshotable and interruptible. However, the state APIs can only be
> > > > used by the logic on the task thread or equivalently the same thread.
> > > >
> > > > > I'm not worried about `snapshotState`, I don't think there are any/many operators
> > > > > doing some heavy logic there. I'm more thinking about mailbox actions, for example
> > > > > used in AsyncWaitOperator (and hence AsyncScalarFunction).
> > > >
> > > > IIUC, the AsyncWaitOperator and AsyncScalarFunction are all non-keyed, so
> > > > we don't need to worry too much. Furthermore, as I mentioned, we currently
> > > > do not permit any asynchronous threads to access the state. Perhaps we can
> > > > address how async user-defined functions could access the state later.
> > > >
> > > > > Why do the callbacks have to be named?
> > > > > I presume we could have some deterministic name
> > > > > generator (`(i++).toString()`) and support checkpointing anonymous
> > > > > callbacks. Have you considered and rejected that for some reason?
> > > >
> > > > Let me clarify this. I think you are talking about declaring a callback
> > > > without a name provided? This is actually supported. All the explicit
> > > > `withName` calls could be omitted, and a name generator, as you mentioned, will
> > > > generate a name for a callback only if the user uses the APIs of this
> > > > FLIP. These callbacks are still considered 'named' because the
> > > > auto-assigned names are accounted for.
> > > >
> > > > > As far as I understand, if the user is accessing non-declared local
> > > > > variables, that won't throw any exception/error (because how could that be
> > > > > done?), it would just behave strangely, with shared local variables having
> > > > > seemingly random values?
> > > >
> > > > Yes, after recovery, the shared local variables would lose the value they had
> > > > at the time of the checkpoint. I have implemented a detection that automatically
> > > > falls back to draining before the checkpoint if a callback captures an anonymous
> > > > shared variable. Additionally, if users are certain that the variable is
> > > > safe (e.g. it is read-only or just for statistics), they could annotate the
> > > > variable and pass the check, thus the fallback won't take effect. PoC of
> > > > the detection is here[1].
> > > >
> > > > > > The request's callback should depend on a single request rather than
> > > > > > being associated with multiple requests (for example, not designed for
> > > > > > operations like thenCombine() or StateFutureUtils.combineAll() or any
> > > > > > iteration).
> > > > >
> > > > > Could you elaborate a bit more on what's the problem?
> > > >
> > > > Well this is a matter of implementation complexity. Currently, we only note
> > > > down the request and its callback in a checkpoint. If we want to support
> > > > multiple-path combinations, we will need to record the relationships
> > > > between requests and track the progress of each path, as there may be
> > > > multiple stages within each path.
> > > >
> > > > [1]
> > > > https://github.com/apache/flink/pull/24719/files#diff-eae23736b1bb0d4a654c04669105ad3c9bc9c39a8e265693e0134b7787aa2a09R53-R69
> > > >
> > > > Best,
> > > > Zakelly