Given:
```scala
class MyOperator extends KeyedCoProcessFunction[String, ModelDef, Data, Prediction]
  with CheckpointedFunction {

  // To hold loaded models
  @transient private var models: HashMap[(String, String), Model] = _

  // For serialization purposes
  @transient private var modelsBytes: MapState[(String, String), Array[Byte]] = _

  ...

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    modelsBytes.clear() // This raises an exception when there is no active key set
    for ((k, model) <- models) {
      modelsBytes.put(k, model.toBytes)
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    modelsBytes = context.getKeyedStateStore.getMapState[(String, String), Array[Byte]](
      new MapStateDescriptor("modelsBytes", classOf[(String, String)], classOf[Array[Byte]])
    )

    if (context.isRestored) {
      // restore models from modelsBytes
    }
  }
}
```

`modelsBytes.clear()` raises an exception when there is no active key. This happens when I start the application from scratch without any data on the input streams. When the time for a checkpoint comes, I get this error:

`java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.`

However, when the input stream contains data, checkpoints work just fine.

I am a bit confused by this, because `snapshotState` does not provide a keyed context (unlike `processElement1` and `processElement2`, where the current key is accessible via `ctx.getCurrentKey`). It therefore seems to me that the calls to `clear` and `put` within `snapshotState` should always fail, since they are supposed to work only within a keyed context.

Can anyone clarify whether this is actually the expected behaviour?
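One possible explanation for the difference between the two cases can be sketched with a toy model. It assumes that the key set for the most recently processed element simply stays active until the next element arrives, so a later call from `snapshotState` silently operates on that leftover key. The class `ToyKeyedMapState` and the helper `snapshotAttempt` are hypothetical, written in plain Scala with no Flink dependency. They are a simplification for illustration, not Flink's actual implementation.

```scala
import scala.collection.mutable

// Hypothetical stand-in for keyed map state: one inner map per key,
// plus a "current key" that the runtime is assumed to set per element.
final class ToyKeyedMapState[K, UK, UV] {
  private var currentKey: Option[K] = None
  private val perKey = mutable.Map.empty[K, mutable.Map[UK, UV]]

  // Assumed to be called by the runtime before each processElement call.
  def setCurrentKey(key: K): Unit = currentKey = Some(key)

  // Every state access resolves against the current key and fails without one.
  private def active: mutable.Map[UK, UV] = {
    val key = currentKey.getOrElse(
      throw new NullPointerException(
        "No key set. This method should not be called outside of a keyed context."))
    perKey.getOrElseUpdate(key, mutable.Map.empty[UK, UV])
  }

  def put(k: UK, v: UV): Unit = active.update(k, v)
  def clear(): Unit = active.clear()

  // Keys that currently own an entry in the toy backend.
  def keysWithState: Set[K] = perKey.keySet.toSet
}

object ToyDemo {
  // Mimics the body of snapshotState: clear the state, then write to it.
  def snapshotAttempt(state: ToyKeyedMapState[String, String, Int]): Either[String, Unit] =
    try {
      state.clear()
      state.put("model", 1)
      Right(())
    } catch {
      case e: NullPointerException => Left(e.getMessage)
    }
}
```

In this sketch, `snapshotAttempt` fails on a fresh state and succeeds once `setCurrentKey` has been called at least once. When it succeeds, the writes land under whichever key happened to be processed last, not under every key. If the real runtime behaves like this, the checkpoints that appear to work would only be touching the state of one arbitrary key.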