Hi,

I would vote for making the default behaviour to drop all state for
empty groups, and allow a configuration to set the current behaviour
instead. This issue will probably have a paragraph in the
documentation, but if someone overlooks this, then there is potential
for a greater disaster with the current behaviour, then with dropping:
- If someone is expecting to have the states preserved, then he will
probably immediately notice that something is wrong (because his logic
that required the states will totally not work).
- However, if someone is expecting that the states for empty groups
just disappear (or doesn't even think about what happens with empty
groups), then he might only notice the memleak and slowdown later
(probably in production), which will be very annoying to debug at that
point.

Best regards,
Gabor



2015-05-28 19:23 GMT+02:00 Gyula Fóra <gyula.f...@gmail.com>:
> Hi,
>
> Indeed a good catch, and a valid issue exactly because of the stateful
> nature of the trigger and eviction policies.
>
> I agree with the suggested approach that this should be configurable for
> the discretizers (and could be set through the API).
>
> As for the default behaviour, I am not 100%. It could be done in a way that
> empty buffers (triggers and evictions associated with them) don't get the
> NotifyOnLastGlobalElement call. That would reduce the overhead.
>
> Cheers,
> Gyula
>
> On Thu, May 28, 2015 at 3:48 PM, Márton Balassi <balassi.mar...@gmail.com>
> wrote:
>
>> Thanks for debugging this Gabor, indeed a good catch.
>>
>> I am not so sure about surfacing it in the API though - it seems very
>> specific for the session windowing case. I am also wondering whether maybe
>> this should actually be the default behavior - if there are already empty
>> windows for a group why not drop the previous states?
>>
>> On Thu, May 28, 2015 at 3:01 PM, Gábor Gévay <gga...@gmail.com> wrote:
>>
>> > Hi,
>> >
>> > At Ericsson, we are implementing something similar to what the
>> > SessionWindowing example does:
>> >
>> > There are events belonging to phone calls (sessions), and every event
>> > has a call_id, which tells us which call it belongs to. At the end of
>> > every call, a large event has to be emitted that contains some
>> > aggregated information about the call. Furthermore, the events that
>> > mark the end of the calls don't always reach our system, so the
>> > sessions have to timeout, just like in the example.
>> >
>> > Therefore, I have experimented a bit with the SessionWindowing
>> > example, and there is a problem: The trigger policy objects belonging
>> > to already terminated sessions are kept in memory, and also
>> > NotifyOnLastGlobalElement gets called on them. So, the application is
>> > eating up more and more memory, and is also getting slower.
>> >
>> > I understand that Flink can't just simply discard all state belonging
>> > to empty groups, as it has no way of knowing whether the user supplied
>> > policy wants to trigger in the future (perhaps based on some state
>> > collected before it first triggered).
>> >
>> > Therefore, I propose the following addition to the API:
>> > WindowedDataStream would get a method called something like
>> > dropEmptyGroups, by which the user could tell Flink to automatically
>> > discard all state belonging to a group, when the window becomes empty.
>> >
>> > The implementation could look like the following: dropEmptyGroups()
>> > would set a flag, and at the end of StreamDiscretizer.evict, if the
>> > flag is true and bufferSize has just become 0, then this
>> > StreamDiscretizer would be removed from the groupedDiscretizers map of
>> > GroupedStreamDiscretizer. (StreamDiscretizer would need a new field
>> > set at creation to have a reference to the GroupedStreamDiscretizer
>> > that contains it.) (And GroupedStreamDiscretizer.makeNewGroup would
>> > just run again if an element would later appear in a dropped group
>> > (but this won't happen in this example).) What do you think?
>> >
>> > Best regards,
>> > Gabor
>> >
>>

Reply via email to