Hi,

Indeed a good catch, and a valid issue exactly because of the stateful
nature of the trigger and eviction policies.

I agree with the suggested approach that this should be configurable for
the discretizers (and could be set through the API).
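
A minimal sketch of how such a switch might look, following the
dropEmptyGroups proposal quoted below. The classes here
(DropEmptyGroupsSketch, GroupDiscretizer, GroupedDiscretizer) are
simplified, hypothetical stand-ins, not the actual StreamDiscretizer /
GroupedStreamDiscretizer code:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration only, not the real Flink classes.
class DropEmptyGroupsSketch {

    /** Per-group buffer, playing the role of a per-key discretizer. */
    static class GroupDiscretizer {
        private final Deque<String> buffer = new ArrayDeque<>();
        private final String key;
        private final GroupedDiscretizer parent; // back-reference to the owner

        GroupDiscretizer(String key, GroupedDiscretizer parent) {
            this.key = key;
            this.parent = parent;
        }

        void add(String element) {
            buffer.addLast(element);
        }

        /** Evicts up to n elements and drops the group if it just became empty. */
        void evict(int n) {
            boolean wasNonEmpty = !buffer.isEmpty();
            for (int i = 0; i < n && !buffer.isEmpty(); i++) {
                buffer.removeFirst();
            }
            if (parent.dropEmptyGroups && wasNonEmpty && buffer.isEmpty()) {
                // Discards the buffer together with any per-group policy state.
                parent.groups.remove(key);
            }
        }
    }

    /** Owner of all per-group discretizers, keyed by group. */
    static class GroupedDiscretizer {
        final Map<String, GroupDiscretizer> groups = new HashMap<>();
        final boolean dropEmptyGroups; // the configurable flag

        GroupedDiscretizer(boolean dropEmptyGroups) {
            this.dropEmptyGroups = dropEmptyGroups;
        }

        void process(String key, String element) {
            // A dropped group is simply created again on its next element.
            groups.computeIfAbsent(key, k -> new GroupDiscretizer(k, this)).add(element);
        }

        void evict(String key, int n) {
            GroupDiscretizer group = groups.get(key);
            if (group != null) {
                group.evict(n);
            }
        }

        int groupCount() {
            return groups.size();
        }
    }
}
```

With the flag off, the emptied group stays in the map, which is the
current behaviour described in the report.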

As for the default behaviour, I am not 100% sure. It could be done in a way
that empty buffers (and the triggers and evictions associated with them)
don't get the NotifyOnLastGlobalElement call. That would reduce the overhead.
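
Roughly along these lines, as a sketch only. SkipEmptyNotifySketch and
Group are made-up names, and the real notification path looks different.
Note that this only saves the calls, it does not free the memory, and a
policy that wants to trigger on an empty buffer would no longer be
notified:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for illustration only, not the real Flink classes.
class SkipEmptyNotifySketch {

    static class Group {
        final List<String> buffer = new ArrayList<>();
        int notifications = 0;

        // Stand-in for the callback on the group's trigger/eviction policies.
        void notifyOnLastGlobalElement(String element) {
            notifications++;
        }
    }

    final Map<String, Group> groups = new LinkedHashMap<>();

    void process(String key, String element) {
        // Notify every other group about the new element, skipping empty buffers.
        for (Map.Entry<String, Group> entry : groups.entrySet()) {
            Group other = entry.getValue();
            if (!entry.getKey().equals(key) && !other.buffer.isEmpty()) {
                other.notifyOnLastGlobalElement(element);
            }
        }
        groups.computeIfAbsent(key, k -> new Group()).buffer.add(element);
    }

    /** Empties a group's buffer, as an eviction at the end of a session would. */
    void evictAll(String key) {
        Group group = groups.get(key);
        if (group != null) {
            group.buffer.clear();
        }
    }
}
```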

Cheers,
Gyula

On Thu, May 28, 2015 at 3:48 PM, Márton Balassi <balassi.mar...@gmail.com>
wrote:

> Thanks for debugging this Gabor, indeed a good catch.
>
> I am not so sure about surfacing it in the API though - it seems very
> specific for the session windowing case. I am also wondering whether maybe
> this should actually be the default behavior - if there are already empty
> windows for a group why not drop the previous states?
>
> On Thu, May 28, 2015 at 3:01 PM, Gábor Gévay <gga...@gmail.com> wrote:
>
> > Hi,
> >
> > At Ericsson, we are implementing something similar to what the
> > SessionWindowing example does:
> >
> > There are events belonging to phone calls (sessions), and every event
> > has a call_id, which tells us which call it belongs to. At the end of
> > every call, a large event has to be emitted that contains some
> > aggregated information about the call. Furthermore, the events that
> > mark the end of the calls don't always reach our system, so the
> > sessions have to time out, just like in the example.
> >
> > Therefore, I have experimented a bit with the SessionWindowing
> > example, and there is a problem: The trigger policy objects belonging
> > to already terminated sessions are kept in memory, and also
> > NotifyOnLastGlobalElement gets called on them. So, the application is
> > eating up more and more memory, and is also getting slower.
> >
> > I understand that Flink can't just simply discard all state belonging
> > to empty groups, as it has no way of knowing whether the user supplied
> > policy wants to trigger in the future (perhaps based on some state
> > collected before it first triggered).
> >
> > Therefore, I propose the following addition to the API:
> > WindowedDataStream would get a method called something like
> > dropEmptyGroups, by which the user could tell Flink to automatically
> > discard all state belonging to a group, when the window becomes empty.
> >
> > The implementation could look like the following: dropEmptyGroups()
> > would set a flag, and at the end of StreamDiscretizer.evict, if the
> > flag is true and bufferSize has just become 0, then this
> > StreamDiscretizer would be removed from the groupedDiscretizers map of
> > GroupedStreamDiscretizer. (StreamDiscretizer would need a new field,
> > set at creation, holding a reference to the GroupedStreamDiscretizer
> > that contains it.) If an element later appeared in a dropped group,
> > GroupedStreamDiscretizer.makeNewGroup would just run again (but this
> > won't happen in this example). What do you think?
> >
> > Best regards,
> > Gabor
> >
>
