One more attempt to get some feedback on this. It basically boils down to using the high-level Window API in scenarios where keys are unbounded/infinite but can be expired after a certain time. From what we have observed (solution 2 below), some properties of keys remain in state (presumably the key itself, watermark-related metadata, etc.). Is there any way to clean these up? A FIRE_AND_PURGE trigger doesn't seem to do it. I am of the opinion that even if we end up using an HDFS- or RocksDB-backed state backend, we would still want to clean those up. Any suggestions before we start rewriting our apps to use the low-level Process APIs across the board?
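
For reference, this is the shape of trigger I mean - a minimal sketch along the lines of Flink's built-in EventTimeTrigger, simplified from what we actually run (the class name is made up):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Fires and purges at the end of the event-time window, so the window
// contents are dropped - yet per-key metadata still seems to linger.
public class PurgingEventTimeTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp,
                                   TimeWindow window, TriggerContext ctx) {
        // Just schedule a timer for the window end; nothing fires per element.
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window,
                                     TriggerContext ctx) {
        // At window end, emit the result and purge the window contents.
        return time == window.maxTimestamp()
                ? TriggerResult.FIRE_AND_PURGE
                : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window,
                                          TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        // Drop the pending timer when the window is disposed.
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}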
Thanks, Ashish

> On Jul 2, 2018, at 10:47 AM, ashish pok <ashish...@yahoo.com> wrote:
>
> All,
>
> I have been doing a little digging on this, and to Stefan's earlier point,
> the growing memory (not necessarily a leak) was essentially caused by the
> growing number of keys, since I was using time buckets concatenated with
> the actual key to create unique sessions.
>
> Taking a couple of steps back, the use case is a very simple tumbling
> window of 15 minutes per key. The stream can be viewed simply as:
>
> <timestamp>|<key>|<value>
>
> We have a few pipelines of this type, and one catch is that we wanted to
> build an app that can process both historical and current data. Historical
> data comes in mainly because users make ad-hoc "backfill" requests. To keep
> the processing pipeline easy to manage, we make no distinction between
> historical and current data, since processing is based on event time.
>
> 1) Of course, the easiest way to solve this is a TumblingWindow of 15
> minutes with some allowed lateness. One issue here is that watermarks move
> forward and backfill data gets treated as late arrivals - correct behavior
> from Flink's perspective, but it causes problems for how we are trying to
> handle the streams.
>
> 2) Another issue is that our data collectors are highly distributed - we
> regularly receive data from later event-time buckets before older ones.
> It is also more natural to build the 15-minute buckets using the concept
> of a Session instead. So I create a key of <timestamp_floor_15mins>|<key>
> with a session gap of, say, 10 minutes. This works perfectly from a
> business-logic perspective. However, it introduces a very large number of
> keys which, based on my heap dumps, seem to hang around and cause memory
> issues.
>
> 3) We converted the apps to a Process function and manage all state
> ourselves, using the key defined in step (2) and registering/unregistering
> timers.
>
> Solution (3) seems to be working stably from a memory perspective. However,
> it feels like with so many high-level APIs available we are not using them
> properly and keep reverting to the low-level Process APIs - in the last
> month we have migrated about 5 or 6 apps to Process now :)
>
> For solution (2), it feels like any other Session aggregation use case
> would have the same problem of keys hanging around (e.g. click streams
> with user sessions). Isn't there a way to clear those session windows?
> Sorry, I just feel like we are missing something simple and keep falling
> back to the low-level APIs instead.
>
> Thanks, Ashish
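
PS: For completeness, the Process-function version in (3) boils down to something like the sketch below. Event/Result are placeholder types, and the names and gap constant are made up, but the pattern is the one we use: keep per-key state, slide a timeout timer on every element, and clear everything in onTimer so nothing outlives the session. The key difference versus the window variants above is that state cleanup is entirely in our own hands.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SessionAggregator extends
        KeyedProcessFunction<String, SessionAggregator.Event, SessionAggregator.Result> {

    private static final long GAP_MS = 10 * 60 * 1000L; // 10-minute session gap

    private transient ValueState<Long> sum;          // running aggregate per key
    private transient ValueState<Long> timeoutTimer; // currently registered timer

    @Override
    public void open(Configuration parameters) {
        sum = getRuntimeContext().getState(
                new ValueStateDescriptor<>("sum", Long.class));
        timeoutTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("timeoutTimer", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Result> out)
            throws Exception {
        // Accumulate the value for this <timestamp_floor_15mins>|<key> key.
        Long current = sum.value();
        sum.update(current == null ? event.value : current + event.value);

        // Slide the session timeout: unregister the old timer, register a new one.
        Long oldTimer = timeoutTimer.value();
        if (oldTimer != null) {
            ctx.timerService().deleteEventTimeTimer(oldTimer);
        }
        long newTimer = event.timestamp + GAP_MS;
        ctx.timerService().registerEventTimeTimer(newTimer);
        timeoutTimer.update(newTimer);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out)
            throws Exception {
        // Session expired: emit the result and clear ALL state for the key,
        // so nothing hangs around once the bucket is done.
        out.collect(new Result(ctx.getCurrentKey(), sum.value()));
        sum.clear();
        timeoutTimer.clear();
    }

    public static class Event {
        public String key;
        public long timestamp;
        public long value;
    }

    public static class Result {
        public String key;
        public long total;

        public Result(String key, Long total) {
            this.key = key;
            this.total = total == null ? 0L : total;
        }
    }
}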