Hi,

I would vote for making it the default behaviour to drop all state for empty groups, and for allowing a configuration option to select the current behaviour instead.

This issue will probably get a paragraph in the documentation, but if someone overlooks it, the current behaviour has the potential for a greater disaster than dropping does:

- If someone expects the states to be preserved, they will probably notice immediately that something is wrong (because the logic that requires the states will not work at all).
- However, if someone expects the states of empty groups to just disappear (or doesn't even think about what happens with empty groups), they might only notice the memory leak and slowdown later (probably in production), which will be very annoying to debug at that point.

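To make the suggestion concrete, here is a rough sketch in plain Java. It is not Flink's actual code: GroupedDiscretizerSketch, GroupState and the keepEmptyGroups flag are made-up names that only stand in for GroupedStreamDiscretizer, the per-group StreamDiscretizer and the proposed configuration option. The point is only that dropping is the default and that keeping the state of empty groups has to be requested explicitly.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for GroupedStreamDiscretizer; not Flink's real class. */
class GroupedDiscretizerSketch<K, T> {

    /** Hypothetical per-group state (buffer plus, in reality, the policy objects). */
    static final class GroupState<T> {
        final Deque<T> buffer = new ArrayDeque<>();
    }

    private final Map<K, GroupState<T>> groups = new HashMap<>();

    // Assumed configuration flag: false (the suggested default) drops empty groups.
    private final boolean keepEmptyGroups;

    GroupedDiscretizerSketch(boolean keepEmptyGroups) {
        this.keepEmptyGroups = keepEmptyGroups;
    }

    /** Adds an element, creating the group if it was never seen or was dropped. */
    void add(K key, T element) {
        groups.computeIfAbsent(key, k -> new GroupState<>()).buffer.addLast(element);
    }

    /** Evicts up to count elements and drops the group if its buffer became empty. */
    void evict(K key, int count) {
        GroupState<T> state = groups.get(key);
        if (state == null) {
            return;
        }
        for (int i = 0; i < count && !state.buffer.isEmpty(); i++) {
            state.buffer.removeFirst();
        }
        if (state.buffer.isEmpty() && !keepEmptyGroups) {
            groups.remove(key); // all state of the empty group becomes collectable
        }
    }

    /** Number of groups that still hold state. */
    int groupCount() {
        return groups.size();
    }
}
```

With new GroupedDiscretizerSketch<>(false), a session whose buffer has been fully evicted no longer occupies an entry in the map, and a later element with the same key simply creates a fresh group. Passing true keeps the entry around, which corresponds to the current behaviour.
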
Best regards,
Gabor

2015-05-28 19:23 GMT+02:00 Gyula Fóra <gyula.f...@gmail.com>:
> Hi,
>
> Indeed a good catch, and a valid issue exactly because of the stateful
> nature of the trigger and eviction policies.
>
> I agree with the suggested approach that this should be configurable for
> the discretizers (and could be set through the API).
>
> As for the default behaviour, I am not 100%. It could be done in a way that
> empty buffers (triggers and evictions associated with them) don't get the
> NotifyOnLastGlobalElement call. That would reduce the overhead.
>
> Cheers,
> Gyula
>
> On Thu, May 28, 2015 at 3:48 PM, Márton Balassi <balassi.mar...@gmail.com>
> wrote:
>
>> Thanks for debugging this Gabor, indeed a good catch.
>>
>> I am not so sure about surfacing it in the API though - it seems very
>> specific for the session windowing case. I am also wondering whether maybe
>> this should actually be the default behavior - if there are already empty
>> windows for a group why not drop the previous states?
>>
>> On Thu, May 28, 2015 at 3:01 PM, Gábor Gévay <gga...@gmail.com> wrote:
>>
>> > Hi,
>> >
>> > At Ericsson, we are implementing something similar to what the
>> > SessionWindowing example does:
>> >
>> > There are events belonging to phone calls (sessions), and every event
>> > has a call_id, which tells us which call it belongs to. At the end of
>> > every call, a large event has to be emitted that contains some
>> > aggregated information about the call. Furthermore, the events that
>> > mark the end of the calls don't always reach our system, so the
>> > sessions have to timeout, just like in the example.
>> >
>> > Therefore, I have experimented a bit with the SessionWindowing
>> > example, and there is a problem: The trigger policy objects belonging
>> > to already terminated sessions are kept in memory, and also
>> > NotifyOnLastGlobalElement gets called on them. So, the application is
>> > eating up more and more memory, and is also getting slower.
>> >
>> > I understand that Flink can't just simply discard all state belonging
>> > to empty groups, as it has no way of knowing whether the user supplied
>> > policy wants to trigger in the future (perhaps based on some state
>> > collected before it first triggered).
>> >
>> > Therefore, I propose the following addition to the API:
>> > WindowedDataStream would get a method called something like
>> > dropEmptyGroups, by which the user could tell Flink to automatically
>> > discard all state belonging to a group, when the window becomes empty.
>> >
>> > The implementation could look like the following: dropEmptyGroups()
>> > would set a flag, and at the end of StreamDiscretizer.evict, if the
>> > flag is true and bufferSize has just become 0, then this
>> > StreamDiscretizer would be removed from the groupedDiscretizers map of
>> > GroupedStreamDiscretizer. (StreamDiscretizer would need a new field
>> > set at creation to have a reference to the GroupedStreamDiscretizer
>> > that contains it.) (And GroupedStreamDiscretizer.makeNewGroup would
>> > just run again if an element would later appear in a dropped group
>> > (but this won't happen in this example).) What do you think?
>> >
>> > Best regards,
>> > Gabor
>> >
>>