Hi Simon,

There are two types of state:


1) Keyed State

The state you access via `getRuntimeContext().getState(..)` is scoped
by key. If no key is in the scope, the key is null and update
operations won't work. Use a `keyBy(..)` before your map function to
partition the state by key. The state is automatically checkpointed by
Flink.

2) Operator State

This state is kept per parallel instance of the operator. You
implement the Checkpointed interface and use the `snapshotState(..)`
and `restoreState(..)` methods to checkpoint the state.


I think you want to use one of the two. Although it is possible to use
both, it looks like you're confusing the two in your example.

Cheers,
Max

On Wed, Jun 1, 2016 at 3:06 PM, simon peyer <simon.pe...@soom-it.ch> wrote:
> Hi together
>
> I did implement a little pipeline, which has some statefull computation:
>
> Conntaing a function which extends RichFlatMapFunction and Checkpointed.
>
> The state is stored in the field:
>
>   private var state_item: ValueState[Option[Pathsection]] = null
>
>
>   override def open(conf: Configuration):Unit = {
>     log.info("Open a new Checkpointed FlatMap function with configuration:
> {}", conf)
>     state_item = getRuntimeContext.getState(new ValueStateDescriptor("State
> of Pathsection",
> (Option(Pathsection)).getClass.asInstanceOf[Class[Option[Pathsection]]],
> None))
>   }
>
>
>   override def snapshotState(checkpointId: Long, checkpointTimestamp: Long):
> Option[Pathsection] = {
>     log.debug("Snapshote State with checkpointId: {} at Timestamp {}",
> checkpointId, checkpointTimestamp)
>     removeOldEntries(checkpointTimestamp)
>     state_item.value()
>   }
>
>   override def restoreState(state: Option[Pathsection]):Unit = {
>
>
>
>     if (state == null){
>       log.debug("Restore Snapshot: Null")
>     }
>     else if(state == None){
>       log.debug("Restore Snapshot: None")
>     }
>     else if (state_item == null){
>       log.debug("State Item not initialized")
>     }
>     else{
>       state_item.update(state)
>     }
>   }
>
> But when I do run this computation and get the program to fail, I get the
> following error:
>
>
> java.lang.Exception: Could not restore checkpointed state to operators and
> functions
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:457)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Failed to restore state to function: No key
> available.
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:449)
> ... 3 more
> Caused by: java.lang.RuntimeException: No key available.
> at
> org.apache.flink.runtime.state.memory.MemValueState.update(MemValueState.java:69)
> at ....Function which has the Checkpointed thingy
> (CheckpointedIncrAddrPositions.scala:68)
>
> What am I missing?
>
> Thanks
> Simon
>

Reply via email to