Hi Yun,

In the end, I left the code like this:

```
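override def snapshotState(context: FunctionSnapshotContext): Unit = {
  // Serialize every in-memory model into the MapState right before the checkpoint.
  // (The loop binds `model`; the undefined `v` argument was dropped.)
  for ((k, model) <- models) {
    modelsBytes.put(k, model.toBytes)
  }
}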
```

I have verified with a simple test that checkpoints seem to work fine most
of the time. However, from time to time, the map state is not saved
properly (I get an empty map state). So it looks like updating keyed state
like that within the `snapshotState` method is conceptually wrong; indeed,
this method does not receive any keyed context to begin with. Because of
this, I think the user should not even be allowed to invoke `put` (nor
`clear`) on the map state object there. That would make things less
confusing.
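
One plausible reading of the empty map, assuming the keyed backend simply keeps pointing at whichever key it selected last when `snapshotState` runs, can be shown with a toy model. `ToyKeyedMapState` and `SnapshotWithoutKeyContext` below are hypothetical stand-ins, not Flink's `MapState`: every `put` is scoped to a "current key", so a loop over all models with no per-entry key context writes everything under one key and leaves the others empty.

```scala
import scala.collection.mutable

// Hypothetical toy (NOT Flink's API): a keyed map state whose reads and
// writes are scoped to whatever key the "runtime" selected last.
class ToyKeyedMapState[K, UK, UV] {
  private val perKey = mutable.Map.empty[K, mutable.Map[UK, UV]]
  private var currentKey: Option[K] = None

  // The runtime would call this before each processElement invocation.
  def setCurrentKey(key: K): Unit = currentKey = Some(key)

  // Writes go to the map of the current key, not to one global map.
  def put(userKey: UK, value: UV): Unit = {
    val key = currentKey.getOrElse(throw new IllegalStateException("no current key"))
    perKey.getOrElseUpdate(key, mutable.Map.empty[UK, UV]).update(userKey, value)
  }

  // The map a given key would see after a restore.
  def entriesFor(key: K): Map[UK, UV] =
    perKey.get(key).map(_.toMap).getOrElse(Map.empty[UK, UV])
}

object SnapshotWithoutKeyContext {
  def demo(): ToyKeyedMapState[String, String, Int] = {
    val state = new ToyKeyedMapState[String, String, Int]
    // Elements for keys "a" and "b" are processed; "b" happens to be last.
    state.setCurrentKey("a")
    state.setCurrentKey("b")
    // A snapshotState-style loop over all models: no key is selected per
    // entry, so every put lands under "b", the last key the runtime chose.
    for ((k, v) <- Seq("a" -> 1, "b" -> 2)) state.put(k, v)
    state
  }
}
```

In this toy, key "a" restores an empty map while key "b" holds entries for both models.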

The reason I am trying to serialize my (keyed state) models inside
`snapshotState` is that these models are self-evolving and possess their
own (time-varying) state; otherwise I could just serialize them once after
creation in the `processElement1` method. So, given this situation, how
could I handle my use case? Ideally, I should only serialize them when
checkpoints are taken; in particular, I want to avoid having to serialize
them after every element received in `processElement2` (the state of my
models changes with each new element processed there). Maybe I cannot
achieve my goals with keyed state and need operator state instead.
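
A rough sketch of the "serialize only at checkpoint time" pattern, written as plain Scala rather than against Flink: models are mutated in memory on every element and are turned into bytes only when `snapshot()` is called, roughly the way a `snapshotState` implementation would fill an operator `ListState` of (key, bytes) entries. `CounterModel`, `ModelHolder`, `snapshot` and `restore` are hypothetical names for illustration only.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable

// Hypothetical self-evolving model: its internal state changes with every
// element, so its serialized form is only valid for one moment in time.
final class CounterModel(var count: Long = 0L) {
  def update(x: Long): Unit = count += x
  def toBytes: Array[Byte] = ByteBuffer.allocate(8).putLong(count).array()
}

object CounterModel {
  def fromBytes(b: Array[Byte]): CounterModel = new CounterModel(ByteBuffer.wrap(b).getLong)
}

// Toy operator mimicking the CheckpointedFunction pattern; the returned
// list stands in for operator list state (assumption, not Flink code).
class ModelHolder {
  val models = mutable.Map.empty[String, CounterModel]
  var serializations = 0

  // Per-element path: mutate in memory only, no serialization here.
  def onElement(key: String, x: Long): Unit =
    models.getOrElseUpdate(key, new CounterModel()).update(x)

  // Checkpoint path: serialize each model exactly once.
  def snapshot(): List[(String, Array[Byte])] = {
    serializations += models.size
    models.iterator.map { case (k, m) => k -> m.toBytes }.toList
  }

  // Restore path: rebuild the in-memory models from the snapshot entries.
  def restore(entries: Seq[(String, Array[Byte])]): Unit = {
    models.clear()
    entries.foreach { case (k, b) => models(k) = CounterModel.fromBytes(b) }
  }
}
```

One caveat worth checking against the Flink docs: operator list state is redistributed on rescaling without regard to key partitioning, so a restored model may end up on a subtask that no longer receives its key.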



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
