Thanks a lot everyone. I have the user data ingested from kafka and it is keyed by userid. There are around 80 parallel flatmap operator instances after keyby and there are around few million users. The map state includes userid as the key and some value. I guess I will try the approach that Aljoscha has mentioned and see how it works.
On Tue, Sep 5, 2017 at 8:17 AM, Aljoscha Krettek <aljos...@apache.org> wrote: > Hi, > > This is mostly correct, but you cannot register a timer in open() because > we don't have an active key there. Only in process() and onTimer() can you > register a timer. > > In your case, I would suggest to somehow clamp the timestamp to the > nearest 2 minute (or whatever) interval or to keep an extra ValueState that > tells you whether you already registered a timer. > > Best, > Aljoscha > > On 5. Sep 2017, at 16:55, Kien Truong <duckientru...@gmail.com> wrote: > > Hi, > > You can register a processing time timer inside the onTimer and the open > function to have a timer that run periodically. > > Pseudo-code example: > > ValueState<Long> lastRuntime; > > void open() { > ctx.timerService().registerProcessingTimeTimer(current.timestamp + 60000); > } > > void onTimer() { > // Run the periodic task > if (lastRuntime.get() + 60000 == timeStamp) { > periodicTask(); > } > // Re-register the processing time timer timer > lastRuntime.setValue(timeStamp); > ctx.timerService().registerProcessingTimeTimer(current.timestamp + 60000); > } > > void periodicTask() > > > For the second question, timer are already scoped by key, so you can keep > a lastModified variable as a ValueState, > then compare it to the timestamp provided by the timer to see if the > current key should be evicted. > Checkout the example on the ProcessFunction page. > > https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/ > process_function.html > > Best regards, > Kien > > On 9/5/2017 11:49 AM, Navneeth Krishnan wrote: > > Hi All, > > I have a streaming pipeline which is keyed by userid and then to a flatmap > function. I need to clear the state after sometime and I was looking at > process function for it. > > Inside the process element function if I register a timer wouldn't it > create a timer for each incoming message? > > // schedule the next timer 60 seconds from the current event time > ctx.timerService().registerEventTimeTimer(current.timestamp + 60000); > > How can I get something like a clean up task that runs every 2 mins and > evicts all stale data? Also is there a way to get the key inside onTimer > function so that I know which key has to be evicted? > > Thanks, > Navneeth > > >