Hi,

Indeed a good catch, and a valid issue precisely because of the stateful nature of the trigger and eviction policies.

I agree with the suggested approach that this should be configurable for the discretizers (and could be set through the API). As for the default behaviour, I am not 100% sure. It could be done so that empty buffers (and the trigger and eviction policies associated with them) don't get the NotifyOnLastGlobalElement call. That would reduce the overhead.

Cheers,
Gyula

On Thu, May 28, 2015 at 3:48 PM, Márton Balassi <balassi.mar...@gmail.com> wrote:

> Thanks for debugging this Gabor, indeed a good catch.
>
> I am not so sure about surfacing it in the API, though: it seems very
> specific to the session windowing case. I am also wondering whether this
> should actually be the default behavior. If there are already empty
> windows for a group, why not drop the previous states?
>
> On Thu, May 28, 2015 at 3:01 PM, Gábor Gévay <gga...@gmail.com> wrote:
>
> > Hi,
> >
> > At Ericsson, we are implementing something similar to what the
> > SessionWindowing example does:
> >
> > There are events belonging to phone calls (sessions), and every event
> > has a call_id, which tells us which call it belongs to. At the end of
> > every call, a large event has to be emitted that contains some
> > aggregated information about the call. Furthermore, the events that
> > mark the end of the calls don't always reach our system, so the
> > sessions have to time out, just like in the example.
> >
> > Therefore, I have experimented a bit with the SessionWindowing
> > example, and there is a problem: the trigger policy objects belonging
> > to already terminated sessions are kept in memory, and
> > NotifyOnLastGlobalElement still gets called on them. So the
> > application keeps eating up more and more memory and is also getting
> > slower.
> >
> > I understand that Flink can't simply discard all state belonging
> > to empty groups, as it has no way of knowing whether the user-supplied
> > policy wants to trigger in the future (perhaps based on some state
> > collected before it first triggered).
> >
> > Therefore, I propose the following addition to the API:
> > WindowedDataStream would get a method called something like
> > dropEmptyGroups, by which the user could tell Flink to automatically
> > discard all state belonging to a group when the window becomes empty.
> >
> > The implementation could look like the following: dropEmptyGroups()
> > would set a flag, and at the end of StreamDiscretizer.evict, if the
> > flag is true and bufferSize has just become 0, then this
> > StreamDiscretizer would be removed from the groupedDiscretizers map of
> > GroupedStreamDiscretizer. (StreamDiscretizer would need a new field,
> > set at creation, holding a reference to the GroupedStreamDiscretizer
> > that contains it.) (And GroupedStreamDiscretizer.makeNewGroup would
> > simply run again if an element later appeared in a dropped group,
> > but this won't happen in this example.) What do you think?
> >
> > Best regards,
> > Gabor
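To make Gábor's dropEmptyGroups proposal (and Gyula's narrower variant of skipping NotifyOnLastGlobalElement for empty buffers) concrete, here is a minimal Java sketch under simplifying assumptions. SketchDiscretizer and SketchGroupedDiscretizer are hypothetical stand-ins for StreamDiscretizer and GroupedStreamDiscretizer, not Flink's actual classes: buffers are plain deques, evict(n) just drops the n oldest elements instead of consulting an eviction policy, and notifyOnLastGlobalElement only counts calls. What it does follow from the thread is the flag, the back-reference set at creation, removal from the groupedDiscretizers map when the buffer size has just become 0, and re-creation through makeNewGroup if a dropped group receives a new element.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for StreamDiscretizer: one instance per group (e.g. per call_id).
class SketchDiscretizer<T> {
    private final Object key;
    // Back-reference to the containing grouped discretizer, set at creation (as proposed).
    private final SketchGroupedDiscretizer<T> owner;
    private final Deque<T> buffer = new ArrayDeque<>();
    private int lastElementNotifications = 0;

    SketchDiscretizer(Object key, SketchGroupedDiscretizer<T> owner) {
        this.key = key;
        this.owner = owner;
    }

    void store(T element) {
        buffer.addLast(element);
    }

    // Evicts up to n of the oldest elements, then applies the proposed dropEmptyGroups check.
    void evict(int n) {
        int sizeBefore = buffer.size();
        for (int i = 0; i < n && !buffer.isEmpty(); i++) {
            buffer.removeFirst();
        }
        // The buffer size has just become 0: discard all state belonging to this group.
        if (owner.isDropEmptyGroups() && sizeBefore > 0 && buffer.isEmpty()) {
            owner.removeGroup(key);
        }
    }

    int bufferSize() {
        return buffer.size();
    }

    // Stand-in for the policies' NotifyOnLastGlobalElement hook; here it only counts calls.
    void notifyOnLastGlobalElement() {
        lastElementNotifications++;
    }

    int lastElementNotifications() {
        return lastElementNotifications;
    }
}

// Hypothetical, simplified stand-in for GroupedStreamDiscretizer.
class SketchGroupedDiscretizer<T> {
    private final Map<Object, SketchDiscretizer<T>> groupedDiscretizers = new HashMap<>();
    private boolean dropEmptyGroups = false;

    // Mirrors the proposed dropEmptyGroups() API call: it only sets a flag.
    SketchGroupedDiscretizer<T> dropEmptyGroups() {
        this.dropEmptyGroups = true;
        return this;
    }

    boolean isDropEmptyGroups() {
        return dropEmptyGroups;
    }

    // Routes an element to its group; a dropped (or never-seen) group is simply recreated.
    SketchDiscretizer<T> process(Object key, T element) {
        SketchDiscretizer<T> discretizer = groupedDiscretizers.get(key);
        if (discretizer == null) {
            discretizer = makeNewGroup(key);
        }
        discretizer.store(element);
        return discretizer;
    }

    private SketchDiscretizer<T> makeNewGroup(Object key) {
        SketchDiscretizer<T> discretizer = new SketchDiscretizer<>(key, this);
        groupedDiscretizers.put(key, discretizer);
        return discretizer;
    }

    void removeGroup(Object key) {
        groupedDiscretizers.remove(key);
    }

    int numGroups() {
        return groupedDiscretizers.size();
    }

    // Gyula's variant: optionally skip the notification for groups whose buffer is empty.
    void notifyOnLastGlobalElement(boolean skipEmptyBuffers) {
        for (SketchDiscretizer<T> discretizer : groupedDiscretizers.values()) {
            if (!skipEmptyBuffers || discretizer.bufferSize() > 0) {
                discretizer.notifyOnLastGlobalElement();
            }
        }
    }
}
```

For example, with `new SketchGroupedDiscretizer<String>().dropEmptyGroups()`, evicting the only element of a call's buffer removes that call's group from the map, and a later element for the same call_id recreates it. Without the flag, the group stays in the map, but with skipEmptyBuffers set it no longer receives notifications. Keeping the drop opt-in matches Gábor's point that a user policy may rely on state collected before it first triggered.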