Hello everyone,

I have a convoluted problem.

I am implementing a KeyedProcessFunction that keeps some non-serializable
"state" in memory, in a transient Map (key = stream key, value = the
non-serializable "state").

I can extract a serializable representation to put in Flink state, and I
can load my in-memory "state" from the Flink state. But these operations
are expensive.

Initializing the in-memory "state" is relatively easy. I do it lazily, in
processElement(), on the first record for the key.
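
A minimal plain-Java sketch of this lazy initialization, with no Flink dependency. Every name in it is hypothetical: `LazyStateCache` stands in for the KeyedProcessFunction, `HeavyState` for the non-serializable "state", and the `persisted` map for the Flink keyed state. The `loads` counter exists only to show that the expensive load runs once per key.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the pattern; all class and field names are hypothetical.
class LazyStateCache {

    // Stand-in for the non-serializable in-memory "state" of one key.
    static final class HeavyState {
        long counter;
    }

    // Stand-in for Flink keyed state: key -> serializable representation.
    private final Map<String, byte[]> persisted;

    // In-memory cache; in the real function this would be a transient field.
    private final Map<String, HeavyState> inMemory = new HashMap<>();

    // Counts the expensive loads, only to make the laziness observable.
    int loads = 0;

    LazyStateCache(Map<String, byte[]> persisted) {
        this.persisted = persisted;
    }

    // Assumed to be expensive in the real job: rebuild from the serialized form.
    private HeavyState load(String key) {
        loads++;
        HeavyState state = new HeavyState();
        byte[] bytes = persisted.get(key);
        if (bytes != null) {
            state.counter = Long.parseLong(new String(bytes, StandardCharsets.UTF_8));
        }
        return state;
    }

    // Mirrors processElement(): load on the first record for the key,
    // then modify only the in-memory copy.
    long processElement(String key) {
        HeavyState state = inMemory.computeIfAbsent(key, this::load);
        state.counter++;
        return state.counter;
    }
}
```

With `persisted` holding "41" for key "a", the first `processElement("a")` loads once and returns 42, and later calls for "a" reuse the in-memory copy. The sketch never writes the in-memory copy back to `persisted`.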

The problem is saving the in-memory "state" to Flink state.
I need to do it only before the state snapshot. But KeyedProcessFunction
has no entrypoint called before the state snapshot.
I cannot use CheckpointedFunction.snapshotState(), because it is not called
in the context of a key, so keyed state cannot be accessed from it.

Any suggestions?

Note that I cannot use operator state or broadcast state.
Processing is keyed. Every processed record modifies the in-memory "state"
of that key. If the job rescales, the state of the key must follow the
partition.


Regards
Lorenzo
