Hi,

At Ericsson, we are implementing something similar to what the SessionWindowing example does:
There are events belonging to phone calls (sessions), and every event has a call_id that tells us which call it belongs to. At the end of every call, a large event has to be emitted that contains some aggregated information about the call. Furthermore, the events that mark the end of a call don't always reach our system, so the sessions have to time out, just like in the example.

Therefore, I have experimented a bit with the SessionWindowing example, and there is a problem: the trigger policy objects belonging to already terminated sessions are kept in memory, and NotifyOnLastGlobalElement also gets called on them. So the application uses more and more memory and also gets slower.

I understand that Flink can't simply discard all state belonging to empty groups, as it has no way of knowing whether the user-supplied policy wants to trigger in the future (perhaps based on some state collected before it first triggered).

Therefore, I propose the following addition to the API: WindowedDataStream would get a method called something like dropEmptyGroups, by which the user could tell Flink to automatically discard all state belonging to a group when its window becomes empty.

The implementation could look like the following: dropEmptyGroups() would set a flag, and at the end of StreamDiscretizer.evict, if the flag is true and bufferSize has just become 0, this StreamDiscretizer would be removed from the groupedDiscretizers map of GroupedStreamDiscretizer. (StreamDiscretizer would need a new field, set at creation, holding a reference to the GroupedStreamDiscretizer that contains it.) If an element later appeared in a dropped group, GroupedStreamDiscretizer.makeNewGroup would simply run again (but this won't happen in this example).

What do you think?

Best regards,
Gabor
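
P.S. To make the proposed mechanism a bit more concrete, here is a minimal, self-contained Java sketch. It does not use the real Flink classes: GroupedDiscretizer and Discretizer are simplified stand-ins for GroupedStreamDiscretizer and StreamDiscretizer, and processElement, removeGroup, and groupCount are hypothetical names made up for illustration. Only the flag, the reference to the containing object, the emptiness check at the end of evict, and the re-creation via makeNewGroup correspond to the proposal.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for illustration only; these are NOT the real Flink classes.
final class DropEmptyGroupsSketch {

    /** Stand-in for GroupedStreamDiscretizer: keeps one discretizer per group key. */
    static final class GroupedDiscretizer<K, T> {
        private final Map<K, Discretizer<K, T>> groupedDiscretizers = new HashMap<>();
        private final boolean dropEmptyGroups; // the flag that dropEmptyGroups() would set

        GroupedDiscretizer(boolean dropEmptyGroups) {
            this.dropEmptyGroups = dropEmptyGroups;
        }

        /** Hypothetical entry point: routes an element to its group, creating the group if needed. */
        void processElement(K key, T element) {
            Discretizer<K, T> discretizer = groupedDiscretizers.get(key);
            if (discretizer == null) {
                // Also runs again for a key whose group was dropped earlier.
                discretizer = makeNewGroup(key);
            }
            discretizer.add(element);
        }

        private Discretizer<K, T> makeNewGroup(K key) {
            Discretizer<K, T> discretizer = new Discretizer<>(key, this);
            groupedDiscretizers.put(key, discretizer);
            return discretizer;
        }

        /** Hypothetical helper: evicts up to n elements from the given group, if it exists. */
        void evict(K key, int n) {
            Discretizer<K, T> discretizer = groupedDiscretizers.get(key);
            if (discretizer != null) {
                discretizer.evict(n);
            }
        }

        void removeGroup(K key) {
            groupedDiscretizers.remove(key);
        }

        int groupCount() {
            return groupedDiscretizers.size();
        }
    }

    /** Stand-in for StreamDiscretizer: buffers the elements of a single group. */
    static final class Discretizer<K, T> {
        private final K key;
        private final GroupedDiscretizer<K, T> parent; // the new back-reference from the proposal
        private final Deque<T> buffer = new ArrayDeque<>();

        Discretizer(K key, GroupedDiscretizer<K, T> parent) {
            this.key = key;
            this.parent = parent;
        }

        void add(T element) {
            buffer.addLast(element);
        }

        void evict(int n) {
            for (int i = 0; i < n && !buffer.isEmpty(); i++) {
                buffer.removeFirst();
            }
            // Proposed addition: once the buffer is empty, drop this group's state.
            if (parent.dropEmptyGroups && buffer.isEmpty()) {
                parent.removeGroup(key);
            }
        }
    }
}
```

With the flag set, evicting the last buffered element of a group removes its entry from the map, and a later element with the same key just creates a fresh group. With the flag unset, the empty group stays in the map.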