Hi Piotr,

Thanks for your comments! PTAL at my answers:

AFAIU it means async state requests can be used basically everywhere, but
> unless
> they are coming from declaring process in process elements/timers, they
> won't be
> interruptible and will have to be drained before checkpoint commences?
> Right?


Yes, they should be declared by APIs provided by current FLIP before they
can be snapshotable and interruptible. However, the state APIs can only be
used by the logic on the task thread or equivalently the same thread.

I'm not worried about `snapshotState`, I don't think there are any/many
> operators
> doing some heavy logic there. I'm more thinking about mailbox actions, for
> example
> used in AsyncWaitOperator (and hence AsyncScalarFunction).


IIUC, the AsyncWaitOperator and AsyncScalarFunction are all non-keyed, so
we don't need to worry too much. Furthermore, as I mentioned, we currently
do not permit any asynchronous threads to access the state. Perhaps we can
address how async user-defined functions could access the state later.

Why the callbacks have to be named? I presume we could have some
> deterministic name
> generator (`(i++).toString()`) and support checkpointing anonymous
> callbacks. Have you
> considered and rejected that for some reason?


Let me clarify this. I think you are talking about declaring a callback
without a name provided? This is actually supported. All the explicit
`withName` could be omitted, and a name generator, as you mentioned, will
generate a name for a callback only if the user utilizes the APIs of this
FLIP. These callbacks are still considered 'named' because the
auto-assigned names are accounted for.

As far as I understand, if the user is accessing non declared local
> variables, that won't throw any
> exception/error (because how could that be done?), it would just behave
> strangely, with shared
> local variables having seemingly random values?
>

Yes, after recovery, the shared local variables would lost the value at
time of checkpoint. I have implemented a detection that automatically falls
back to draining before the checkpoint if a callback captures an anonymous
shared variable. Additionally, if users are certain that the variable is
safe (e.g. it is read-only or just for statistics), they could annotate the
variable and pass the check, thus the fallback won't take effect. PoC of
the detection is here[1].

> The request's callback should depend on a single request rather than
> being associated with
> > multiple requests (for example, not designed for operations like
> thenCombine() or
> > StateFutureUtils.combineAll() or any iteration).


> Could you elaborate a bit more on what's the problem?


Well this is a matter of implementation complexity. Currently, we only note
down the request and its callback in a checkpoint. If we want to support
multiple-path combinations, we will need to record the relationships
between requests and track the progress of each path, as there may be
multiple stages within each path.


[1]
https://github.com/apache/flink/pull/24719/files#diff-eae23736b1bb0d4a654c04669105ad3c9bc9c39a8e265693e0134b7787aa2a09R53-R69

Best,
Zakelly

Reply via email to