Hi together

I did implement a little pipeline, which has some statefull computation:

Conntaing a function which extends RichFlatMapFunction and Checkpointed.

The state is stored in the field:

  private var state_item: ValueState[Option[Pathsection]] = null


  override def open(conf: Configuration):Unit = {
    log.info("Open a new Checkpointed FlatMap function with configuration: {}", 
conf)
    state_item = getRuntimeContext.getState(new ValueStateDescriptor("State of 
Pathsection", 
(Option(Pathsection)).getClass.asInstanceOf[Class[Option[Pathsection]]], None))
  }


  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): 
Option[Pathsection] = {
    log.debug("Snapshote State with checkpointId: {} at Timestamp {}", 
checkpointId, checkpointTimestamp)
    removeOldEntries(checkpointTimestamp)
    state_item.value()
  }

  override def restoreState(state: Option[Pathsection]):Unit = {
    
    if (state == null){
      log.debug("Restore Snapshot: Null")
    }
    else if(state == None){
      log.debug("Restore Snapshot: None")
    }    
    else if (state_item == null){
      log.debug("State Item not initialized")
    }
    else{
      state_item.update(state)
    }
  }

But when I do run this computation and get the program to fail, I get the 
following error:


java.lang.Exception: Could not restore checkpointed state to operators and 
functions
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:457)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Failed to restore state to function: No key 
available.
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:449)
        ... 3 more
Caused by: java.lang.RuntimeException: No key available.
        at 
org.apache.flink.runtime.state.memory.MemValueState.update(MemValueState.java:69)
        at ....Function which has the Checkpointed thingy 
(CheckpointedIncrAddrPositions.scala:68)

What am I missing?

Thanks 
Simon

Reply via email to