Hi Max

Thanks for your answer.

We keep state for some keys that we would like to delete after a certain time. Since there is currently no option to put an expiry date on state, I use the snapshot function to check whether the current key is still within some threshold.
So I would like to have an option to periodically check the timestamp and remove old entries from the state. That is why I used the KeyedStream to key by an id.

Can I rely on Flink's internal snapshotting, but still use the snapshot function to do some cleanup of the state?

Thanks
Simon

> On 01 Jun 2016, at 16:29, Maximilian Michels <m...@apache.org> wrote:
>
> Hi Simon,
>
> You don't need to write any code to checkpoint the Keyed State. It is
> done automatically by Flink. Just remove the `snapshotState(..)` and
> `restoreState(..)` methods.
>
> Cheers,
> Max
>
> On Wed, Jun 1, 2016 at 4:00 PM, simon peyer <simon.pe...@soom-it.ch> wrote:
>> Hi Max
>>
>> I'm using a keyBy but would like to store the state.
>>
>> So what's the way to go?
>>
>> How do I have to handle the state in option 2)?
>>
>> Could you give an example?
>>
>> Thanks
>> --Simon
>>
>>> On 01 Jun 2016, at 15:55, Maximilian Michels <m...@apache.org> wrote:
>>>
>>> Hi Simon,
>>>
>>> There are two types of state:
>>>
>>>
>>> 1) Keyed State
>>>
>>> The state you access via `getRuntimeContext().getState(..)` is scoped
>>> by key. If no key is in the scope, the key is null and update
>>> operations won't work. Use a `keyBy(..)` before your map function to
>>> partition the state by key. The state is automatically checkpointed by
>>> Flink.
>>>
>>> 2) Operator State
>>>
>>> This state is kept per parallel instance of the operator. You
>>> implement the Checkpointed interface and use the `snapshotState(..)`
>>> and `restoreState(..)` methods to checkpoint the state.
>>>
>>>
>>> I think you want to use one of the two. Although it is possible to use
>>> both, it looks like you're confusing the two in your example.
>>>
>>> Cheers,
>>> Max
>>>
>>> On Wed, Jun 1, 2016 at 3:06 PM, simon peyer <simon.pe...@soom-it.ch> wrote:
>>>> Hi together
>>>>
>>>> I implemented a little pipeline which has some stateful computation.
>>>> It contains a function which extends RichFlatMapFunction and Checkpointed.
>>>>
>>>> The state is stored in the field:
>>>>
>>>> private var state_item: ValueState[Option[Pathsection]] = null
>>>>
>>>>
>>>> override def open(conf: Configuration):Unit = {
>>>>   log.info("Open a new Checkpointed FlatMap function with configuration: {}", conf)
>>>>   state_item = getRuntimeContext.getState(new ValueStateDescriptor("State of Pathsection",
>>>>     (Option(Pathsection)).getClass.asInstanceOf[Class[Option[Pathsection]]],
>>>>     None))
>>>> }
>>>>
>>>>
>>>> override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Option[Pathsection] = {
>>>>   log.debug("Snapshote State with checkpointId: {} at Timestamp {}", checkpointId, checkpointTimestamp)
>>>>   removeOldEntries(checkpointTimestamp)
>>>>   state_item.value()
>>>> }
>>>>
>>>> override def restoreState(state: Option[Pathsection]):Unit = {
>>>>   if (state == null){
>>>>     log.debug("Restore Snapshot: Null")
>>>>   }
>>>>   else if(state == None){
>>>>     log.debug("Restore Snapshot: None")
>>>>   }
>>>>   else if (state_item == null){
>>>>     log.debug("State Item not initialized")
>>>>   }
>>>>   else{
>>>>     state_item.update(state)
>>>>   }
>>>> }
>>>>
>>>> But when I run this computation and get the program to fail, I get the
>>>> following error:
>>>>
>>>>
>>>> java.lang.Exception: Could not restore checkpointed state to operators and functions
>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:457)
>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>   at java.lang.Thread.run(Thread.java:745)
>>>> Caused by: java.lang.Exception: Failed to restore state to function: No key available.
>>>>   at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:449)
>>>>   ... 3 more
>>>> Caused by: java.lang.RuntimeException: No key available.
>>>>   at org.apache.flink.runtime.state.memory.MemValueState.update(MemValueState.java:69)
>>>>   at ....Function which has the Checkpointed thingy (CheckpointedIncrAddrPositions.scala:68)
>>>>
>>>> What am I missing?
>>>>
>>>> Thanks
>>>> Simon
>>>>
>>
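
For reference, here is a minimal sketch of the keyed-state-only variant Max describes, with the age check moved from `snapshotState(..)` into `flatMap(..)`, where a key is in scope. It is an illustration under assumptions, not a tested replacement for the code above: the `Event` type, the `Expiry` helper, the `ExpiringFlatMap` class, and the `maxAgeMs` parameter are hypothetical names, and the three-argument `ValueStateDescriptor` constructor is the same form used in the quoted code (Flink 1.x).

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Hypothetical event type; only `timestamp` matters for the expiry check.
case class Event(id: String, timestamp: Long)

object Expiry {
  // Pure helper: true if the stored timestamp is older than the allowed age.
  def isExpired(lastSeen: Long, now: Long, maxAgeMs: Long): Boolean =
    now - lastSeen > maxAgeMs
}

// Intended for a keyed stream, e.g. stream.keyBy(_.id).flatMap(new ExpiringFlatMap(60000L)).
// It does not implement Checkpointed: keyed state is checkpointed by Flink itself.
class ExpiringFlatMap(maxAgeMs: Long) extends RichFlatMapFunction[Event, Event] {

  // Per-key timestamp of the last element seen for that key.
  @transient private var lastSeen: ValueState[java.lang.Long] = _

  override def open(conf: Configuration): Unit = {
    lastSeen = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("lastSeen", classOf[java.lang.Long], null))
  }

  override def flatMap(event: Event, out: Collector[Event]): Unit = {
    // A key is in scope here, so reading and clearing keyed state is allowed.
    val previous = lastSeen.value()
    if (previous != null && Expiry.isExpired(previous, event.timestamp, maxAgeMs)) {
      // Drop the stale entry; a real job would clear its other per-key state here too.
      lastSeen.clear()
    }
    lastSeen.update(event.timestamp)
    out.collect(event)
  }
}
```

One limitation of this approach: an entry is only checked when a new element arrives for its key, so state for keys that never appear again is not removed.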