Hi,

At Ericsson, we are implementing something similar to what the
SessionWindowing example does:

There are events belonging to phone calls (sessions), and every event
has a call_id, which tells us which call it belongs to. At the end of
every call, a large event has to be emitted that contains some
aggregated information about the call. Furthermore, the events that
mark the end of the calls don't always reach our system, so the
sessions have to time out, just like in the example.

Therefore, I have experimented a bit with the SessionWindowing
example and found a problem: the trigger policy objects belonging
to already terminated sessions are kept in memory, and
NotifyOnLastGlobalElement still gets called on them. As a result, the
application uses more and more memory and also gets slower.

I understand that Flink can't simply discard all state belonging to
empty groups, as it has no way of knowing whether the user-supplied
policy wants to trigger in the future (perhaps based on some state
collected before it first triggered).

Therefore, I propose the following addition to the API:
WindowedDataStream would get a method called something like
dropEmptyGroups, with which the user could tell Flink to automatically
discard all state belonging to a group when its window becomes empty.

The implementation could look like the following: dropEmptyGroups()
would set a flag, and at the end of StreamDiscretizer.evict, if the
flag is true and bufferSize has just become 0, then this
StreamDiscretizer would be removed from the groupedDiscretizers map of
GroupedStreamDiscretizer. For this, StreamDiscretizer would need a new
field, set at creation, that references the GroupedStreamDiscretizer
containing it. If an element later appeared in a dropped group,
GroupedStreamDiscretizer.makeNewGroup would simply run again (although
this won't happen in this example). What do you think?
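
To make the idea concrete, here is a rough, standalone sketch of the
mechanism in plain Java. It does not use the real Flink classes:
DiscretizerSketch and GroupedDiscretizerSketch are hypothetical
stand-ins for StreamDiscretizer and GroupedStreamDiscretizer, and the
method names (add, evict, removeGroup, groupCount) are made up for
illustration. The flag is passed to the constructor here only to keep
the sketch short.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for StreamDiscretizer: holds the buffer of one group.
class DiscretizerSketch<K, T> {
    private final K key;
    // Back-reference to the containing grouped discretizer, set at creation.
    private final GroupedDiscretizerSketch<K, T> parent;
    private final Deque<T> buffer = new ArrayDeque<>();

    DiscretizerSketch(K key, GroupedDiscretizerSketch<K, T> parent) {
        this.key = key;
        this.parent = parent;
    }

    void add(T element) {
        buffer.addLast(element);
    }

    // Evicts up to n of the oldest elements. At the end, if the flag is set
    // and the buffer is empty, this group removes itself from the parent.
    void evict(int n) {
        for (int i = 0; i < n && !buffer.isEmpty(); i++) {
            buffer.removeFirst();
        }
        if (parent.dropEmptyGroups && buffer.isEmpty()) {
            parent.removeGroup(key);
        }
    }
}

// Hypothetical stand-in for GroupedStreamDiscretizer.
class GroupedDiscretizerSketch<K, T> {
    final boolean dropEmptyGroups;
    private final Map<K, DiscretizerSketch<K, T>> groupedDiscretizers = new HashMap<>();

    GroupedDiscretizerSketch(boolean dropEmptyGroups) {
        this.dropEmptyGroups = dropEmptyGroups;
    }

    // Creates the group on demand (the role of makeNewGroup), so an element
    // arriving for a previously dropped group just starts a fresh group.
    void add(K key, T element) {
        groupedDiscretizers
                .computeIfAbsent(key, k -> new DiscretizerSketch<K, T>(k, this))
                .add(element);
    }

    void evict(K key, int n) {
        DiscretizerSketch<K, T> group = groupedDiscretizers.get(key);
        if (group != null) {
            group.evict(n);
        }
    }

    void removeGroup(K key) {
        groupedDiscretizers.remove(key);
    }

    int groupCount() {
        return groupedDiscretizers.size();
    }
}
```

With the flag set, a group disappears from the map as soon as its last
element is evicted, and adding an element for the same key later
creates a fresh group. With the flag unset, the behavior is as today:
the empty group stays in the map.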

Best regards,
Gabor
