Let's not get all dramatic :D If we don't call any methods on the empty groups we can still keep them off-memory in a persistent storage with a lazy checkpoint/state-access logic with practically 0 memory overhead.
Automatically dropping everything will break a lot of programs without people noticing.

On Thu, May 28, 2015 at 7:48 PM, Gábor Gévay <gga...@gmail.com> wrote:

> Hi,
>
> I would vote for making the default behaviour to drop all state for
> empty groups, and allow a configuration to set the current behaviour
> instead. This issue will probably have a paragraph in the
> documentation, but if someone overlooks this, then there is potential
> for a greater disaster with the current behaviour than with dropping:
> - If someone is expecting to have the states preserved, then he will
>   probably immediately notice that something is wrong (because his logic
>   that required the states will totally not work).
> - However, if someone is expecting that the states for empty groups
>   just disappear (or doesn't even think about what happens with empty
>   groups), then he might only notice the memleak and slowdown later
>   (probably in production), which will be very annoying to debug at that
>   point.
>
> Best regards,
> Gabor
>
> 2015-05-28 19:23 GMT+02:00 Gyula Fóra <gyula.f...@gmail.com>:
> > Hi,
> >
> > Indeed a good catch, and a valid issue exactly because of the stateful
> > nature of the trigger and eviction policies.
> >
> > I agree with the suggested approach that this should be configurable for
> > the discretizers (and could be set through the API).
> >
> > As for the default behaviour, I am not 100%. It could be done in a way that
> > empty buffers (triggers and evictions associated with them) don't get the
> > NotifyOnLastGlobalElement call. That would reduce the overhead.
> >
> > Cheers,
> > Gyula
> >
> > On Thu, May 28, 2015 at 3:48 PM, Márton Balassi <balassi.mar...@gmail.com> wrote:
> >
> >> Thanks for debugging this Gabor, indeed a good catch.
> >>
> >> I am not so sure about surfacing it in the API though - it seems very
> >> specific for the session windowing case.
> >> I am also wondering whether maybe
> >> this should actually be the default behavior - if there are already empty
> >> windows for a group why not drop the previous states?
> >>
> >> On Thu, May 28, 2015 at 3:01 PM, Gábor Gévay <gga...@gmail.com> wrote:
> >>
> >> > Hi,
> >> >
> >> > At Ericsson, we are implementing something similar to what the
> >> > SessionWindowing example does:
> >> >
> >> > There are events belonging to phone calls (sessions), and every event
> >> > has a call_id, which tells us which call it belongs to. At the end of
> >> > every call, a large event has to be emitted that contains some
> >> > aggregated information about the call. Furthermore, the events that
> >> > mark the end of the calls don't always reach our system, so the
> >> > sessions have to timeout, just like in the example.
> >> >
> >> > Therefore, I have experimented a bit with the SessionWindowing
> >> > example, and there is a problem: The trigger policy objects belonging
> >> > to already terminated sessions are kept in memory, and also
> >> > NotifyOnLastGlobalElement gets called on them. So, the application is
> >> > eating up more and more memory, and is also getting slower.
> >> >
> >> > I understand that Flink can't just simply discard all state belonging
> >> > to empty groups, as it has no way of knowing whether the user supplied
> >> > policy wants to trigger in the future (perhaps based on some state
> >> > collected before it first triggered).
> >> >
> >> > Therefore, I propose the following addition to the API:
> >> > WindowedDataStream would get a method called something like
> >> > dropEmptyGroups, by which the user could tell Flink to automatically
> >> > discard all state belonging to a group, when the window becomes empty.
> >> >
> >> > The implementation could look like the following: dropEmptyGroups()
> >> > would set a flag, and at the end of StreamDiscretizer.evict, if the
> >> > flag is true and bufferSize has just become 0, then this
> >> > StreamDiscretizer would be removed from the groupedDiscretizers map of
> >> > GroupedStreamDiscretizer. (StreamDiscretizer would need a new field
> >> > set at creation to have a reference to the GroupedStreamDiscretizer
> >> > that contains it.) (And GroupedStreamDiscretizer.makeNewGroup would
> >> > just run again if an element would later appear in a dropped group
> >> > (but this won't happen in this example).) What do you think?
> >> >
> >> > Best regards,
> >> > Gabor
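
For reference, the dropEmptyGroups proposal from the original message could be sketched roughly as below. This is a standalone Java illustration and not Flink code: GroupedDiscretizerSketch and GroupDiscretizerSketch, along with process, evict, removeGroup and groupCount, are hypothetical stand-ins for GroupedStreamDiscretizer and StreamDiscretizer. It only mirrors the three pieces named in the proposal: the flag, the back-reference to the containing object, and the removal from the per-group map once the buffer has just become empty.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a per-group discretizer (not Flink's class). */
class GroupDiscretizerSketch<K, T> {
    private final K key;
    // Back-reference to the containing object, set at creation.
    private final GroupedDiscretizerSketch<K, T> parent;
    private final Deque<T> buffer = new ArrayDeque<>();

    GroupDiscretizerSketch(K key, GroupedDiscretizerSketch<K, T> parent) {
        this.key = key;
        this.parent = parent;
    }

    void add(T element) {
        buffer.addLast(element);
    }

    /** Evicts up to n of the oldest elements from this group's buffer. */
    void evict(int n) {
        boolean evictedAny = false;
        for (int i = 0; i < n && !buffer.isEmpty(); i++) {
            buffer.removeFirst();
            evictedAny = true;
        }
        // Drop this group only if the flag is set and the buffer has just become empty.
        if (evictedAny && buffer.isEmpty() && parent.shouldDropEmptyGroups()) {
            parent.removeGroup(key);
        }
    }
}

/** Hypothetical stand-in for the grouped discretizer holding one entry per key. */
class GroupedDiscretizerSketch<K, T> {
    private final Map<K, GroupDiscretizerSketch<K, T>> groups = new HashMap<>();
    private boolean dropEmptyGroups = false;

    /** Opt in to discarding all state of a group once its buffer is empty. */
    GroupedDiscretizerSketch<K, T> dropEmptyGroups() {
        this.dropEmptyGroups = true;
        return this;
    }

    boolean shouldDropEmptyGroups() {
        return dropEmptyGroups;
    }

    /** Routes an element to its group, creating the group again if it was dropped. */
    void process(K key, T element) {
        groups.computeIfAbsent(key, k -> new GroupDiscretizerSketch<>(k, this)).add(element);
    }

    void evict(K key, int n) {
        GroupDiscretizerSketch<K, T> group = groups.get(key);
        if (group != null) {
            group.evict(n);
        }
    }

    void removeGroup(K key) {
        groups.remove(key);
    }

    int groupCount() {
        return groups.size();
    }
}
```

Without the dropEmptyGroups() call the sketch keeps empty groups around, which corresponds to the current behaviour discussed in the thread; with it, a later element for a dropped key simply creates a fresh group.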