Hi everyone,

Since there seems to be no further discussion on the current proposal, I will start a vote next Monday.

Best,
Zakelly

On Tue, Aug 6, 2024 at 6:22 PM Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi Piotr,
>
> Thanks for your explanation! Now I roughly get your point.
>
>> All in all, I would be hesitant to allow users to declare callbacks
>> everywhere, as it might lead to some unforeseen problems or
>> readability issues?
>> I don't see a good motivation for them in `open`, `setup`, or
>> `snapshotState`. Those methods are called without keyed context.
>> Additionally, the only heavy logic that I have seen in both `open`
>> and `setup` was one-off state migration (when the way the operator
>> defines its state changes between Flink versions), which is rare and
>> is fine for me to be a synchronous process.
>
> Note that a callback does not have to be declared under a keyed
> context, but it does have to be invoked under one. Anyway, allowing
> users to declare callbacks in `open` and invoke them in other places
> is only a fledgling idea. I'm not going to introduce it for now.
> Currently, we strictly restrict the use of callbacks to a keyed
> context. We will refrain from introducing interfaces until we have a
> clear understanding of their use cases. I think we have a consensus to
> keep `snapshotState` synchronous at present.
>
> Best,
> Zakelly
>
> On Tue, Aug 6, 2024 at 5:16 PM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
>> Hi again,
>>
>> Thanks for further explanation!
>>
>> > I briefly revisited all SQL operator implementations, and it seems
>> > feasible for all scenarios. I'm not sure if this answers your
>> > question.
>>
>> I think you misunderstood what I meant. Initially I was wondering if
>> we should allow for those async callbacks to be declared and used in
>> mailbox callbacks. However, in my second email:
>>
>> > I realised that AsyncWaitOperator is using only java heap
>> > variables in the callbacks, that are only copied to Flink's state
>> > in the `snapshotState` call.
>> > So a need for declarative/async mailbox callbacks looks to be
>> > entirely theoretical at the moment.
>>
>> In other words, I don't think we need to worry about mailbox
>> callbacks, at least not right now. Maybe in the future, once we find
>> some good motivation for them?
>>
>> > Leaving aside specific examples, I'm thinking of allowing users to
>> > access the `DeclarationContext` in `open` or `setup` of the
>> > operator, which means they can declare any callback or function
>> > they want.
>>
>> All in all, I would be hesitant to allow users to declare callbacks
>> everywhere, as it might lead to some unforeseen problems or
>> readability issues?
>>
>> I don't see a good motivation for them in `open`, `setup`, or
>> `snapshotState`. Those methods are called without keyed context.
>> Additionally, the only heavy logic that I have seen in both `open`
>> and `setup` was one-off state migration (when the way the operator
>> defines its state changes between Flink versions), which is rare and
>> is fine for me to be a synchronous process.
>>
>> So all in all, I'm fine with the current proposal.
>>
>> Best,
>> Piotrek
>>
>> On Tue, Aug 6, 2024 at 10:55 Zakelly Lan <zakelly....@gmail.com>
>> wrote:
>>
>> > Hi Piotr,
>> >
>> > Thanks for your reply!
>> >
>> > > Because the remote state backend/async state accesses are only
>> > > supported on the keyed state?
>> >
>> > Yes. And since we only have a heap implementation of Operator
>> > State, offloading them to async threads seems pointless. Currently
>> > we keep the interface in sync-style as it is.
>> >
>> > > No, I didn't mean the function accessing the operator's state,
>> > > but if the operator itself accessed its own state in a mailbox
>> > > callback.
>> > > However, now that I tried searching for an example, I realised
>> > > that AsyncWaitOperator is using only java heap variables in the
>> > > callbacks, that are only copied to Flink's state in the
>> > > `snapshotState` call. So a need for declarative/async mailbox
>> > > callbacks looks to be entirely theoretical at the moment.
>> >
>> > Leaving aside specific examples, I'm thinking of allowing users to
>> > access the `DeclarationContext` in `open` or `setup` of the
>> > operator, which means they can declare any callback or function
>> > they want. And those callbacks can theoretically be invoked
>> > everywhere. For example, they can do:
>> > ```
>> > class CustomOperator extends AbstractStreamOperator {
>> >
>> >     private NamedCallback callback;
>> >
>> >     public void open() {
>> >         callback = getDeclareContext().declare(
>> >             (e) -> { /* some consumer logic */ }
>> >         );
>> >     }
>> >
>> >     public void someFunction() {
>> >         valueState.value(callback);
>> >     }
>> > }
>> > ```
>> >
>> > Apart from that, there will be another internal API that supports
>> > processing something under a keyed context:
>> > ```
>> > asyncProcessWithKey(key, () -> { /* some logic */ });
>> > // the function will be processed in mailbox;
>> > // equivalent to
>> > // keyedStateBackend.setCurrentKey(k) and do something
>> > ```
>> >
>> > I briefly revisited all SQL operator implementations, and it seems
>> > feasible for all scenarios. I'm not sure if this answers your
>> > question.
>> >
>> > Best,
>> > Zakelly
>> >
>> > On Tue, Aug 6, 2024 at 4:12 PM Piotr Nowojski
>> > <pnowoj...@apache.org> wrote:
>> >
>> > > Hi Zakelly,
>> > >
>> > > Thanks for your responses!
>> > >
>> > > > IIUC, the AsyncWaitOperator and AsyncScalarFunction are all
>> > > > non-keyed, so we don't need to worry too much
>> > >
>> > > Because the remote state backend/async state accesses are only
>> > > supported on the keyed state?
>> > >
>> > > > Furthermore, as I mentioned, we currently do not permit any
>> > > > asynchronous threads to access the state.
>> > >
>> > > No, I didn't mean the function accessing the operator's state,
>> > > but if the operator itself accessed its own state in a mailbox
>> > > callback. However, now that I tried searching for an example, I
>> > > realised that AsyncWaitOperator is using only java heap variables
>> > > in the callbacks, that are only copied to Flink's state in the
>> > > `snapshotState` call. So a need for declarative/async mailbox
>> > > callbacks looks to be entirely theoretical at the moment.
>> > >
>> > > Best,
>> > > Piotrek
>> > >
>> > > On Tue, Aug 6, 2024 at 07:33 Zakelly Lan <zakelly....@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi Piotr,
>> > > >
>> > > > Thanks for your comments! PTAL at my answers:
>> > > >
>> > > > > AFAIU it means async state requests can be used basically
>> > > > > everywhere, but unless they are coming from declaring process
>> > > > > in process elements/timers, they won't be interruptible and
>> > > > > will have to be drained before checkpoint commences? Right?
>> > > >
>> > > > Yes, they should be declared by APIs provided by current FLIP
>> > > > before they can be snapshotable and interruptible. However, the
>> > > > state APIs can only be used by the logic on the task thread or
>> > > > equivalently the same thread.
>> > > >
>> > > > > I'm not worried about `snapshotState`, I don't think there
>> > > > > are any/many operators doing some heavy logic there. I'm more
>> > > > > thinking about mailbox actions, for example used in
>> > > > > AsyncWaitOperator (and hence AsyncScalarFunction).
>> > > >
>> > > > IIUC, the AsyncWaitOperator and AsyncScalarFunction are all
>> > > > non-keyed, so we don't need to worry too much.
>> > > > Furthermore, as I mentioned, we currently do not permit any
>> > > > asynchronous threads to access the state. Perhaps we can
>> > > > address how async user-defined functions could access the state
>> > > > later.
>> > > >
>> > > > > Why do the callbacks have to be named? I presume we could
>> > > > > have some deterministic name generator (`(i++).toString()`)
>> > > > > and support checkpointing anonymous callbacks. Have you
>> > > > > considered and rejected that for some reason?
>> > > >
>> > > > Let me clarify this. I think you are talking about declaring a
>> > > > callback without a name provided? This is actually supported.
>> > > > All the explicit `withName` could be omitted, and a name
>> > > > generator, as you mentioned, will generate a name for a
>> > > > callback only if the user utilizes the APIs of this FLIP. These
>> > > > callbacks are still considered 'named' because the
>> > > > auto-assigned names are accounted for.
>> > > >
>> > > > > As far as I understand, if the user is accessing non declared
>> > > > > local variables, that won't throw any exception/error
>> > > > > (because how could that be done?), it would just behave
>> > > > > strangely, with shared local variables having seemingly
>> > > > > random values?
>> > > >
>> > > > Yes, after recovery, the shared local variables would lose the
>> > > > values they had at the time of the checkpoint. I have
>> > > > implemented a detection that automatically falls back to
>> > > > draining before the checkpoint if a callback captures an
>> > > > anonymous shared variable. Additionally, if users are certain
>> > > > that the variable is safe (e.g. it is read-only or just for
>> > > > statistics), they could annotate the variable and pass the
>> > > > check, thus the fallback won't take effect. PoC of the
>> > > > detection is here[1].
>> > > >
>> > > > > > The request's callback should depend on a single request
>> > > > > > rather than being associated with multiple requests (for
>> > > > > > example, not designed for operations like thenCombine() or
>> > > > > > StateFutureUtils.combineAll() or any iteration).
>> > > > >
>> > > > > Could you elaborate a bit more on what's the problem?
>> > > >
>> > > > Well this is a matter of implementation complexity. Currently,
>> > > > we only note down the request and its callback in a checkpoint.
>> > > > If we want to support multiple-path combinations, we will need
>> > > > to record the relationships between requests and track the
>> > > > progress of each path, as there may be multiple stages within
>> > > > each path.
>> > > >
>> > > > [1]
>> > > > https://github.com/apache/flink/pull/24719/files#diff-eae23736b1bb0d4a654c04669105ad3c9bc9c39a8e265693e0134b7787aa2a09R53-R69
>> > > >
>> > > > Best,
>> > > > Zakelly