Hi Yun,

In the end, I left the code like this:
```
// modelsBytes is the keyed map state; models is the in-memory map of live models
override def snapshotState(context: FunctionSnapshotContext): Unit = {
  for ((k, model) <- models) {
    modelsBytes.put(k, model.toBytes(v))
  }
}
```

I have verified with a simple test that checkpoints seem to work fine most of the time. However, from time to time the map state is not saved properly (I get an empty map state). So it looks like updating the keyed state like that within the `snapshotState` method is conceptually wrong; indeed, this method does not receive any keyed context to start with. Because of this, I think the user should not even be allowed to invoke `put` (or `clear`) on the map state object. That would help make things less confusing.

The reason I am trying to serialize my (keyed state) models inside `snapshotState` is that these models are self-evolving and possess their own (time-varying) state. Otherwise I could just serialize them once, right after creation, in the `processElement1` method.

So, given this situation, how could I handle my use case? Ideally, I should only serialize the models when checkpoints are taken. In particular, I want to avoid having to serialize them after every element received in `processElement2` (the state of my models changes with each new element processed there). Maybe I cannot achieve my goals with keyed state and need operator state instead.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
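To make the operator-state idea concrete, here is a rough sketch of what I have in mind. It is not tested code, and several things in it are assumptions for illustration only: the `Model` class (a trivial running mean standing in for my real models), the `ModelFunction` name, the `String`/`Double` input types, the `"models-bytes"` state name, and the use of `KeyedCoProcessFunction` so that `ctx.getCurrentKey` is available. The models live in a plain in-memory map and are serialized into an operator `ListState` only when `snapshotState` is called.

```scala
import java.nio.ByteBuffer

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._
import scala.collection.mutable

// Hypothetical stand-in for a self-evolving model: a running mean.
final class Model(var count: Long, var mean: Double) {
  def update(x: Double): Unit = { count += 1; mean += (x - mean) / count }
  def toBytes: Array[Byte] =
    ByteBuffer.allocate(16).putLong(count).putDouble(mean).array()
}

object Model {
  def fromBytes(bytes: Array[Byte]): Model = {
    val buf = ByteBuffer.wrap(bytes)
    new Model(buf.getLong(), buf.getDouble())
  }
}

// Hypothetical function; key type String, inputs String and Double.
class ModelFunction
    extends KeyedCoProcessFunction[String, String, Double, Double]
    with CheckpointedFunction {

  private type Ctx = KeyedCoProcessFunction[String, String, Double, Double]#Context

  // Live models, kept on the heap and never touched by Flink directly.
  @transient private lazy val models = mutable.Map.empty[String, Model]

  // Operator (non-keyed) state: one (key, serialized model) entry per model.
  @transient private var checkpointed: ListState[(String, Array[Byte])] = _

  override def processElement1(value: String, ctx: Ctx, out: Collector[Double]): Unit = {
    // Create the model for this key on first sight.
    models.getOrElseUpdate(ctx.getCurrentKey, new Model(0L, 0.0))
  }

  override def processElement2(value: Double, ctx: Ctx, out: Collector[Double]): Unit = {
    // The model evolves here; nothing is serialized per element.
    models.get(ctx.getCurrentKey).foreach { m =>
      m.update(value)
      out.collect(m.mean)
    }
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // Operator state needs no current key, so this is legal here.
    checkpointed.clear()
    for ((k, model) <- models) checkpointed.add((k, model.toBytes))
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[(String, Array[Byte])](
      "models-bytes", createTypeInformation[(String, Array[Byte])])
    checkpointed = context.getOperatorStateStore.getListState(descriptor)
    if (context.isRestored) {
      for ((k, bytes) <- checkpointed.get().asScala) models.put(k, Model.fromBytes(bytes))
    }
  }
}
```

One thing that worries me about this approach: as far as I understand, operator list state is redistributed across subtasks on rescaling without regard to key groups, so after a change in parallelism a restored model could end up on a subtask that no longer receives its key. The in-memory map is also unbounded, so it would only be viable if the number of keys per subtask stays small.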