Hi, I don’t think that this is a bug, but rather a necessity that comes with the (IMO questionable) design of allowing lazy state registration. In this design, just because a state is *currently* not registered does not mean that it can simply be dropped. Imagine that your code has *not yet* re-registered a state, but could still do so in the future. If a checkpoint/recovery happens in between, all data for that state would suddenly be lost, just because the state happened not to be registered "fast enough". As I see it, the proper way is to register the state under the same name and clear it if you want to get rid of the data. There is currently no call that explicitly drops a state that was once declared, and one could argue that such a call would be a useful feature in the future. Then again, IMO we need a general decision about lazy versus eager state registration.
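To make both the failure mode and the suggested remedy concrete, here is a toy model in plain Java. It is not Flink's API and not the actual DefaultOperatorStateBackend logic: the class and method names (`UnionStateGrowth`, `restoreUnion`, `snapshot`, `snapshotSizeAfter`) are hypothetical, and the model assumes that an orphaned union list state is restored on every subtask and snapshotted again unchanged. Under union redistribution every subtask receives all entries of the previous snapshot, so each cycle multiplies the total by the parallelism; clearing the state after re-registering it stops the growth. In a real job, the analogous step would be to obtain the state again under its old name (for example via `getUnionListState` on the operator state store in `CheckpointedFunction#initializeState`) and call `clear()` on it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Toy model (hypothetical, not Flink's API) of an orphaned union list state
 * that keeps being restored and re-snapshotted across restore cycles.
 */
class UnionStateGrowth {

    /** Union redistribution: every subtask receives every entry of the snapshot. */
    static List<List<String>> restoreUnion(List<String> snapshot, int parallelism) {
        List<List<String>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>(snapshot));
        }
        return subtasks;
    }

    /** Snapshot: concatenate the per-subtask lists into one list. */
    static List<String> snapshot(List<List<String>> subtasks) {
        List<String> all = new ArrayList<>();
        for (List<String> s : subtasks) {
            all.addAll(s);
        }
        return all;
    }

    /**
     * Returns the snapshot size after the given number of restore cycles.
     * If clearOnRestore is true, each subtask "re-registers" the old state
     * and clears it right after the restore (the suggested remedy).
     */
    static int snapshotSizeAfter(int parallelism, int entriesPerSubtask,
                                 int cycles, boolean clearOnRestore) {
        List<List<String>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>(Collections.nCopies(entriesPerSubtask, "x")));
        }
        List<String> snap = snapshot(subtasks);
        for (int c = 0; c < cycles; c++) {
            subtasks = restoreUnion(snap, parallelism);
            if (clearOnRestore) {
                for (List<String> s : subtasks) {
                    s.clear(); // analogous to calling clear() on the restored state
                }
            }
            snap = snapshot(subtasks);
        }
        return snap.size();
    }

    public static void main(String[] args) {
        // For c = 3 this prints "3 cycles: orphaned=512, cleared=0"
        for (int c = 0; c <= 3; c++) {
            System.out.println(c + " cycles: orphaned=" + snapshotSizeAfter(4, 2, c, false)
                    + ", cleared=" + snapshotSizeAfter(4, 2, c, true));
        }
    }
}
```

With parallelism 4 and 2 entries per subtask, the orphaned state grows from 8 entries to 32, 128 and 512 over three restore cycles (p^(k+1) * n entries after k cycles), while the cleared variant drops to 0 after the first restore.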
Best,
Stefan

> On 22.02.2018, at 11:10, Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> Hi all,
>
> We have discovered a fairly serious memory leak
> in DefaultOperatorStateBackend with broadcast (union) list states.
>
> The problem seems to occur when a broadcast state name is changed in order
> to (intentionally) drop some state.
>
> Flink does not drop the "garbage" broadcast state, and keeps snapshotting,
> broadcasting, and multiplying it exponentially at every savepoint/restore cycle.
>
> With high enough parallelism, this can easily lead to small states (a few
> bytes) growing to several gigabytes or more over a few restarts, eventually
> leading to a very bad crash-restart cycle where TMs OOM within a few seconds.
>
> Basically, two things seem to be missing: garbage collection of unreferenced
> operator states (they are eagerly restored into memory), and, probably, lazy
> restore would also be nice :)
>
> We run Flink 1.4.0, but 1.4.1 seems to be affected as well; we haven't checked
> the latest master.
>
> Could someone please confirm that this behaviour is not as intended?
>
> Cheers,
> Gyula