Hi Simon,

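You don't need to write any code to checkpoint the Keyed State. Flink
does it automatically. Just remove the `snapshotState(..)` and
`restoreState(..)` methods, along with the `Checkpointed` interface.
Your `restoreState(..)` runs with no key in scope, which is why the
`update(..)` call fails with "No key available".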
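
A minimal sketch of a function that relies on Keyed State only, assuming
the same three-argument `ValueStateDescriptor` form you already use. The
class name `CountPerKey`, the tuple types and the state name "count" are
made up for illustration and are not taken from your job:

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Hypothetical example: keeps a running sum per key.
// Note that it does NOT implement Checkpointed.
class CountPerKey extends RichFlatMapFunction[(String, Long), (String, Long)] {

  // Handle to the keyed state; Flink checkpoints and restores its contents.
  private var count: ValueState[java.lang.Long] = null

  override def open(conf: Configuration): Unit = {
    // Descriptor arguments: state name, value class, default value.
    count = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("count", classOf[java.lang.Long], 0L))
  }

  override def flatMap(in: (String, Long), out: Collector[(String, Long)]): Unit = {
    // value() and update() act on the state of the key of the current element.
    val current: Long = count.value()
    val updated: Long = current + in._2
    count.update(updated)
    out.collect((in._1, updated))
  }
}
```

You would apply it after a `keyBy(..)`, for example
`stream.keyBy(_._1).flatMap(new CountPerKey)`, where `stream` stands for
a `DataStream[(String, Long)]` in your own pipeline.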

Cheers,
Max

On Wed, Jun 1, 2016 at 4:00 PM, simon peyer <simon.pe...@soom-it.ch> wrote:
> Hi Max
>
> I'm using a `keyBy` but would like to store the state.
>
> So what's the way to go?
>
> How do I have to handle the state in option 2)?
>
> Could you give an example?
>
> Thanks
> --Simon
>
>> On 01 Jun 2016, at 15:55, Maximilian Michels <m...@apache.org> wrote:
>>
>> Hi Simon,
>>
>> There are two types of state:
>>
>>
>> 1) Keyed State
>>
>> The state you access via `getRuntimeContext().getState(..)` is scoped
>> by key. If no key is in the scope, the key is null and update
>> operations won't work. Use a `keyBy(..)` before your map function to
>> partition the state by key. The state is automatically checkpointed by
>> Flink.
>>
>> 2) Operator State
>>
>> This state is kept per parallel instance of the operator. You
>> implement the Checkpointed interface and use the `snapshotState(..)`
>> and `restoreState(..)` methods to checkpoint the state.
>>
>>
>> I think you want to use one of the two. Although it is possible to use
>> both, it looks like you're confusing the two in your example.
>>
>> Cheers,
>> Max
>>
>> On Wed, Jun 1, 2016 at 3:06 PM, simon peyer <simon.pe...@soom-it.ch> wrote:
>>> Hi all
>>>
>>> I implemented a small pipeline that does some stateful computation.
>>>
>>> It contains a function which extends RichFlatMapFunction and Checkpointed.
>>>
>>> The state is stored in the field:
>>>
>>>  private var state_item: ValueState[Option[Pathsection]] = null
>>>
>>>
>>>  override def open(conf: Configuration):Unit = {
>>>    log.info("Open a new Checkpointed FlatMap function with configuration: {}", conf)
>>>    state_item = getRuntimeContext.getState(new ValueStateDescriptor("State of Pathsection",
>>>      (Option(Pathsection)).getClass.asInstanceOf[Class[Option[Pathsection]]],
>>>      None))
>>>  }
>>>
>>>
>>>  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Option[Pathsection] = {
>>>    log.debug("Snapshote State with checkpointId: {} at Timestamp {}", checkpointId, checkpointTimestamp)
>>>    removeOldEntries(checkpointTimestamp)
>>>    state_item.value()
>>>  }
>>>
>>>  override def restoreState(state: Option[Pathsection]):Unit = {
>>>
>>>
>>>
>>>    if (state == null){
>>>      log.debug("Restore Snapshot: Null")
>>>    }
>>>    else if(state == None){
>>>      log.debug("Restore Snapshot: None")
>>>    }
>>>    else if (state_item == null){
>>>      log.debug("State Item not initialized")
>>>    }
>>>    else{
>>>      state_item.update(state)
>>>    }
>>>  }
>>>
>>> But when I run this computation and make the program fail, I get the
>>> following error:
>>>
>>>
>>> java.lang.Exception: Could not restore checkpointed state to operators and functions
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:457)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.Exception: Failed to restore state to function: No key available.
>>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:449)
>>> ... 3 more
>>> Caused by: java.lang.RuntimeException: No key available.
>>> at org.apache.flink.runtime.state.memory.MemValueState.update(MemValueState.java:69)
>>> at ....Function which has the Checkpointed thingy (CheckpointedIncrAddrPositions.scala:68)
>>>
>>> What am I missing?
>>>
>>> Thanks
>>> Simon
>>>
>
