I have now created the JIRA: https://issues.apache.org/jira/browse/FLINK-2181
Best regards,
Gabor

2015-06-08 0:55 GMT+02:00 Robert Metzger <rmetz...@apache.org>:
> What is the status of this issue?
> I think we should at least file a JIRA for it to have it around as a TODO.
>
> On Thu, May 28, 2015 at 10:01 PM, Gábor Gévay <gga...@gmail.com> wrote:
>
>> > Let's not get all dramatic :D
>>
>> Ok, sorry :D
>>
>> > If we don't call any methods on the empty groups we can still keep them
>> > off-memory in a persistent storage with a lazy checkpoint/state-access
>> > logic with practically 0 memory overhead.
>>
>> So you mean that whether to call notifyOnLastGlobalElement when the
>> window is empty would be a second configuration option? Or would this
>> not be configurable?
>>
>> Best regards,
>> Gabor
>>
>> 2015-05-28 19:52 GMT+02:00 Gyula Fóra <gyula.f...@gmail.com>:
>> > Let's not get all dramatic :D
>> >
>> > If we don't call any methods on the empty groups we can still keep them
>> > off-memory in a persistent storage with a lazy checkpoint/state-access
>> > logic with practically 0 memory overhead.
>> >
>> > Automatically dropping everything will break a lot of programs without
>> > people noticing.
>> >
>> > On Thu, May 28, 2015 at 7:48 PM, Gábor Gévay <gga...@gmail.com> wrote:
>> >
>> >> Hi,
>> >>
>> >> I would vote for dropping all state for empty groups by default, and
>> >> allowing a configuration option to restore the current behaviour
>> >> instead. This issue will probably get a paragraph in the
>> >> documentation, but if someone overlooks it, there is potential
>> >> for a greater disaster with the current behaviour than with dropping:
>> >> - If someone is expecting to have the states preserved, then he will
>> >> probably notice immediately that something is wrong (because his logic
>> >> that required the states will not work at all).
>> >> - However, if someone is expecting that the states for empty groups
>> >> just disappear (or doesn't even think about what happens with empty
>> >> groups), then he might only notice the memory leak and slowdown later
>> >> (probably in production), which will be very annoying to debug at that
>> >> point.
>> >>
>> >> Best regards,
>> >> Gabor
>> >>
>> >> 2015-05-28 19:23 GMT+02:00 Gyula Fóra <gyula.f...@gmail.com>:
>> >> > Hi,
>> >> >
>> >> > Indeed a good catch, and a valid issue exactly because of the stateful
>> >> > nature of the trigger and eviction policies.
>> >> >
>> >> > I agree with the suggested approach that this should be configurable
>> >> > for the discretizers (and could be set through the API).
>> >> >
>> >> > As for the default behaviour, I am not 100% sure. It could be done in
>> >> > a way that empty buffers (and the triggers and evictions associated
>> >> > with them) don't get the NotifyOnLastGlobalElement call. That would
>> >> > reduce the overhead.
>> >> >
>> >> > Cheers,
>> >> > Gyula
>> >> >
>> >> > On Thu, May 28, 2015 at 3:48 PM, Márton Balassi <balassi.mar...@gmail.com> wrote:
>> >> >
>> >> >> Thanks for debugging this Gabor, indeed a good catch.
>> >> >>
>> >> >> I am not so sure about surfacing it in the API though; it seems very
>> >> >> specific to the session windowing case. I am also wondering whether
>> >> >> maybe this should actually be the default behavior: if there are
>> >> >> already empty windows for a group, why not drop the previous states?
>> >> >>
>> >> >> On Thu, May 28, 2015 at 3:01 PM, Gábor Gévay <gga...@gmail.com> wrote:
>> >> >>
>> >> >> > Hi,
>> >> >> >
>> >> >> > At Ericsson, we are implementing something similar to what the
>> >> >> > SessionWindowing example does:
>> >> >> >
>> >> >> > There are events belonging to phone calls (sessions), and every
>> >> >> > event has a call_id, which tells us which call it belongs to.
>> >> >> > At the end of every call, a large event has to be emitted that
>> >> >> > contains some aggregated information about the call. Furthermore,
>> >> >> > the events that mark the end of the calls don't always reach our
>> >> >> > system, so the sessions have to time out, just like in the example.
>> >> >> >
>> >> >> > Therefore, I have experimented a bit with the SessionWindowing
>> >> >> > example, and there is a problem: the trigger policy objects
>> >> >> > belonging to already terminated sessions are kept in memory, and
>> >> >> > NotifyOnLastGlobalElement still gets called on them. So the
>> >> >> > application eats up more and more memory and keeps getting slower.
>> >> >> >
>> >> >> > I understand that Flink can't simply discard all state belonging
>> >> >> > to empty groups, as it has no way of knowing whether the
>> >> >> > user-supplied policy wants to trigger in the future (perhaps based
>> >> >> > on some state collected before it first triggered).
>> >> >> >
>> >> >> > Therefore, I propose the following addition to the API:
>> >> >> > WindowedDataStream would get a method called something like
>> >> >> > dropEmptyGroups, by which the user could tell Flink to
>> >> >> > automatically discard all state belonging to a group when the
>> >> >> > window becomes empty.
>> >> >> >
>> >> >> > The implementation could look like the following: dropEmptyGroups()
>> >> >> > would set a flag, and at the end of StreamDiscretizer.evict, if the
>> >> >> > flag is true and bufferSize has just become 0, then this
>> >> >> > StreamDiscretizer would be removed from the groupedDiscretizers map
>> >> >> > of GroupedStreamDiscretizer. (StreamDiscretizer would need a new
>> >> >> > field, set at creation, holding a reference to the
>> >> >> > GroupedStreamDiscretizer that contains it.)
>> >> >> > (And GroupedStreamDiscretizer.makeNewGroup would just run again if
>> >> >> > an element later appeared in a dropped group, but this won't happen
>> >> >> > in this example.) What do you think?
>> >> >> >
>> >> >> > Best regards,
>> >> >> > Gabor
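A minimal Java sketch of the dropEmptyGroups() proposal from the original message, to make the control flow concrete. The classes are toy stand-ins that only borrow the names used in the thread (GroupedStreamDiscretizer, StreamDiscretizer, groupedDiscretizers, evict, makeNewGroup, bufferSize). Everything else is hypothetical and does not reflect Flink's actual streaming internals: the processElement and removeGroup helpers, the dropEmptyGroupsEnabled field, and the count-based evict(n) that stands in for a real eviction policy.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DropEmptyGroupsSketch {

    // Hypothetical stand-in for GroupedStreamDiscretizer: one discretizer per group key.
    static class GroupedStreamDiscretizer {
        final Map<Object, StreamDiscretizer> groupedDiscretizers = new HashMap<>();
        boolean dropEmptyGroupsEnabled = false; // the flag set by dropEmptyGroups()

        void dropEmptyGroups() {
            dropEmptyGroupsEnabled = true;
        }

        void processElement(Object key, String element) {
            StreamDiscretizer d = groupedDiscretizers.get(key);
            if (d == null) {
                // Also covers a previously dropped group: it is simply recreated.
                d = makeNewGroup(key);
            }
            d.store(element);
        }

        StreamDiscretizer makeNewGroup(Object key) {
            StreamDiscretizer d = new StreamDiscretizer(key, this);
            groupedDiscretizers.put(key, d);
            return d;
        }

        void removeGroup(Object key) {
            groupedDiscretizers.remove(key);
        }
    }

    // Hypothetical stand-in for StreamDiscretizer: a buffer plus a back-reference to its owner.
    static class StreamDiscretizer {
        final Object key;
        final GroupedStreamDiscretizer owner; // the new field, set at creation
        final List<String> buffer = new ArrayList<>();

        StreamDiscretizer(Object key, GroupedStreamDiscretizer owner) {
            this.key = key;
            this.owner = owner;
        }

        void store(String element) {
            buffer.add(element);
        }

        int bufferSize() {
            return buffer.size();
        }

        // Evicts up to n of the oldest elements. At the end, if the flag is set and this
        // call has just emptied the buffer, the group removes itself from its owner's map.
        void evict(int n) {
            int toRemove = Math.min(n, buffer.size());
            buffer.subList(0, toRemove).clear();
            if (owner.dropEmptyGroupsEnabled && toRemove > 0 && bufferSize() == 0) {
                owner.removeGroup(key);
            }
        }
    }

    public static void main(String[] args) {
        GroupedStreamDiscretizer grouped = new GroupedStreamDiscretizer();
        grouped.dropEmptyGroups();

        grouped.processElement("call-1", "start");
        grouped.processElement("call-1", "end");
        grouped.processElement("call-2", "start");

        // Session call-1 times out: everything in its window is evicted.
        grouped.groupedDiscretizers.get("call-1").evict(2);
        System.out.println(grouped.groupedDiscretizers.keySet()); // prints [call-2]
    }
}
```

Because makeNewGroup runs whenever an element arrives for a key with no discretizer, a dropped group is transparently recreated, as the proposal notes. Without the dropEmptyGroups() call, the empty discretizer for call-1 would stay in the map, which is the memory growth described in the thread.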