Hi Simon,

You don't need to write any code to checkpoint the Keyed State. It is done automatically by Flink. Just remove the `snapshotState(..)` and `restoreState(..)` methods.
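
For illustration, here is a minimal sketch of how the function might look when it relies on keyed state only. The `Pathsection` case class, its `id` and `position` fields, the class name `PathsectionFlatMap`, and the state name are assumptions made for this example, since the real definitions are not shown in the thread. The three-argument `ValueStateDescriptor` constructor is the one available in the Flink 1.0 API.

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Hypothetical stand-in for the Pathsection type used in the thread.
case class Pathsection(id: String, position: Long)

// Keyed state only: the function does not implement Checkpointed and has
// no snapshotState/restoreState methods. Flink checkpoints the state itself.
class PathsectionFlatMap extends RichFlatMapFunction[Pathsection, Pathsection] {

  // Initialized in open(), once the runtime context is available.
  private var stateItem: ValueState[Pathsection] = _

  override def open(conf: Configuration): Unit = {
    // Descriptor arguments: state name, state class, default value (null here).
    stateItem = getRuntimeContext.getState(
      new ValueStateDescriptor[Pathsection](
        "pathsection-state", classOf[Pathsection], null))
  }

  override def flatMap(in: Pathsection, out: Collector[Pathsection]): Unit = {
    // value() and update() act on the state of the key of the current element.
    val previous = stateItem.value()
    if (previous != null) {
      out.collect(previous) // emit the element previously stored for this key
    }
    stateItem.update(in)
  }
}
```

The function has to run on a keyed stream, for example `stream.keyBy(_.id).flatMap(new PathsectionFlatMap)` with `org.apache.flink.streaming.api.scala._` imported. Without the `keyBy(..)`, state access fails with "No key available", as in the stack trace quoted below.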

Cheers,
Max

On Wed, Jun 1, 2016 at 4:00 PM, simon peyer <simon.pe...@soom-it.ch> wrote:
> Hi Max
>
> I'm using a keyBy but would like to store the state.
>
> So what's the way to go?
>
> How do I have to handle the state in option 2)?
>
> Could you give an example?
>
> Thanks
> --Simon
>
>> On 01 Jun 2016, at 15:55, Maximilian Michels <m...@apache.org> wrote:
>>
>> Hi Simon,
>>
>> There are two types of state:
>>
>>
>> 1) Keyed State
>>
>> The state you access via `getRuntimeContext().getState(..)` is scoped
>> by key. If no key is in the scope, the key is null and update
>> operations won't work. Use a `keyBy(..)` before your map function to
>> partition the state by key. The state is automatically checkpointed by
>> Flink.
>>
>> 2) Operator State
>>
>> This state is kept per parallel instance of the operator. You
>> implement the Checkpointed interface and use the `snapshotState(..)`
>> and `restoreState(..)` methods to checkpoint the state.
>>
>>
>> I think you want to use one of the two. Although it is possible to use
>> both, it looks like you're confusing the two in your example.
>>
>> Cheers,
>> Max
>>
>> On Wed, Jun 1, 2016 at 3:06 PM, simon peyer <simon.pe...@soom-it.ch> wrote:
>>> Hi all
>>>
>>> I implemented a small pipeline that has some stateful computation,
>>> containing a function which extends RichFlatMapFunction and Checkpointed.
>>>
>>> The state is stored in the field:
>>>
>>> private var state_item: ValueState[Option[Pathsection]] = null
>>>
>>>
>>> override def open(conf: Configuration):Unit = {
>>>   log.info("Open a new Checkpointed FlatMap function with configuration: {}", conf)
>>>   state_item = getRuntimeContext.getState(new ValueStateDescriptor("State of Pathsection",
>>>     (Option(Pathsection)).getClass.asInstanceOf[Class[Option[Pathsection]]],
>>>     None))
>>> }
>>>
>>>
>>> override def snapshotState(checkpointId: Long, checkpointTimestamp: Long):
>>>     Option[Pathsection] = {
>>>   log.debug("Snapshote State with checkpointId: {} at Timestamp {}",
>>>     checkpointId, checkpointTimestamp)
>>>   removeOldEntries(checkpointTimestamp)
>>>   state_item.value()
>>> }
>>>
>>> override def restoreState(state: Option[Pathsection]):Unit = {
>>>   if (state == null){
>>>     log.debug("Restore Snapshot: Null")
>>>   }
>>>   else if(state == None){
>>>     log.debug("Restore Snapshot: None")
>>>   }
>>>   else if (state_item == null){
>>>     log.debug("State Item not initialized")
>>>   }
>>>   else{
>>>     state_item.update(state)
>>>   }
>>> }
>>>
>>> But when I do run this computation and get the program to fail, I get the
>>> following error:
>>>
>>>
>>> java.lang.Exception: Could not restore checkpointed state to operators and functions
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:457)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.Exception: Failed to restore state to function: No key available.
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:449)
>>>     ... 3 more
>>> Caused by: java.lang.RuntimeException: No key available.
>>>     at org.apache.flink.runtime.state.memory.MemValueState.update(MemValueState.java:69)
>>>     at ....Function which has the Checkpointed thingy (CheckpointedIncrAddrPositions.scala:68)
>>>
>>> What am I missing?
>>>
>>> Thanks
>>> Simon
>>>