Hi Max

I'm using a keyBy but would like to store the state.

So what's the way to go?

How do I have to handle the state in option 2)?

Could you give an example?

Thanks
--Simon

> On 01 Jun 2016, at 15:55, Maximilian Michels <m...@apache.org> wrote:
> 
> Hi Simon,
> 
> There are two types of state:
> 
> 
> 1) Keyed State
> 
> The state you access via `getRuntimeContext().getState(..)` is scoped
> by key. If no key is in the scope, the key is null and update
> operations won't work. Use a `keyBy(..)` before your map function to
> partition the state by key. The state is automatically checkpointed by
> Flink.
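To make point 1) concrete, here is a library-free Scala model of how keyed state behaves. It is not Flink's API. `KeyedValueStateModel`, `setCurrentKey` and `KeyedStateDemo` are hypothetical names chosen for illustration, and the error text is copied from the stack trace in the original question. The idea it sketches is that the runtime sets the current key before each record of a keyed stream reaches the function. `restoreState` runs before any record arrives, so no key is in scope and `update` fails.

```scala
import scala.collection.mutable

// Hypothetical model of keyed ValueState semantics; NOT Flink's classes.
// Each key gets its own slot, and in this model reads and writes need a key in scope.
class KeyedValueStateModel[K, V](default: V) {
  // Backing store: one value per key (stands in for the state backend).
  private val store = mutable.Map.empty[K, V]
  // Key of the record currently being processed; None means "no key in scope".
  private var currentKey: Option[K] = None

  // In a keyed Flink stream the runtime does the equivalent before each record.
  def setCurrentKey(key: K): Unit = currentKey = Some(key)

  def value(): V = currentKey match {
    case Some(k) => store.getOrElse(k, default)
    case None    => throw new RuntimeException("No key available.")
  }

  def update(v: V): Unit = currentKey match {
    case Some(k) => store(k) = v
    case None    => throw new RuntimeException("No key available.")
  }
}

object KeyedStateDemo {
  // Like restoreState: runs before any record, so no key has been set.
  def updateWithoutKey(): Either[String, Unit] = {
    val state = new KeyedValueStateModel[String, Option[Int]](None)
    try { state.update(Some(1)); Right(()) }
    catch { case e: RuntimeException => Left(e.getMessage) }
  }

  // Like keyBy(..) followed by a counting function: the key is set per record.
  def countPerKey(records: Seq[String]): Map[String, Int] = {
    val state = new KeyedValueStateModel[String, Int](0)
    records.foreach { r =>
      state.setCurrentKey(r)
      state.update(state.value() + 1)
    }
    records.distinct.map { k => state.setCurrentKey(k); k -> state.value() }.toMap
  }
}
```

For example, `KeyedStateDemo.countPerKey(Seq("a", "b", "a"))` yields `Map("a" -> 2, "b" -> 1)`. With `keyBy(..)` in front of the function, Flink scopes the state per key and checkpoints it itself, so a keyed function does not need `Checkpointed` for this.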
> 
> 2) Operator State
> 
> This state is kept per parallel instance of the operator. You
> implement the Checkpointed interface and use the `snapshotState(..)`
> and `restoreState(..)` methods to checkpoint the state.
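For option 2), here is a minimal sketch of operator state. It assumes the Flink 1.0-era `Checkpointed` contract used in the code quoted below: `snapshotState` returns the state, and `restoreState` receives it back. The sketch has to run without Flink on the classpath, so it uses some stand-ins. `CheckpointedLike` is a local stand-in for that interface, `Pathsection` is a hypothetical case class, and `process` stands in for `flatMap`. The point it illustrates is that the state lives in an ordinary field rather than in a `ValueState` obtained from `getRuntimeContext`. As a result, `restoreState` only assigns the field and never touches keyed state.

```scala
// Local stand-in with the same method shapes as Flink 1.0's Checkpointed[T];
// used here only so the sketch runs without Flink on the classpath.
trait CheckpointedLike[T] {
  def snapshotState(checkpointId: Long, checkpointTimestamp: Long): T
  def restoreState(state: T): Unit
}

// Hypothetical stand-in for the Pathsection type from the question.
final case class Pathsection(id: String, timestamp: Long)

// Operator state: an ordinary field, one copy per parallel instance, no key involved.
class LatestSectionFunction extends CheckpointedLike[Option[Pathsection]] {
  private var latest: Option[Pathsection] = None

  // Stands in for flatMap: remember the newest section and emit it unchanged.
  def process(in: Pathsection): Seq[Pathsection] = {
    latest = Some(in)
    Seq(in)
  }

  def current: Option[Pathsection] = latest

  // On a checkpoint: hand the field's value to the framework to persist.
  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Option[Pathsection] =
    latest

  // On recovery (before any record): assign the field; Option(...) guards against null.
  override def restoreState(state: Option[Pathsection]): Unit =
    latest = Option(state).flatten
}

object OperatorStateDemo {
  // Process one record, checkpoint, then restore into a fresh instance (as after a failure).
  def roundTrip(in: Pathsection): Option[Pathsection] = {
    val before = new LatestSectionFunction
    before.process(in)
    val snapshot = before.snapshotState(checkpointId = 1L, checkpointTimestamp = 0L)
    val after = new LatestSectionFunction
    after.restoreState(snapshot)
    after.current
  }
}
```

In an actual Flink job, the class would extend `RichFlatMapFunction` and `Checkpointed[Option[Pathsection]]` instead of the stand-in trait. The field then holds one value per parallel instance, not one per key.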
> 
> 
> I think you want to use one of the two. Although it is possible to use
> both, it looks like you're confusing the two in your example.
> 
> Cheers,
> Max
> 
> On Wed, Jun 1, 2016 at 3:06 PM, simon peyer <simon.pe...@soom-it.ch> wrote:
>> Hi all
>> 
>> I implemented a small pipeline with some stateful computation.
>> 
>> It contains a function that extends RichFlatMapFunction and Checkpointed.
>> 
>> The state is stored in the field:
>> 
>>  private var state_item: ValueState[Option[Pathsection]] = null
>> 
>> 
>>  override def open(conf: Configuration): Unit = {
>>    log.info("Open a new Checkpointed FlatMap function with configuration: {}", conf)
>>    state_item = getRuntimeContext.getState(new ValueStateDescriptor("State of Pathsection",
>>      (Option(Pathsection)).getClass.asInstanceOf[Class[Option[Pathsection]]],
>>      None))
>>  }
>> 
>> 
>>  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Option[Pathsection] = {
>>    log.debug("Snapshot state with checkpointId: {} at timestamp {}", checkpointId, checkpointTimestamp)
>>    removeOldEntries(checkpointTimestamp)
>>    state_item.value()
>>  }
>> 
>>  override def restoreState(state: Option[Pathsection]): Unit = {
>>    if (state == null){
>>      log.debug("Restore Snapshot: Null")
>>    }
>>    else if(state == None){
>>      log.debug("Restore Snapshot: None")
>>    }
>>    else if (state_item == null){
>>      log.debug("State Item not initialized")
>>    }
>>    else{
>>      state_item.update(state)
>>    }
>>  }
>> 
>> But when I run this computation and make the program fail, I get the
>> following error:
>> 
>> 
>> java.lang.Exception: Could not restore checkpointed state to operators and functions
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:457)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.Exception: Failed to restore state to function: No key available.
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:449)
>>     ... 3 more
>> Caused by: java.lang.RuntimeException: No key available.
>>     at org.apache.flink.runtime.state.memory.MemValueState.update(MemValueState.java:69)
>>     at ....Function which has the Checkpointed thingy (CheckpointedIncrAddrPositions.scala:68)
>> 
>> What am I missing?
>> 
>> Thanks
>> Simon
>> 
