Hi Piotr,

Thanks for your comments! PTAL at my answers:

> AFAIU it means async state requests can be used basically everywhere, but unless
> they are coming from declaring process in process elements/timers, they won't be
> interruptible and will have to be drained before checkpoint commences? Right?

Yes, they have to be declared through the APIs provided by the current FLIP before they can be snapshotted and interrupted. However, the state APIs can only be used by logic running on the task thread (or, equivalently, the same thread).

> I'm not worried about `snapshotState`, I don't think there are any/many operators
> doing some heavy logic there. I'm more thinking about mailbox actions, for example
> used in AsyncWaitOperator (and hence AsyncScalarFunction).

IIUC, AsyncWaitOperator and AsyncScalarFunction are both non-keyed, so we don't need to worry too much. Furthermore, as I mentioned, we currently do not permit any asynchronous threads to access the state. Perhaps we can address how async user-defined functions could access the state later.

> Why the callbacks have to be named? I presume we could have some deterministic name
> generator (`(i++).toString()`) and support checkpointing anonymous callbacks. Have you
> considered and rejected that for some reason?

Let me clarify this. I think you are talking about declaring a callback without providing a name? This is actually supported. Every explicit `withName` can be omitted, and a name generator, as you mentioned, will generate a name for the callback, but only if the user declares it through the APIs of this FLIP. These callbacks are still considered 'named' because the auto-assigned names are accounted for.

> As far as I understand, if the user is accessing non declared local variables, that won't throw any
> exception/error (because how could that be done?), it would just behave strangely, with shared
> local variables having seemingly random values?

Yes, after recovery, the shared local variables would lose the values they had at the time of the checkpoint.
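
To make the point about undeclared shared variables concrete, here is a minimal, self-contained sketch in plain Java. It does not use the FLIP's actual API: `SketchRuntime`, `declareVariable`, `declareCallback` and the callback names are all hypothetical stand-ins, and the "checkpoint" is just a map. It only shows why a variable the runtime does not know about comes back with its initial value when the pending callback runs after recovery.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntConsumer;

public class SharedVariableSketch {

    /** Hypothetical stand-in for the runtime: tracks named callbacks and declared variables. */
    static final class SketchRuntime {
        final Map<String, IntConsumer> callbacks = new HashMap<>();
        final Map<String, int[]> declaredVars = new HashMap<>();

        int[] declareVariable(String name) {
            return declaredVars.computeIfAbsent(name, k -> new int[1]);
        }

        void declareCallback(String name, IntConsumer callback) {
            callbacks.put(name, callback);
        }
    }

    /** User declaration logic; it runs once at startup and again after recovery. */
    static void declare(SketchRuntime rt, int[] out) {
        int[] plain = new int[1];                       // undeclared: invisible to the checkpoint
        int[] tracked = rt.declareVariable("tracked");  // declared: the runtime can snapshot it
        rt.declareCallback("before", v -> { plain[0] = v; tracked[0] = v; });
        rt.declareCallback("after", ignored -> { out[0] = plain[0]; out[1] = tracked[0]; });
    }

    /** Returns {value of the undeclared variable, value of the declared variable} seen after recovery. */
    static int[] simulate() {
        int[] out = new int[2];

        SketchRuntime first = new SketchRuntime();
        declare(first, out);
        first.callbacks.get("before").accept(42);       // user logic before the state request

        // Checkpoint: only the pending callback's name and the declared variables are recorded.
        String pending = "after";
        Map<String, Integer> snapshot = new HashMap<>();
        first.declaredVars.forEach((k, v) -> snapshot.put(k, v[0]));

        // Recovery: the declaration runs again, so the callbacks capture fresh local variables.
        SketchRuntime restored = new SketchRuntime();
        declare(restored, out);
        snapshot.forEach((k, v) -> restored.declaredVars.get(k)[0] = v);
        restored.callbacks.get(pending).accept(0);      // resume the in-flight callback
        return out;
    }

    public static void main(String[] args) {
        int[] r = simulate();
        System.out.println("undeclared=" + r[0] + " declared=" + r[1]); // undeclared=0 declared=42
    }
}
```

In this sketch the undeclared variable silently reverts to 0, while the declared one keeps the value it had at checkpoint time.
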
I have implemented a detection mechanism that automatically falls back to draining before the checkpoint if a callback captures an undeclared shared variable. Additionally, if users are certain that the variable is safe (e.g. it is read-only or only used for statistics), they can annotate the variable to pass the check, so the fallback won't take effect. A PoC of the detection is here [1].

> > The request's callback should depend on a single request rather than being associated with
> > multiple requests (for example, not designed for operations like thenCombine() or
> > StateFutureUtils.combineAll() or any iteration).
>
> Could you elaborate a bit more on what's the problem?

Well, this is a matter of implementation complexity. Currently, we only record the request and its callback in a checkpoint. If we want to support multiple-path combinations, we will need to record the relationships between requests and track the progress of each path, as there may be multiple stages within each path.

[1] https://github.com/apache/flink/pull/24719/files#diff-eae23736b1bb0d4a654c04669105ad3c9bc9c39a8e265693e0134b7787aa2a09R53-R69

Best,
Zakelly