Given:
```scala
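import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import scala.collection.mutable.HashMap

class MyOperator extends KeyedCoProcessFunction[String, ModelDef, Data, Prediction]
  with CheckpointedFunction {

  // To hold loaded models
  @transient private var models: HashMap[(String, String), Model] = _

  // For serialization purposes
  @transient private var modelsBytes: MapState[(String, String), Array[Byte]] = _

  ...

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // This raises an exception when there is no active key set
    modelsBytes.clear()
    for ((k, model) <- models) {
      modelsBytes.put(k, model.toBytes())
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    // Descriptor types must match the declared type of modelsBytes
    modelsBytes = context.getKeyedStateStore.getMapState(
      new MapStateDescriptor[(String, String), Array[Byte]](
        "modelsBytes", classOf[(String, String)], classOf[Array[Byte]])
    )
    if (context.isRestored) {
      // restore models from modelsBytes
    }
  }
}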
```
The call to `modelsBytes.clear()` throws an exception when there is no
active key. This happens when I start the application from scratch without
any data on the input streams: as soon as a checkpoint is triggered, I
get this error:
`java.lang.NullPointerException: No key set. This method should not be
called outside of a keyed context.`
However, when the input streams contain data, checkpoints work just fine.
This confuses me: `snapshotState` does not provide a keyed context (unlike
`processElement1` and `processElement2`, where the current key is
accessible via `ctx.getCurrentKey`), so I would expect the calls to `clear`
and `put` inside `snapshotState` to always fail, since they are supposed to
work only within a keyed context. Can anyone clarify whether this is the
expected behaviour?
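
To make my mental model concrete, here is a toy sketch in plain Scala (no Flink dependencies) of what I suspect might be going on. It rests on an assumption I have not confirmed: that the state backend simply remembers whichever key was set for the last processed element, and nothing resets it before `snapshotState` runs. `ToyKeyedBackend`, `ToyMapState` and `KeyContextDemo` are hypothetical names made up for this illustration; they are not Flink classes.

```scala
import scala.collection.mutable

// Toy stand-in for a keyed state backend (hypothetical, not Flink's class).
// Assumption: it only remembers the last key that was set.
class ToyKeyedBackend[K] {
  private var currentKey: Option[K] = None

  def setCurrentKey(key: K): Unit = currentKey = Some(key)

  def getCurrentKey: K = currentKey.getOrElse(
    throw new NullPointerException(
      "No key set. This method should not be called outside of a keyed context."))
}

// Toy map state whose contents are scoped to the backend's current key.
class ToyMapState[K, UK, UV](backend: ToyKeyedBackend[K]) {
  private val perKey = mutable.Map.empty[K, mutable.Map[UK, UV]]

  private def current: mutable.Map[UK, UV] =
    perKey.getOrElseUpdate(backend.getCurrentKey, mutable.Map.empty[UK, UV])

  def put(k: UK, v: UV): Unit = current.put(k, v)

  def clear(): Unit = perKey.remove(backend.getCurrentKey)
}

object KeyContextDemo {
  // Returns true if a "snapshot" that touches keyed state succeeds.
  def snapshotSucceeds(elementKeys: Seq[String]): Boolean = {
    val backend = new ToyKeyedBackend[String]
    val state = new ToyMapState[String, String, Int](backend)

    // The runtime sets the key before each element is processed...
    elementKeys.foreach { key =>
      backend.setCurrentKey(key)
      state.put("seen", 1)
    }

    // ...but in this model nothing sets or resets it before the snapshot.
    try { state.clear(); true }
    catch { case _: NullPointerException => false }
  }
}
```

In this model, `KeyContextDemo.snapshotSucceeds(Seq.empty)` fails with the same message I see, while `KeyContextDemo.snapshotSucceeds(Seq("a", "b"))` succeeds but only touches the state of the last key (`"b"`). If Flink really behaves like this, my `snapshotState` would be silently reading and writing state under an arbitrary key whenever data has flowed, which does not look like what I intended.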