Given:

```scala
class MyOperator extends KeyedCoProcessFunction[String, ModelDef, Data,
Prediction]
  with CheckpointedFunction {

  // To hold loaded models
  @transient private var models: HashMap[(String, String), Model] = _

  // For serialization purposes
  @transient private var modelsBytes: MapState[(String, String),
Array[Bytes]] = _

  ...

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    modelsBytes.clear() // This raises an exception when there is no active
key set
    for ((k, model) <- models) {
      modelsBytes.put(k, model.toBytes(v))
    }
  }

  override def initializeState(context: FunctionInitializationContext):
Unit = {
    modelsBytes = context.getKeyedStateStore.getMapState[String, String](
      new MapStateDescriptor("modelsBytes", classOf[String],
classOf[String])
    )

    if (context.isRestored) {
      // restore models from modelsBytes
    }
  }

}
```

It happens that `modelsBytes.clear()` raises an exception when there is no
active key. This happens when I start the application from scratch without
any data on the input streams. So, when the time for a checkpoint comes, I
get this error:

`java.lang.NullPointerException: No key set. This method should not be
called outside of a keyed context.`

However, when the input stream contains data, checkpoints work just fine. I
am a bit confused about this because `snapshotState` does not provide a
keyed context (contrary to `processElement1` and `processElement2`, where
the current key is accessible by doing `ctx.getCurrentKey`) so it seems to
me that the calls to `clear` and `put` within `snapshotState` should fail
always since they're supposed to work only within a keyed context. Can
anyone clarify if this is the expected behaviour actually?

Reply via email to