Hi

In other words, what's the easiest way to clean up state in Flink for a key that may never arrive again?
Thanks,
Simon

> On 02 Jun 2016, at 10:16, simon peyer <simon.pe...@soom-it.ch> wrote:
>
> Hi Max
>
> Thanks for your answer.
> We have some state, on some keys, that we would like to delete after a
> certain time.
> Since there is currently no option to put an "expiry" date on it,
> I just use the snapshot function to test whether the current key is
> still within some threshold.
>
> So I would like to have an option to periodically check the timestamp and
> remove old entries from the state.
>
> Therefore I used the KeyedStream to key by an id.
> Can I use the internal function to do the snapshotting, but use the snapshot
> function to do some cleanup on the state?
>
>
> Thanks
> Simon
>
>
>> On 01 Jun 2016, at 16:29, Maximilian Michels <m...@apache.org> wrote:
>>
>> Hi Simon,
>>
>> You don't need to write any code to checkpoint the Keyed State. It is
>> done automatically by Flink. Just remove the `snapshotState(..)` and
>> `restoreState(..)` methods.
>>
>> Cheers,
>> Max
>>
>> On Wed, Jun 1, 2016 at 4:00 PM, simon peyer <simon.pe...@soom-it.ch> wrote:
>>> Hi Max
>>>
>>> I'm using a keyBy but would like to store the state.
>>>
>>> So what's the way to go?
>>>
>>> How do I have to handle the state in option 2)?
>>>
>>> Could you give an example?
>>>
>>> Thanks
>>> --Simon
>>>
>>>> On 01 Jun 2016, at 15:55, Maximilian Michels <m...@apache.org> wrote:
>>>>
>>>> Hi Simon,
>>>>
>>>> There are two types of state:
>>>>
>>>>
>>>> 1) Keyed State
>>>>
>>>> The state you access via `getRuntimeContext().getState(..)` is scoped
>>>> by key. If no key is in the scope, the key is null and update
>>>> operations won't work. Use a `keyBy(..)` before your map function to
>>>> partition the state by key. The state is automatically checkpointed by
>>>> Flink.
>>>>
>>>> 2) Operator State
>>>>
>>>> This state is kept per parallel instance of the operator.
>>>> You implement the Checkpointed interface and use the `snapshotState(..)`
>>>> and `restoreState(..)` methods to checkpoint the state.
>>>>
>>>>
>>>> I think you want to use one of the two. Although it is possible to use
>>>> both, it looks like you're confusing the two in your example.
>>>>
>>>> Cheers,
>>>> Max
>>>>
>>>> On Wed, Jun 1, 2016 at 3:06 PM, simon peyer <simon.pe...@soom-it.ch> wrote:
>>>>> Hi all
>>>>>
>>>>> I implemented a small pipeline that has some stateful computation.
>>>>>
>>>>> It contains a function that extends RichFlatMapFunction and Checkpointed.
>>>>>
>>>>> The state is stored in the field:
>>>>>
>>>>> private var state_item: ValueState[Option[Pathsection]] = null
>>>>>
>>>>> override def open(conf: Configuration): Unit = {
>>>>>   log.info("Open a new Checkpointed FlatMap function with configuration: {}", conf)
>>>>>   state_item = getRuntimeContext.getState(new ValueStateDescriptor("State of Pathsection",
>>>>>     (Option(Pathsection)).getClass.asInstanceOf[Class[Option[Pathsection]]],
>>>>>     None))
>>>>> }
>>>>>
>>>>> override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Option[Pathsection] = {
>>>>>   log.debug("Snapshote State with checkpointId: {} at Timestamp {}", checkpointId, checkpointTimestamp)
>>>>>   removeOldEntries(checkpointTimestamp)
>>>>>   state_item.value()
>>>>> }
>>>>>
>>>>> override def restoreState(state: Option[Pathsection]): Unit = {
>>>>>   if (state == null) {
>>>>>     log.debug("Restore Snapshot: Null")
>>>>>   }
>>>>>   else if (state == None) {
>>>>>     log.debug("Restore Snapshot: None")
>>>>>   }
>>>>>   else if (state_item == null) {
>>>>>     log.debug("State Item not initialized")
>>>>>   }
>>>>>   else {
>>>>>     state_item.update(state)
>>>>>   }
>>>>> }
>>>>>
>>>>> But when I run this computation and make the program fail, I get the
>>>>> following error:
>>>>>
>>>>> java.lang.Exception: Could not restore checkpointed state to operators and functions
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:457)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.lang.Exception: Failed to restore state to function: No key available.
>>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:449)
>>>>>     ... 3 more
>>>>> Caused by: java.lang.RuntimeException: No key available.
>>>>>     at org.apache.flink.runtime.state.memory.MemValueState.update(MemValueState.java:69)
>>>>>     at ....Function which has the Checkpointed thingy (CheckpointedIncrAddrPositions.scala:68)
>>>>>
>>>>> What am I missing?
>>>>>
>>>>> Thanks
>>>>> Simon
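The two issues in this thread (keyed state is only usable while a key is in scope, and per-key entries for keys that never return have to be removed explicitly) can be illustrated with a small model. This is a sketch in plain Scala with no Flink dependency. `ToyKeyedValueState`, `setCurrentKey` and `removeOlderThan` are hypothetical names, not Flink API. The model assumes every write records a caller-supplied timestamp and that the caller chooses the TTL. It reproduces the "No key available." failure that occurs when `restoreState(..)` calls `update(..)` outside any key context, and it shows timestamp-based eviction of stale keys.

```scala
import scala.collection.mutable

// Toy model of keyed state, NOT Flink's API. Every read and write is scoped to
// a "current key", similar to keyed state in a function applied after keyBy(..).
final class ToyKeyedValueState[K, V] {
  // For each key: the stored value and the (assumed) timestamp of its last write.
  private val entries = mutable.Map.empty[K, (V, Long)]
  private var currentKey: Option[K] = None

  // In Flink the runtime sets the key before calling the user function;
  // in this model the caller does it explicitly.
  def setCurrentKey(k: K): Unit = currentKey = Some(k)

  private def requireKey: K =
    currentKey.getOrElse(throw new RuntimeException("No key available."))

  def value: Option[V] = entries.get(requireKey).map(_._1)

  def update(v: V, now: Long): Unit = entries(requireKey) = (v, now)

  // Removes the entry of the current key only.
  def clear(): Unit = entries.remove(requireKey)

  // Hypothetical sweep: drop every entry whose last write is more than ttl
  // before now, so keys that never arrive again do not linger forever.
  def removeOlderThan(ttl: Long, now: Long): Int = {
    val expired = entries.collect { case (k, (_, ts)) if now - ts > ttl => k }.toList
    expired.foreach(k => entries.remove(k))
    expired.size
  }

  def size: Int = entries.size
}

object ToyKeyedStateDemo {
  def main(args: Array[String]): Unit = {
    val state = new ToyKeyedValueState[String, Int]

    // Writing with no key in scope fails, as restoreState(..) did in the thread.
    val rejected =
      try { state.update(1, now = 0L); false }
      catch { case e: RuntimeException => e.getMessage == "No key available." }
    println(s"update without key rejected: $rejected") // prints "update without key rejected: true"

    state.setCurrentKey("a"); state.update(1, now = 0L)
    state.setCurrentKey("b"); state.update(2, now = 50L)

    // At t = 120 with ttl = 100, only "a" (last written at t = 0) has expired.
    val removed = state.removeOlderThan(ttl = 100L, now = 120L)
    println(s"removed: $removed, remaining: ${state.size}") // prints "removed: 1, remaining: 1"
  }
}
```

In the Flink setup discussed here, a function applied after `keyBy(..)` only sees the entry for the key of the element being processed, and removing that entry corresponds to calling `clear()` on the state while the key is in scope. The all-keys sweep in `removeOlderThan` is a simplification of this model, not something the thread's keyed function could do directly.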