Hi together I did implement a little pipeline, which has some statefull computation:
Conntaing a function which extends RichFlatMapFunction and Checkpointed. The state is stored in the field: private var state_item: ValueState[Option[Pathsection]] = null override def open(conf: Configuration):Unit = { log.info("Open a new Checkpointed FlatMap function with configuration: {}", conf) state_item = getRuntimeContext.getState(new ValueStateDescriptor("State of Pathsection", (Option(Pathsection)).getClass.asInstanceOf[Class[Option[Pathsection]]], None)) } override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Option[Pathsection] = { log.debug("Snapshote State with checkpointId: {} at Timestamp {}", checkpointId, checkpointTimestamp) removeOldEntries(checkpointTimestamp) state_item.value() } override def restoreState(state: Option[Pathsection]):Unit = { if (state == null){ log.debug("Restore Snapshot: Null") } else if(state == None){ log.debug("Restore Snapshot: None") } else if (state_item == null){ log.debug("State Item not initialized") } else{ state_item.update(state) } } But when I do run this computation and get the program to fail, I get the following error: java.lang.Exception: Could not restore checkpointed state to operators and functions at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:457) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.Exception: Failed to restore state to function: No key available. at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:449) ... 3 more Caused by: java.lang.RuntimeException: No key available. at org.apache.flink.runtime.state.memory.MemValueState.update(MemValueState.java:69) at ....Function which has the Checkpointed thingy (CheckpointedIncrAddrPositions.scala:68) What am I missing? Thanks Simon