Hello everyone, I have a convoluted problem.
I am implementing a KeyedProcessFunction that keeps some non-serializable "state" in memory, in a transient Map (key = stream key, value = the non-serializable "state"). I can extract a serializable representation to put in Flink state, and I can load my in-memory "state" from the Flink state, but both operations are expensive.

Initializing the in-memory "state" is relatively easy. I do it lazily, in processElement(), on the first record for the key.

The problem is saving the in-memory "state" to Flink state. I need to do it only right before the state snapshot, but KeyedProcessFunction has no entry point that is called before the snapshot. I cannot use CheckpointedFunction.snapshotState(), because it does not work for keyed state.

Any suggestions?

Note that I cannot use operator state or broadcast state. Processing is keyed, and every processed record modifies the in-memory "state" of that key. If the job is rescaled, the state of each key must follow its partition.

Regards
Lorenzo
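
For reference, the pattern described above can be sketched roughly as follows. This is a simplified, Flink-free model rather than the actual job: the class names, the `durable` map (standing in for Flink keyed state) and the `flushBeforeSnapshot()` method are all hypothetical. In particular, `flushBeforeSnapshot()` is exactly the hook the question is asking for; nothing in KeyedProcessFunction is known to call it.

```java
import java.util.HashMap;
import java.util.Map;

public class LazyKeyedCache {

    /** Hypothetical stand-in for the non-serializable in-memory "state". */
    static final class HeavyState {
        long count;

        /** Expensive in the real job: rebuild from the serializable form. */
        static HeavyState restore(String snapshot) {
            HeavyState s = new HeavyState();
            s.count = Long.parseLong(snapshot);
            return s;
        }

        /** Expensive in the real job: extract the serializable form. */
        String toSnapshot() {
            return Long.toString(count);
        }
    }

    // Assumption: models Flink keyed state, one serializable value per key.
    final Map<String, String> durable = new HashMap<>();

    // The transient in-memory map: stream key -> non-serializable "state".
    private final transient Map<String, HeavyState> cache = new HashMap<>();

    /** Lazy initialization on the first record seen for a key. */
    void processElement(String key, long value) {
        HeavyState s = cache.computeIfAbsent(key, k -> {
            String snap = durable.get(k);
            return snap == null ? new HeavyState() : HeavyState.restore(snap);
        });
        s.count += value; // every record modifies the in-memory "state"
    }

    /** The missing piece: should run once, right before each snapshot. */
    void flushBeforeSnapshot() {
        cache.forEach((k, s) -> durable.put(k, s.toSnapshot()));
    }

    public static void main(String[] args) {
        LazyKeyedCache f = new LazyKeyedCache();
        f.processElement("a", 2);
        f.processElement("a", 3);
        System.out.println("before flush: " + f.durable.get("a"));
        f.flushBeforeSnapshot();
        System.out.println("after flush: " + f.durable.get("a"));
    }
}
```

Until the flush runs, the durable copy is stale or missing, which is why the write has to happen before the snapshot rather than on every record.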