Hi,

for most windows, all state is cleared through FIRE_AND_PURGE, except for 
windows that are subtypes of merging windows, such as session windows. Here, 
the state still remembers the window itself until the watermark passes the 
session timeout plus the allowed lateness. This is done so that elements that 
fall into the window after firing can still resurrect the window’s 
information, see WindowOperator.clearAllState(). Only after that point is all 
state of the session window removed. Looping in Aljoscha, who might have more 
ideas about the best way to implement your use case.
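
As a rough illustration of the cleanup rule described above, and of the
timer-based approach from solution (3) quoted below, here is a minimal
plain-Java sketch. It is a simplified model, not Flink code: the class
ExpiringKeyedState and its methods are hypothetical names, the 15-minute
bucket and the gap and lateness parameters are assumptions taken from the
thread, and it ignores Flink's exact boundary arithmetic.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * Plain-Java model (NOT the Flink API) of per-key state that is dropped
 * once the watermark passes "last element + gap + allowed lateness".
 * All names here are hypothetical and only illustrate the idea.
 */
class ExpiringKeyedState {
    private static final long BUCKET_MILLIS = 15 * 60 * 1000L; // 15-minute buckets

    private final long gapMillis;             // session gap (assumed parameter)
    private final long allowedLatenessMillis; // allowed lateness (assumed parameter)
    private final Map<String, Long> sums = new HashMap<>();         // key -> running aggregate
    private final Map<String, Long> cleanupTimes = new HashMap<>(); // key -> cleanup timestamp

    ExpiringKeyedState(long gapMillis, long allowedLatenessMillis) {
        this.gapMillis = gapMillis;
        this.allowedLatenessMillis = allowedLatenessMillis;
    }

    /** Builds the composite key "<timestamp_floor_15mins>|<key>" from solution (2). */
    static String bucketKey(long timestampMillis, String key) {
        long floor = timestampMillis - Math.floorMod(timestampMillis, BUCKET_MILLIS);
        return floor + "|" + key;
    }

    /** Adds a value to the key's aggregate and pushes its cleanup time forward. */
    void add(long timestampMillis, String key, long value) {
        String k = bucketKey(timestampMillis, key);
        sums.merge(k, value, Long::sum);
        long cleanup = timestampMillis + gapMillis + allowedLatenessMillis;
        // A later element extends the cleanup time; an earlier one never shrinks it.
        cleanupTimes.merge(k, cleanup, Long::max);
    }

    /** Drops every key whose cleanup time is at or before the watermark; returns how many were dropped. */
    int advanceWatermark(long watermark) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = cleanupTimes.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() <= watermark) {
                sums.remove(e.getKey()); // remove the aggregate together with the timer
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    /** Number of keys that still hold state. */
    int liveKeys() {
        return sums.size();
    }
}
```

The point of the model is that both the aggregate and the bookkeeping entry
for a key disappear together once the watermark is past the cleanup time, so
the number of live keys stays bounded by the keys that are still "open".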

Best,
Stefan

> On 22.07.2018 at 18:19, Ashish Pokharel <ashish...@yahoo.com> wrote:
> 
> One more attempt to get some feedback on this. It basically boils down to 
> using the high-level Window API in scenarios where keys are unbounded / 
> infinite but can be expired after a certain time. From what we have observed 
> (solution 2 below), some properties of the keys are still in state (guessing 
> the key itself, watermarks, etc.). Is there any way to clean these up, as the 
> FIRE_AND_PURGE trigger doesn’t seem to do it? I am of the opinion that even 
> if we end up using HDFS- or RocksDB-backed state, we would still want to 
> clean those up. Any suggestions on this before we start rewriting our apps 
> to use the low-level Process APIs in general? 
> 
> Thanks, Ashish
> 
>> On Jul 2, 2018, at 10:47 AM, ashish pok <ashish...@yahoo.com> wrote:
>> 
>> All,
>> 
>> I have been doing a little digging on this and, to Stefan's earlier point, 
>> the growing memory (not necessarily a leak) was essentially caused by the 
>> growing number of keys, as I was using time buckets concatenated with the 
>> actual key to make unique sessions.
>> 
>> Taking a couple of steps back, the use case is a very simple tumbling window 
>> of 15 mins by key. The stream can be viewed simply as:
>> 
>>     <timestamp>|<key>|<value>
>> 
>> We have a few of these types of pipelines, and one catch here is that we 
>> wanted to create an app which can process both historical and current data. 
>> Historical data comes in mainly because users make ad hoc requests for 
>> "backfill". In order to easily manage the processing pipeline, we are making 
>> no distinction between historical and current data, as processing is based 
>> on event time. 
>> 
>> 1) Of course, the easiest way to solve this problem is to create a 
>> TumblingWindow of 15 mins with some allowed lateness. One issue here was 
>> that watermarks are moved forward and backfill data appeared to be treated 
>> as late-arriving data, which is correct behavior from Flink's perspective 
>> but seems to be causing issues in how we are trying to handle streams.
>> 
>> 2) Another issue is that our data collectors are highly distributed - we 
>> regularly get data from later event time buckets faster than from older 
>> buckets. It is also more consistent to actually create the 15 min buckets 
>> using the concept of a Session instead. So I am creating a key of the form 
>> <timestamp_floor_15mins>|<key> and a session gap of, say, 10 mins. This 
>> works perfectly from a business logic perspective. However, now I am 
>> introducing quite a lot of keys which, based on my heap dumps, seem to be 
>> hanging around and causing memory issues.
>> 
>> 3) We converted the apps to a Process function and manage all state 
>> ourselves, using the key defined in step (2) and registering/unregistering 
>> timeouts. 
>> 
>> Solution (3) seems to be pretty stable from a memory perspective. However, 
>> it just feels like, with so many high-level APIs available, we are not 
>> using them properly and keep reverting back to the low-level Process APIs - 
>> in the last month we have migrated about 5 or 6 apps to Process now :) 
>> 
>> For solution (2), it feels like any other Session aggregation use case will 
>> have the same issue of keys hanging around (e.g. click streams with user 
>> sessions). Isn't there a way to clear those session windows? Sorry, I 
>> just feel like we are missing something simple and have been reverting to 
>> low-level APIs instead.
>> 
>> Thanks, Ashish
> 
