Thanks David, this is very helpful. I'm glad it's not just that I missed something obvious in the (generally very clear) documentation. I found various features that felt almost right (e.g. the priority queue behind Timers) but nothing that quite did the job. The temporal state idea sounds like a very handy feature to have.
On Thu, 24 Sep 2020, at 08:50, David Anderson wrote:
> Steven,
>
> I'm pretty sure this is a scenario that doesn't have an obvious good
> solution. As you have discovered, the window API isn't much help; using a
> process function does make sense. The challenge is finding a data structure
> to use in keyed state that can be efficiently accessed and updated.
>
> One option would be to use MapState, where the keys are timestamps (longs)
> and the values are lists of the events with the given timestamps (or just a
> count of those events, if that's sufficient). If you then use the RocksDB
> state backend, you can leverage an implementation detail of that state
> backend, which is that you can iterate over the entries in order, sorted by
> the key (the serialized, binary key), which in the case of keys that are
> longs, will do the right thing. Also, with the RocksDB state backend, you
> only have to do ser/de to access and update individual entries -- and not the
> entire map.
>
> It's not exactly pretty to rely on this, and some of us have been giving some
> thought to adding a temporal state type to Flink that would make these
> scenarios feasible to implement efficiently on all of the state backends, but
> for now, this may be the best solution.
>
> Regards,
> David
>
> On Wed, Sep 23, 2020 at 12:42 PM Steven Murdoch <ste...@lists.murdoch.is>
> wrote:
>> Hello,
>>
>> I am trying to do something that seems like it should be quite simple, but I
>> haven’t found an efficient way to do this with Flink and I expect I’m
>> missing something obvious here.
>>
>> The task is that I would like to process a sequence of events when a certain
>> number appear within a keyed event-time window. There will be many keys, but
>> events within each keyed window will normally be quite sparse.
>>
>> My first guess was to use Flink’s sliding windowing functionality. However,
>> my concern is that events are duplicated for each window. I would like to be
>> precise about timing, so every event would be copied into hundreds of
>> windows, most of which are then discarded because there are insufficient
>> events.
>>
>> My next guess was to use a process function and maintain a queue of events
>> as the state. When an event occurred I would add it to the queue and then
>> remove any events which fell off the end of my window. I thought ListState
>> would help here, but that appears not to allow items to be removed.
>>
>> I then thought about using a ValueState with some queue data structure.
>> However, my understanding is that changes to a ValueState result in the
>> entire object being copied, and so this would be quite inefficient and best
>> avoided.
>>
>> Finally I thought about trying to just maintain a series of timers –
>> incrementing a counter on an event and decrementing it on that event's
>> expiry. However, I then hit the problem of timer coalescing. If an event
>> occurs at the same time as its predecessor, the timer will not get set, so
>> the counter will get incremented but never decremented.
>>
>> What I’m doing seems like it would be a common task but none of the options
>> look good, so I feel I’m missing something. Could anyone offer some advice
>> on how to handle this case?
>>
>> Thanks in advance.
>>
>> Best wishes,
>> Steven
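For readers finding this thread later, below is a minimal sketch of the MapState-keyed-by-timestamp approach David describes, written as a KeyedProcessFunction. The class name, window length, threshold, and output string are illustrative assumptions, not anything specified in the thread; it keeps a count per timestamp (the simpler variant David mentions) and uses event-time timers to evict expired entries.

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Iterator;
import java.util.Map;

public class CountInWindowFunction<K, E> extends KeyedProcessFunction<K, E, String> {

    // Illustrative values only; pick whatever window length and count fit the use case.
    private static final long WINDOW_MS = 60_000L;
    private static final int THRESHOLD = 5;

    private transient MapState<Long, Integer> countsPerTimestamp;

    @Override
    public void open(Configuration parameters) {
        countsPerTimestamp = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("countsPerTimestamp", Long.class, Integer.class));
    }

    @Override
    public void processElement(E event, Context ctx, Collector<String> out) throws Exception {
        // Assumes event-time timestamps are assigned upstream.
        long ts = ctx.timestamp();

        // Increment the count for this event's timestamp. With the RocksDB backend,
        // only this single entry is (de)serialized, not the whole map.
        Integer current = countsPerTimestamp.get(ts);
        countsPerTimestamp.put(ts, current == null ? 1 : current + 1);

        // Register a timer for when events at this timestamp fall out of the window.
        // Coalescing is harmless here because the state holds counts per timestamp,
        // not one entry per timer.
        ctx.timerService().registerEventTimeTimer(ts + WINDOW_MS);

        // Count the events currently inside the trailing window. Under RocksDB the
        // entries iterate in serialized-key order; other backends iterate in an
        // arbitrary order, which still gives a correct (if less tidy) scan.
        int total = 0;
        for (Map.Entry<Long, Integer> entry : countsPerTimestamp.entries()) {
            if (entry.getKey() > ts - WINDOW_MS) {
                total += entry.getValue();
            }
        }
        if (total >= THRESHOLD) {
            out.collect("threshold reached for key " + ctx.getCurrentKey());
        }
    }

    @Override
    public void onTimer(long timerTs, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Evict entries that have fallen out of the window as of this timer.
        Iterator<Map.Entry<Long, Integer>> it = countsPerTimestamp.iterator();
        while (it.hasNext()) {
            if (it.next().getKey() <= timerTs - WINDOW_MS) {
                it.remove();
            }
        }
    }
}

Because the state is a count per timestamp rather than one timer per event, the timer-coalescing issue raised above does not bite: a second event at the same timestamp just increments the existing entry, and the single timer for that timestamp's expiry evicts it.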