Hi,

> 
> I am currently dealing with an OOM issue caused by continuous queries in 
> Flink SQL (for example, a GROUP BY clause without a window). After looking 
> into Flink's Runtime & Core modules, I found that as time goes by, the 
> StateTables in the HeapKeyedStateBackend grow larger and larger, because 
> there are too many potential keys to be put into memory (and GC is taking 
> up most of the CPU time, often 20 seconds or more), eventually crashing the 
> job or even the entire JobManager.

did you consider increasing the parallelism to solve the problem? If your 
machines have enough cores and memory, running more JVMs has the benefit of 
smaller per-JVM heap sizes. I also think that GC tuning and future progress in 
GC algorithms like Shenandoah can potentially bring improvements for this 
problem.
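As a sketch (the exact keys and values depend on your Flink version and 
deployment), scaling out could look like:

```
# flink-conf.yaml (illustrative values)
parallelism.default: 16          # spread the keyed state across more subtasks
taskmanager.numberOfTaskSlots: 2
```

or per job with `flink run -p 16 yourJob.jar`.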

> 
> Therefore, I am now thinking of implementing a "spillable" StateTable that 
> could spill the keys together with their values to disk (or to StateBackends 
> like RocksDB) when heap memory is about to deplete. And when memory pressure 
> alleviates, the entries could be put back. That way, contrary to using 
> RocksDB alone, Flink's throughput would not be affected if there is no need 
> for a "spill" (i.e., when heap memory is sufficient).
> 
> I am not sure if the Flink community has any plans for dealing with the OOM 
> issue caused by a large number of keys & states in the heap (other than 
> using RocksDB alone, whose throughput would be rather slow even when heap 
> memory is sufficient), and whether my plan has any serious flaws that make 
> it not worth doing, like making it harder to keep the checkpointing process 
> consistent, or greatly reducing the throughput because of the additional 
> lookup cost, etc.?
> 

We were also thinking about this, but the feature was never important/requested 
enough to make it onto the shortlist for a release. I think it is not as easy 
as "spilling" makes it sound. For example, how does the spilling deal with 
updates? Eventually you need to write them back to disk, and you should have a 
strategy that does not kill you with random I/O. You might end up implementing 
something similar to RocksDB, which is why I could see an argument for wrapping 
the RocksDB backend with an in-memory cache rather than enhancing the heap 
backend with spilling. You also need to consider the use-case and 
key-distribution pattern. Does it have hot/cold keys? Or are there a lot of 
keys competing for a small cache? What cache replacement strategy makes sense, 
maybe some LRU-K or Clock?
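To illustrate the replacement-strategy question: a plain LRU cache (just a 
sketch, not Flink code; the class name and the idea of a RocksDB-style backing 
store are hypothetical) can be built on Java's LinkedHashMap:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not part of Flink: an LRU-evicting in-memory cache
// that could sit in front of a slower store such as RocksDB. Once capacity
// is exceeded, the least-recently-accessed entry is dropped (i.e., left to
// the backing store).
class LruStateCache<K, V> extends LinkedHashMap<K, V> {
    private final int capacity;

    LruStateCache(int capacity) {
        // accessOrder = true: iteration order is least-recently-accessed first
        super(16, 0.75f, true);
        this.capacity = capacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > capacity;
    }
}
```

With capacity 2, putting keys a and b, touching a, and then putting c evicts 
b, not a. LRU-K or Clock would refine this by resisting one-off scans, which 
matters when many cold keys compete with a few hot ones.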

Another aspect is when and how to sync between the cache/RocksDB or 
heap/spilling partition (e.g. write-through vs. write-back), so that the 
checkpoint is consistent and, at the same time, we don't have blocking 
behaviour when a checkpoint starts. E.g. write-through will still keep a 
serializer overhead in your main processing loop or in some background thread 
(which might lead to some latency/blocking if it cannot keep up), while 
write-back might stall checkpoints because pending changes need to be written 
out first.
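To make that trade-off concrete, here is a deliberately simplified sketch of 
the two policies (hypothetical classes, not Flink code; the `backing` map 
stands in for RocksDB or the spill partition):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the two sync policies, not Flink code.
// `backing` stands in for RocksDB / the on-disk spill partition.
class WritePolicyDemo {
    static class WriteThroughCache<K, V> {
        final Map<K, V> cache = new HashMap<>();
        final Map<K, V> backing = new HashMap<>();

        void put(K k, V v) {
            cache.put(k, v);
            // serialization/write cost is paid on every update, but a
            // checkpoint can snapshot `backing` right away
            backing.put(k, v);
        }
    }

    static class WriteBackCache<K, V> {
        final Map<K, V> cache = new HashMap<>();
        final Map<K, V> dirty = new HashMap<>();
        final Map<K, V> backing = new HashMap<>();

        void put(K k, V v) {
            cache.put(k, v);
            dirty.put(k, v); // cheap update; write cost is deferred
        }

        // must run before a checkpoint can snapshot `backing`; this flush
        // is where the potential checkpoint stall comes from
        void flush() {
            backing.putAll(dirty);
            dirty.clear();
        }
    }
}
```

In the write-back variant the checkpoint must wait for flush(), while in the 
write-through variant every put() carries the write cost up front.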

So, yes, there are ideas floating around, but it is not as trivial as it might 
look at first, and it never became an important-enough feature because the 
alternatives (scaling out, RocksDB) seem good enough for most people.

Best,
Stefan
