In a KeyedCoProcessFunction, I am managing keyed state that consists of
third-party library models. These models are created in `processElement1`
when new data arrives on the control stream. Because the models are
self-evolving, in the sense that they have their own internal state, I need
to make sure that they are serialized into `modelsBytes` whenever their state
changes. My first attempt looks like this:
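```scala
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.Collector

import scala.collection.mutable

// Control, Data, Prediction and Model are application/third-party types.
class MyOperator
    extends KeyedCoProcessFunction[String, Control, Data, Prediction]
    with CheckpointedFunction {

  private type Ctx = KeyedCoProcessFunction[String, Control, Data, Prediction]#Context

  // To hold loaded models (a plain operator field, not Flink-managed state)
  @transient private var models: mutable.HashMap[String, Model] = _
  // For serialization purposes: keyed state, model id -> serialized model
  @transient private var modelsBytes: MapState[String, Array[Byte]] = _
  // Set in initializeState when recovering from a checkpoint
  @transient private var needsRestore: Boolean = false

  override def processElement1(control: Control, ctx: Ctx, out: Collector[Prediction]): Unit = {
    if (needsRestore) restoreModels()
    // - Create new model out of `control` element
    // - Add it to `models`
  }

  override def processElement2(data: Data, ctx: Ctx, out: Collector[Prediction]): Unit = {
    if (needsRestore) restoreModels()
    // - Send `data` element to the corresponding models
    // This will update their internal states
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // Suspicious, wishful-thinking code that compiles and runs just fine:
    // no key is in scope here, so which key's MapState receives these puts?
    for ((k, model) <- models) {
      modelsBytes.put(k, model.toBytes)
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    models = mutable.HashMap.empty[String, Model]
    modelsBytes = context.getKeyedStateStore.getMapState(
      new MapStateDescriptor[String, Array[Byte]]("modelsBytes", classOf[String], classOf[Array[Byte]]))
    needsRestore = context.isRestored
  }

  // Rebuilds `models` from `modelsBytes`; body omitted
  private def restoreModels(): Unit = ???
}
```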
So the idea is to use `snapshotState` to overwrite the *keyed* state entries
in `modelsBytes`. I am trying this approach because serializing the models
(`model.toBytes`) might be expensive, so I would prefer to do it only once
per model, when a checkpoint is taken.
The problem is that this approach may be conceptually wrong. Even though the
code in `snapshotState` compiles and runs just fine, it accesses keyed state
without any key context being passed in, so it is not clear which key it is
actually writing to. I wrote a small test to verify the checkpoints, and I
observed that from time to time I get an empty state back, even though the
`modelsBytes` entries were updated in `snapshotState`. So snapshotting my
models this way does not seem to be reliable at all.
What confuses me is that the API lets the user do this at all. Perhaps the
`put` method should raise an exception making it clear that a keyed context
is required; otherwise it gives false hope and can lead to hard-to-spot bugs.
In fact, shouldn't this be considered a bug?
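To make the "which key?" question concrete, here is a toy, Flink-free model of what I suspect is happening. This is an assumption for illustration, not Flink's actual implementation: keyed state reads and writes are scoped to whatever key the runtime set most recently, so in `snapshotState` every `put` lands under the key of the last processed element. `ToyKeyedMapState`, the model ids `m1`/`m2` and the stream keys `a`/`b` are all hypothetical.
```scala
import scala.collection.mutable

// Toy stand-in for a keyed MapState: reads and writes are scoped to whatever
// `currentKey` the "runtime" set last. Simplified illustration only, not Flink code.
final class ToyKeyedMapState[K, UK, UV] {
  private val perKey = mutable.HashMap.empty[K, mutable.HashMap[UK, UV]]
  var currentKey: Option[K] = None

  private def scoped: mutable.HashMap[UK, UV] = currentKey match {
    case Some(k) => perKey.getOrElseUpdate(k, mutable.HashMap.empty[UK, UV])
    case None    => throw new IllegalStateException("No key set")
  }

  def put(uk: UK, uv: UV): Unit = scoped.put(uk, uv)
  def keys: Set[UK] = scoped.keySet.toSet
}

object SnapshotOutsideKeyedContext {
  // Returns the model ids visible under stream keys "a" and "b" after a
  // snapshotState-style loop that runs with no key passed in.
  def demo(): (Set[String], Set[String]) = {
    val modelsBytes = new ToyKeyedMapState[String, String, Array[Byte]]
    // Operator-wide cache: model "m1" belongs to key "a", model "m2" to key "b".
    val models = mutable.LinkedHashMap("m1" -> "state-of-m1", "m2" -> "state-of-m2")

    // Simulate processing an element for "a", then one for "b": "b" stays current.
    modelsBytes.currentKey = Some("a")
    modelsBytes.currentKey = Some("b")

    // "snapshotState": every put lands under "b", whatever key the model belongs to.
    for ((id, model) <- models) modelsBytes.put(id, model.getBytes("UTF-8"))

    // Read back the entries as each key would see them after a restore.
    modelsBytes.currentKey = Some("a")
    val underA = modelsBytes.keys
    modelsBytes.currentKey = Some("b")
    val underB = modelsBytes.keys
    (underA, underB)
  }

  def main(args: Array[String]): Unit = {
    val (underA, underB) = demo()
    println(s"key a sees: $underA")
    println(s"key b sees: $underB")
  }
}
```
Under this toy model, key `a` sees no entries while key `b` sees both models, which would be consistent with the empty state I sometimes get back after a restore.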
The other option, of course, is to serialize my models in `processElement2`,
right after sending new data elements to them. However, continuously
re-serializing the models to keep `modelsBytes` up to date might be costly.
What would be the most efficient way to handle this scenario?
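One possible middle ground between the two options, sketched in plain Scala without Flink: mark a model as dirty when new data changes it, and serialize each dirty model only once per flush instead of on every data element. `ToyModel`, `DirtyTrackingModels`, `onData` and `flush` are hypothetical names; the `sink` callback stands in for `modelsBytes.put`.
```scala
import scala.collection.mutable

// Hypothetical stand-in for a third-party self-evolving model.
final class ToyModel(val id: String) {
  private var updates = 0
  // Mutates the model's internal state (here: just counts updates).
  def learn(x: Double): Unit = updates += 1
  // The "expensive" serialization call.
  def toBytes: Array[Byte] = s"$id:$updates".getBytes("UTF-8")
}

// Write coalescing: remember which models changed since the last flush and
// serialize only those, once each, when flush() is called.
final class DirtyTrackingModels {
  val models = mutable.HashMap.empty[String, ToyModel]
  private val dirty = mutable.Set.empty[String]
  var serializations = 0

  def add(m: ToyModel): Unit = { models(m.id) = m; dirty += m.id }

  // Called per data element: update the model, defer serialization.
  def onData(id: String, x: Double): Unit = {
    models(id).learn(x)
    dirty += id
  }

  // Serializes each changed model once, then forgets the dirty marks.
  def flush(sink: (String, Array[Byte]) => Unit): Unit = {
    for (id <- dirty) { sink(id, models(id).toBytes); serializations += 1 }
    dirty.clear()
  }
}
```
With 1000 data elements spread over two models, a single flush calls `toBytes` twice rather than 1000 times. In a real `KeyedCoProcessFunction`, the flush would have to run where a key is in scope, for example in `onTimer` after registering a processing-time timer from `processElement2` via `ctx.timerService().registerProcessingTimeTimer(...)`, with the dirty set kept per key; that wiring is an assumption and is not shown here.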
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/