Hi Max,

I'm using a keyBy but would like to store the state.
So what's the way to go? How do I have to handle the state in option 2)?
Could you give an example?

Thanks
--Simon

> On 01 Jun 2016, at 15:55, Maximilian Michels <m...@apache.org> wrote:
>
> Hi Simon,
>
> There are two types of state:
>
>
> 1) Keyed State
>
> The state you access via `getRuntimeContext().getState(..)` is scoped
> by key. If no key is in the scope, the key is null and update
> operations won't work. Use a `keyBy(..)` before your map function to
> partition the state by key. The state is automatically checkpointed by
> Flink.
>
> 2) Operator State
>
> This state is kept per parallel instance of the operator. You
> implement the Checkpointed interface and use the `snapshotState(..)`
> and `restoreState(..)` methods to checkpoint the state.
>
>
> I think you want to use one of the two. Although it is possible to use
> both, it looks like you're confusing the two in your example.
>
> Cheers,
> Max
>
> On Wed, Jun 1, 2016 at 3:06 PM, simon peyer <simon.pe...@soom-it.ch> wrote:
>> Hi together
>>
>> I did implement a little pipeline, which has some stateful computation:
>>
>> It contains a function which extends RichFlatMapFunction and Checkpointed.
>>
>> The state is stored in the field:
>>
>> private var state_item: ValueState[Option[Pathsection]] = null
>>
>>
>> override def open(conf: Configuration):Unit = {
>>   log.info("Open a new Checkpointed FlatMap function with configuration: {}", conf)
>>   state_item = getRuntimeContext.getState(new ValueStateDescriptor("State of Pathsection",
>>     (Option(Pathsection)).getClass.asInstanceOf[Class[Option[Pathsection]]],
>>     None))
>> }
>>
>>
>> override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Option[Pathsection] = {
>>   log.debug("Snapshote State with checkpointId: {} at Timestamp {}", checkpointId, checkpointTimestamp)
>>   removeOldEntries(checkpointTimestamp)
>>   state_item.value()
>> }
>>
>> override def restoreState(state: Option[Pathsection]):Unit = {
>>   if (state == null){
>>     log.debug("Restore Snapshot: Null")
>>   }
>>   else if(state == None){
>>     log.debug("Restore Snapshot: None")
>>   }
>>   else if (state_item == null){
>>     log.debug("State Item not initialized")
>>   }
>>   else{
>>     state_item.update(state)
>>   }
>> }
>>
>> But when I do run this computation and get the program to fail, I get the
>> following error:
>>
>>
>> java.lang.Exception: Could not restore checkpointed state to operators and functions
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:457)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.Exception: Failed to restore state to function: No key available.
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:449)
>>     ... 3 more
>> Caused by: java.lang.RuntimeException: No key available.
>>     at org.apache.flink.runtime.state.memory.MemValueState.update(MemValueState.java:69)
>>     at ....Function which has the Checkpointed thingy (CheckpointedIncrAddrPositions.scala:68)
>>
>> What am I missing?
>>
>> Thanks
>> Simon
>>
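
For reference, the two options Max describes might look roughly like the sketch below. It assumes the Flink 1.0-era API used in this thread (`ValueStateDescriptor` with a default value, and the `Checkpointed` interface, which later Flink releases replaced). `Event`, `KeyedRunningSum` and `InstanceCounter` are made-up names for illustration, since the thread's `Pathsection` type is not shown. Treat it as a sketch under those assumptions, not as the definitive fix.

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.checkpoint.Checkpointed
import org.apache.flink.util.Collector

// Hypothetical record type; it stands in for the thread's Pathsection.
case class Event(key: String, value: Long)

// Option 1) Keyed state: must run after keyBy(..). Flink checkpoints the
// ValueState itself, so there is no snapshotState/restoreState here.
class KeyedRunningSum extends RichFlatMapFunction[Event, (String, Long)] {

  private var sum: ValueState[java.lang.Long] = _

  override def open(conf: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long](
        "running-sum", classOf[java.lang.Long], java.lang.Long.valueOf(0L)))
  }

  override def flatMap(in: Event, out: Collector[(String, Long)]): Unit = {
    // value() and update(..) are scoped to the key of the current element.
    val updated = sum.value().longValue() + in.value
    sum.update(java.lang.Long.valueOf(updated))
    out.collect((in.key, updated))
  }
}

// Option 2) Operator state: one plain field per parallel instance, handed to
// Flink through the Checkpointed interface. No getRuntimeContext.getState(..).
class InstanceCounter extends RichFlatMapFunction[Event, Long]
    with Checkpointed[java.lang.Long] {

  private var count: Long = 0L

  override def flatMap(in: Event, out: Collector[Long]): Unit = {
    count += 1
    out.collect(count)
  }

  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): java.lang.Long =
    java.lang.Long.valueOf(count)

  // Runs during recovery, outside of element processing, so no key is in
  // scope; only the plain field is touched here.
  override def restoreState(state: java.lang.Long): Unit = {
    count = state.longValue()
  }
}
```

With `import org.apache.flink.streaming.api.scala._` in scope, the keyed variant would be wired up as `stream.keyBy(_.key).flatMap(new KeyedRunningSum)`. The "No key available" error in the original code comes from calling `update(..)` on keyed state inside `restoreState(..)`. Keeping the two mechanisms in separate functions, as above, avoids that call.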