I recently noticed something about windows: they retain (in state) every element that they receive regardless of whether the user provides a fold/reduce function. I can tell that such an approach is necessary in order for evictors to work, but I'm not sure if there are other reasons.
I'll describe a use case where this approach is not optimal, and then maybe we can discuss ways to get around it or possible modifications to Flink.

My jobs include windows that are wider than the interval at which we want updates. For example, I might have a window that is one day long, but I might want an updated value to be emitted from that window within (say) one processing-time minute of a new event being assigned to it. I can accomplish that with a trigger that returns FIRE after a processing-time delay and FIRE_AND_PURGE when event time reaches the end of the window.

Next, I want to gather those results into a bigger window: perhaps a month or a year wide. My fold function can ensure that multiple events from the same upstream window overwrite each other, so that they are not counted multiple times. However, as I mentioned, the wide window's state will hold all the events: every processing-time firing as well as the final event from the upstream FIRE_AND_PURGE. That makes the state bigger than it needs to be.

With regard to solutions within the bounds of the existing framework, I am considering using a regular fold() operation instead of a long window. The fold function would be responsible for performing the eviction that the window was previously responsible for. I could implement that as a RichFoldFunction with a ReducingState. The main difference is that there would be no triggering involved (incoming items would immediately result in reduce() emitting a new aggregate). I could also possibly implement my own operator. Are there other/better options I have not considered?

Is it desirable to improve support for this use case within Flink? I can imagine that other people may want to get incremental/ongoing results from their windows as data comes in instead of waiting for the watermark to purge the window. In general, they might want better control over the window state. If so, what would the solution look like?
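To make the overwrite-and-evict bookkeeping concrete, here is a minimal sketch of the accumulator I have in mind for the fold-based workaround. It is plain Java with no Flink dependencies, so it only illustrates the state handling, not the operator wiring. Every name in it (LatestPerWindowAccumulator, add, evictBefore, total) is hypothetical, and identifying an upstream window by its start timestamp is an assumption on my part.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical accumulator: keeps only the latest partial result per upstream window.
class LatestPerWindowAccumulator {
    // Keyed by the upstream window's start timestamp (assumed to identify the window).
    private final Map<Long, Long> latestByWindowStart = new HashMap<>();

    // Fold step: a later firing of the same upstream window overwrites the earlier one.
    void add(long upstreamWindowStart, long partialValue) {
        latestByWindowStart.put(upstreamWindowStart, partialValue);
    }

    // Eviction the wide window would otherwise perform: drop entries older than the cutoff.
    void evictBefore(long cutoffTimestamp) {
        latestByWindowStart.keySet().removeIf(start -> start < cutoffTimestamp);
    }

    // Current aggregate over the retained entries (a sum, purely as an example).
    long total() {
        long sum = 0;
        for (long value : latestByWindowStart.values()) {
            sum += value;
        }
        return sum;
    }

    int size() {
        return latestByWindowStart.size();
    }

    public static void main(String[] args) {
        LatestPerWindowAccumulator acc = new LatestPerWindowAccumulator();
        acc.add(0L, 5L);           // early processing-time firing of day 0
        acc.add(0L, 7L);           // final firing of day 0 replaces the early one
        acc.add(86_400_000L, 3L);  // first firing of day 1
        System.out.println("entries=" + acc.size() + " total=" + acc.total());
    }
}
```

With this shape, state grows with the number of upstream windows rather than with the number of firings, which is the property I would like the wide window to have.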
Perhaps we could allow users to supply an additional function to the window operator that extracts the identity of each new event, and then Flink would ensure that new events overwrite existing events with the same identity within the window state, preventing it from growing unnecessarily. Or, perhaps there is a way to do it based on the identity of the window that produces the event? Or, more generally, perhaps we could allow user-provided fold/reduce functions to eagerly reduce the state of the window, although that might impact the evictor feature?

Thanks for your thoughts,
Shannon