Hi again,

Thanks for the further explanation!

> I briefly revisit all SQL operator implementations, and it seems feasible
> for all scenarios. I'm not sure if this answers your question.

I think you misunderstood what I meant. Initially I was wondering if we
should allow those async callbacks to be declared and used in mailbox
callbacks. However, in my second email:

> I realised that AsyncWaitOperator is using only java heap
> variables in the callbacks, that are only copied to Flink's state in the
> `snapshotState` call. So a need for declarative/async mailbox callbacks
> looks to be entirely theoretical at the moment.

In other words, I don't think we need to worry about mailbox callbacks, at
least not right now. Maybe in the future, once we find some good motivation
for them?

> Leaving aside specific examples, I'm thinking of allowing users to access
> the `DeclarationContext` in `open` or `setup` of the operator, which means
> they can declare any callback or function they want.

I would be hesitant to allow users to declare callbacks everywhere, as it
might lead to unforeseen problems or readability issues. I don't see a good
motivation for them in `open`, `setup`, or `snapshotState`. Those methods
are called without a keyed context. Additionally, the only heavy logic that
I have seen in `open` and `setup` was one-off state migration (when the way
the operator defines its state changes between Flink versions). That is
rare, and I'm fine with it being a synchronous process.

So all in all, I'm fine with the current proposal.

Best,
Piotrek

On Tue, 6 Aug 2024 at 10:55, Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi Piotr,
>
> Thanks for your reply!
>
> > Because the remote state backend/async state accesses are only supported
> > on the keyed state?
>
> Yes. And since we only have a heap implementation of Operator State,
> offloading them to async threads seems pointless. Currently we keep the
> interface in sync-style as it is.
>
> > No, I didn't mean the function accessing the operator's state, but if the
> > operator itself accessed its own state in a mailbox callback. However now
> > that I tried searching for an example, I realised that AsyncWaitOperator
> > is using only java heap variables in the callbacks, that are only copied
> > to Flink's state in the `snapshotState` call. So a need for
> > declarative/async mailbox callbacks looks to be entirely theoretical at
> > the moment.
>
> Leaving aside specific examples, I'm thinking of allowing users to access
> the `DeclarationContext` in `open` or `setup` of the operator, which means
> they can declare any callback or function they want. And those callbacks
> can be invoked theoretically everywhere. For example, they can do:
> ```
> class CustomOperator extends AbstractStreamOperator {
>
>     private NamedCallback callback;
>
>     public void open() {
>         callback = getDeclareContext().declare(
>             (e) -> { /* some consumer logic */ }
>         );
>     }
>
>     public void someFunction() {
>         valueState.value(callback);
>     }
> }
> ```
>
> Apart from that, there will be another internal API that supports
> processing something under a keyed context:
> ```
> asyncProcessWithKey(key, () -> { /* some logic */ });
> // the function will be processed in mailbox;
> // equivalent to
> // keyedStateBackend.setCurrentKey(k) and do something
> ```
>
> I briefly revisit all SQL operator implementations, and it seems feasible
> for all scenarios. I'm not sure if this answers your question.
>
> Best,
> Zakelly
>
> On Tue, Aug 6, 2024 at 4:12 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>
> > Hi Zakelly,
> >
> > Thanks for your responses!
> >
> > > IIUC, the AsyncWaitOperator and AsyncScalarFunction are all non-keyed, so
> > > we don't need to worry too much
> >
> > Because the remote state backend/async state accesses are only supported
> > on the keyed state?
> >
> > > Furthermore, as I mentioned, we currently
> > > do not permit any asynchronous threads to access the state.
> >
> > No, I didn't mean the function accessing the operator's state, but if the
> > operator itself accessed its own state in a mailbox callback. However now
> > that I tried searching for an example, I realised that AsyncWaitOperator
> > is using only java heap variables in the callbacks, that are only copied
> > to Flink's state in the `snapshotState` call. So a need for
> > declarative/async mailbox callbacks looks to be entirely theoretical at
> > the moment.
> >
> > Best,
> > Piotrek
> >
> > On Tue, 6 Aug 2024 at 07:33, Zakelly Lan <zakelly....@gmail.com> wrote:
> >
> > > Hi Piotr,
> > >
> > > Thanks for your comments! PTAL at my answers:
> > >
> > > > AFAIU it means async state requests can be used basically everywhere,
> > > > but unless they are coming from declaring process in process
> > > > elements/timers, they won't be interruptible and will have to be
> > > > drained before checkpoint commences? Right?
> > >
> > > Yes, they should be declared by APIs provided by current FLIP before they
> > > can be snapshotable and interruptible. However, the state APIs can only be
> > > used by the logic on the task thread or equivalently the same thread.
> > >
> > > > I'm not worried about `snapshotState`, I don't think there are any/many
> > > > operators doing some heavy logic there. I'm more thinking about mailbox
> > > > actions, for example used in AsyncWaitOperator (and hence
> > > > AsyncScalarFunction).
> > >
> > > IIUC, the AsyncWaitOperator and AsyncScalarFunction are all non-keyed, so
> > > we don't need to worry too much. Furthermore, as I mentioned, we currently
> > > do not permit any asynchronous threads to access the state. Perhaps we can
> > > address how async user-defined functions could access the state later.
> > >
> > > > Why the callbacks have to be named? I presume we could have some
> > > > deterministic name generator (`(i++).toString()`) and support
> > > > checkpointing anonymous callbacks. Have you considered and rejected
> > > > that for some reason?
> > >
> > > Let me clarify this. I think you are talking about declaring a callback
> > > without a name provided? This is actually supported. All the explicit
> > > `withName` could be omitted, and a name generator, as you mentioned, will
> > > generate a name for a callback only if the user utilizes the APIs of this
> > > FLIP. These callbacks are still considered 'named' because the
> > > auto-assigned names are accounted for.
> > >
> > > > As far as I understand, if the user is accessing non declared local
> > > > variables, that won't throw any exception/error (because how could
> > > > that be done?), it would just behave strangely, with shared local
> > > > variables having seemingly random values?
> > >
> > > Yes, after recovery, the shared local variables would lose the values
> > > they had at the time of the checkpoint. I have implemented a detection
> > > that automatically falls back to draining before the checkpoint if a
> > > callback captures an anonymous shared variable. Additionally, if users
> > > are certain that the variable is safe (e.g. it is read-only or just for
> > > statistics), they could annotate the variable and pass the check, thus
> > > the fallback won't take effect. PoC of the detection is here[1].
> > >
> > > > > The request's callback should depend on a single request rather than
> > > > > being associated with multiple requests (for example, not designed
> > > > > for operations like thenCombine() or StateFutureUtils.combineAll()
> > > > > or any iteration).
> > > >
> > > > Could you elaborate a bit more on what's the problem?
> > >
> > > Well this is a matter of implementation complexity.
> > > Currently, we only note
> > > down the request and its callback in a checkpoint. If we want to support
> > > multiple-path combinations, we will need to record the relationships
> > > between requests and track the progress of each path, as there may be
> > > multiple stages within each path.
> > >
> > > [1]
> > > https://github.com/apache/flink/pull/24719/files#diff-eae23736b1bb0d4a654c04669105ad3c9bc9c39a8e265693e0134b7787aa2a09R53-R69
> > >
> > > Best,
> > > Zakelly
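
The auto-naming discussed in the thread (a deterministic generator that
names callbacks declared without `withName`) could look roughly like the
sketch below. It is only an illustration under assumptions: `CallbackRegistry`,
`declare`, and `lookup` are hypothetical names, not the FLIP's API, and the
`callback-N` naming scheme is made up. The idea it shows is that a name
derived from declaration order stays stable across restarts as long as the
operator declares its callbacks in the same order, so a checkpoint only has
to store the name next to the pending request.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical registry for declared callbacks; NOT Flink's actual API. */
final class CallbackRegistry {

    // Declared callbacks, keyed by their (explicit or generated) name.
    private final Map<String, Consumer<String>> callbacks = new LinkedHashMap<>();

    // Counter behind the generated names; advances once per anonymous declaration.
    private int nextId = 0;

    /** Declares a callback under an explicit, user-chosen name. */
    String declare(String name, Consumer<String> callback) {
        if (callbacks.putIfAbsent(name, callback) != null) {
            throw new IllegalArgumentException("Duplicate callback name: " + name);
        }
        return name;
    }

    /** Declares an anonymous callback; its name depends only on declaration order. */
    String declare(Consumer<String> callback) {
        return declare("callback-" + nextId++, callback);
    }

    /** Resolves a name (for example one read back from a checkpoint) to its callback. */
    Consumer<String> lookup(String name) {
        Consumer<String> callback = callbacks.get(name);
        if (callback == null) {
            throw new IllegalStateException("No callback declared as: " + name);
        }
        return callback;
    }
}
```

With this scheme, a freshly built registry that repeats the same
declarations after a restore resolves a stored name such as `callback-1` to
the second declared callback again. Changing the declaration order between
versions would silently remap the names, which is one reason an explicit
name may still be preferable for long-lived jobs.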