Right now, I don’t think there is a way of doing that. I don’t think there is 
anything fundamentally against having a method that drops a state completely, both 
data and registered metadata. But so far that has never existed, and it seems nobody 
ever needed it (or at least asked for it). The closest thing for keyed state is 
calling the "clear()" method, but the metadata will stick.
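
To make the data/metadata distinction concrete, here is a minimal toy model in plain Java. It is not Flink code and not how the backends are implemented; ToyKeyedBackend and all of its methods are hypothetical names made up for this sketch. The outer map entry stands in for the registered metadata of a state, the inner map for its per-key data.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: NOT Flink's implementation. All names here are hypothetical.
public class ToyKeyedBackend {
    // state name -> (key -> value); the outer entry plays the role of registered metadata
    private final Map<String, Map<String, Long>> states = new HashMap<>();

    // Registering creates the metadata entry if it is missing (lazy registration).
    public void register(String stateName) {
        states.computeIfAbsent(stateName, n -> new HashMap<>());
    }

    // Stores a value for one key, registering the state on first use.
    public void put(String stateName, String key, long value) {
        register(stateName);
        states.get(stateName).put(key, value);
    }

    // Analogue of clear() for one key: removes the value, keeps the registration.
    public void clear(String stateName, String key) {
        Map<String, Long> perKey = states.get(stateName);
        if (perKey != null) {
            perKey.remove(key);
        }
    }

    public boolean isRegistered(String stateName) {
        return states.containsKey(stateName);
    }

    public int entryCount(String stateName) {
        Map<String, Long> perKey = states.get(stateName);
        return perKey == null ? 0 : perKey.size();
    }

    public static void main(String[] args) {
        ToyKeyedBackend backend = new ToyKeyedBackend();
        backend.put("counts", "user-1", 42L);
        backend.clear("counts", "user-1");
        System.out.println(backend.entryCount("counts"));   // number of values left
        System.out.println(backend.isRegistered("counts")); // whether the registration survived
    }
}
```

In this model there is deliberately no method that removes the outer entry, which mirrors the missing "drop state" call: clearing empties the data, but the name stays registered.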

> On 22.02.2018 at 12:06, Gyula Fóra <gyula.f...@gmail.com> wrote:
> 
> Do you have any suggestion how to completely delete an operator and keyed
> state?
> For operator state this seems to be easy enough, but what about completely
> dropping a keyed state?
> 
> Gyula
> 
> Stefan Richter <s.rich...@data-artisans.com> wrote (on Thu, 22 Feb 2018,
> 11:46):
> 
>> 
>> Hi,
>> 
>> I don’t think that this is a bug, but rather a necessity that comes with
>> the (imo questionable) design of allowing lazy state registration. In this
>> design, just because a state is *currently* not registered does not mean
>> that you can simply drop it. Imagine that your code did *not yet*
>> re-register a state, but could still do so in the future. If a
>> checkpoint/recovery happens in between, all data for that state would
>> suddenly be lost, just because by chance the state was not registered "fast
>> enough". As I see it, the proper way is to register the state under the
>> same name and clear it if you want to get rid of the data. There is
>> currently no call that explicitly drops a state that was once declared, and
>> you might make a case that this is a feature to have for the future. Then
>> again, we need a general decision about lazy and eager state IMO.
>> 
>> Best,
>> Stefan
>> 
>>> On 22.02.2018 at 11:10, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>> 
>>> Hi all,
>>> 
>>> We have discovered a fairly serious memory leak
>>> in DefaultOperatorStateBackend, with broadcast (union) list states.
>>> 
>>> The problem seems to occur when a broadcast state name is changed in order
>>> to drop some state (intentionally).
>>> 
>>> Flink does not drop the "garbage" broadcast state, and keeps snapshotting,
>>> broadcasting, multiplying it exponentially at every savepoint/restore cycle.
>>> 
>>> With high enough parallelism this can easily lead to small states (few
>>> bytes) growing to several gigs and more over a few restarts eventually
>>> leading to a very bad crash restart cycle where TMs OOM in a few secs.
>>> 
>>> Basically 2 things seem to be missing: garbage collection of unreferenced
>>> operator states (they are eagerly restored into memory), and probably lazy
>>> restore would also be nice :)
>>> 
>>> We run Flink 1.4.0 but 1.4.1 seems to be affected as well, haven't checked
>>> the latest master.
>>> 
>>> Could someone please confirm that this behaviour is not as intended?
>>> 
>>> Cheers,
>>> Gyula
>> 
>> 
