Thanks David, this is very helpful. I'm glad it's not just that I had missed 
something obvious in the (generally very clear) documentation. I found various 
features that felt almost right (e.g. the priority queue behind Timers), but 
nothing that quite did the job. The temporal state idea does sound like a very 
handy feature to have.

On Thu, 24 Sep 2020, at 08:50, David Anderson wrote:
> Steven,
> 
> I'm pretty sure this is a scenario that doesn't have an obvious good 
> solution. As you have discovered, the window API isn't much help; using a 
> process function does make sense. The challenge is finding a data structure 
> to use in keyed state that can be efficiently accessed and updated.
> 
> One option would be to use MapState, where the keys are timestamps (longs) 
> and the values are lists of the events with the given timestamps (or just a 
> count of those events, if that's sufficient). If you then use the RocksDB 
> state backend, you can leverage an implementation detail of that state 
> backend, which is that you can iterate over the entries in order, sorted by 
> the key (the serialized, binary key), which, in the case of keys that are 
> longs, will do the right thing. Also, with the RocksDB state backend, you 
> only have to do ser/de to access and update individual entries -- and not the 
> entire map.
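> 
> A rough, untested sketch of the count-only variant (Event, Alert, the window 
> length, and the threshold are placeholders to adapt to your job):
> 
> import org.apache.flink.api.common.state.MapState;
> import org.apache.flink.api.common.state.MapStateDescriptor;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
> 
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Map;
> 
> public class ThresholdFunction extends KeyedProcessFunction<String, Event, Alert> {
> 
>     private static final long WINDOW_MS = 60_000L; // placeholder window length
>     private static final int THRESHOLD = 10;       // placeholder event count
> 
>     private transient MapState<Long, Integer> countsByTimestamp;
> 
>     @Override
>     public void open(Configuration parameters) {
>         countsByTimestamp = getRuntimeContext().getMapState(
>                 new MapStateDescriptor<>("countsByTimestamp", Types.LONG, Types.INT));
>     }
> 
>     @Override
>     public void processElement(Event event, Context ctx, Collector<Alert> out)
>             throws Exception {
>         long ts = ctx.timestamp(); // assumes timestamps are assigned upstream
> 
>         // bump the count for this exact timestamp
>         Integer current = countsByTimestamp.get(ts);
>         countsByTimestamp.put(ts, current == null ? 1 : current + 1);
> 
>         // drop entries that have fallen out of the window and total the rest;
>         // on RocksDB only the touched entries go through ser/de, not the map
>         long cutoff = ts - WINDOW_MS;
>         int total = 0;
>         List<Long> expired = new ArrayList<>();
>         for (Map.Entry<Long, Integer> entry : countsByTimestamp.entries()) {
>             if (entry.getKey() <= cutoff) {
>                 expired.add(entry.getKey());
>             } else {
>                 total += entry.getValue();
>             }
>         }
>         for (Long key : expired) {
>             countsByTimestamp.remove(key);
>         }
> 
>         if (total >= THRESHOLD) {
>             out.collect(new Alert(ctx.getCurrentKey(), ts, total));
>         }
>     }
> }
> 
> (Handling late/out-of-order events and cleaning up idle keys would need more 
> care than this, but hopefully it shows the shape of the approach.)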
> 
> It's not exactly pretty to rely on this, and some of us have been giving some 
> thought to adding a temporal state type to Flink that would make these 
> scenarios feasible to implement efficiently on all of the state backends, but 
> for now, this may be the best solution.
> 
> Regards,
> David
> 
> On Wed, Sep 23, 2020 at 12:42 PM Steven Murdoch <ste...@lists.murdoch.is> 
> wrote:
>> Hello,
>> 
>> I am trying to do something that seems like it should be quite simple, but I 
>> haven’t found an efficient way to do it with Flink, so I expect I’m missing 
>> something obvious.
>> 
>> The task is that I would like to process a sequence of events when a certain 
>> number appear within a keyed event-time window. There will be many keys but 
>> events within each keyed window will normally be quite sparse. 
>> 
>> My first guess was to use Flink’s sliding window functionality. However, my 
>> concern is that events are duplicated into every window they overlap. Since I 
>> would like the timing to be precise, the slide would have to be small, so each 
>> event would be copied into hundreds of windows, most of which are then 
>> discarded because they contain too few events. 
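>> 
>> For example (illustrative numbers, assuming a DataStream<Event> called events 
>> with timestamps and watermarks already assigned), a ten-minute window sliding 
>> every second assigns each event to 600 overlapping windows: 
>> 
>>     events
>>         .keyBy(e -> e.key)  // Event.key is a placeholder
>>         .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.seconds(1)))
>>         .process(new CountIfEnoughEvents()); // hypothetical ProcessWindowFunction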
>> 
>> My next guess was to use a process function and maintain a queue of events as 
>> keyed state. When an event arrived I would add it to the queue and then remove 
>> any events that had fallen out of the window. I thought ListState would help 
>> here, but it does not appear to allow individual items to be removed.
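>> 
>> The nearest workaround I could see (eventsState here stands for a 
>> ListState<Event>, and cutoff for the left edge of the window) is to read 
>> everything back and rewrite the whole list: 
>> 
>>     eventsState.add(event); // append the new event
>>     // ListState has no per-element remove, so dropping expired events
>>     // means reading everything back and replacing the entire list:
>>     List<Event> kept = new ArrayList<>();
>>     for (Event e : eventsState.get()) {
>>         if (e.timestamp > cutoff) {
>>             kept.add(e);
>>         }
>>     }
>>     eventsState.update(kept); // rewrites the whole list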
>> 
>> I then thought about using a ValueState holding some queue data structure. 
>> However, my understanding is that every change to a ValueState results in the 
>> entire object being copied, so this would be quite inefficient and best 
>> avoided. 
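>> 
>> Roughly what I mean (queueState standing for a ValueState<ArrayDeque<Long>> 
>> of event timestamps, cutoff for the left edge of the window, and assuming 
>> events arrive roughly in timestamp order): 
>> 
>>     ArrayDeque<Long> queue = queueState.value(); // reads back the whole queue
>>     if (queue == null) {
>>         queue = new ArrayDeque<>();
>>     }
>>     queue.addLast(ctx.timestamp());
>>     while (!queue.isEmpty() && queue.peekFirst() <= cutoff) {
>>         queue.removeFirst();
>>     }
>>     queueState.update(queue); // writes the whole queue back again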
>> 
>> Finally, I thought about simply maintaining a counter and a set of timers: 
>> incrementing the counter on each event and decrementing it when that event’s 
>> timer fires at the end of the window. However, I then hit the problem of timer 
>> coalescing: if an event occurs at the same timestamp as its predecessor, the 
>> second timer is never registered, so the counter is incremented twice but only 
>> decremented once. 
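>> 
>> Concretely, the failure mode looks like this (counter standing for a 
>> ValueState<Integer> and WINDOW_MS for the window length), when two events 
>> share the same event timestamp: 
>> 
>>     Integer c = counter.value();
>>     counter.update(c == null ? 1 : c + 1); // incremented once per event
>>     ctx.timerService().registerEventTimeTimer(ctx.timestamp() + WINDOW_MS);
>>     // registering the same timestamp twice coalesces into a single timer,
>>     // so onTimer() fires only once and the counter is decremented only once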
>> 
>> What I’m doing seems like it would be a common task but none of the options 
>> look good, so I feel I’m missing something. Could anyone offer some advice 
>> on how to handle this case?
>> 
>> Thanks in advance. 
>> 
>> Best wishes,
>> Steven
