Hi all, I'm trying to use the State Processor API to extract all the keys from a RocksDB savepoint produced by an operator in a Flink streaming job and write them out as CSV files.
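
Roughly, the job boils down to something like the sketch below (this is a simplified version rather than the exact code in the gist linked further down; the key type, operator uid, paths, and the state descriptor are placeholders):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ExtractKeysJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Load the existing savepoint with the same state backend the original job used.
        ExistingSavepoint savepoint = Savepoint.load(
            env,
            "s3://bucket/path/to/savepoint",              // placeholder path
            new RocksDBStateBackend("s3://bucket/tmp"));  // placeholder path

        // Read the keyed state of one operator and emit only its keys.
        DataSet<String> keys = savepoint.readKeyedState(
            "my-operator-uid",                            // placeholder operator uid
            new KeyExtractor());

        keys.writeAsText("s3://bucket/output/keys");      // placeholder output location
        env.execute("extract-keys");
    }

    // Emits one record per key; the state values themselves are never accessed.
    static class KeyExtractor extends KeyedStateReaderFunction<String, String> {
        ValueState<Long> state;

        @Override
        public void open(Configuration parameters) {
            // The descriptor has to match a state registered by the original
            // operator (name and type here are placeholders).
            state = getRuntimeContext().getState(
                new ValueStateDescriptor<>("some-state", Long.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) {
            out.collect(key);
        }
    }
}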
The problem is that the savepoint is 30TB on storage, and I'm running into garbage collection issues no matter how much memory or how many CPU cores I allocate to the task managers, in whatever proportions (I tried allocating up to 120GB and 16 cores to each task). The same program and hardware configuration works with no problems for a smaller savepoint (300GB), so this looks like some sort of scalability issue.

At the beginning the tasks spend a couple of hours in what I call "the download phase". During that phase heap usage, as indicated by metrics and the Flink UI, sits at about 10% and everything goes great. But at a certain point, as tasks come out of the download phase, heap usage starts to climb up to about 87% as shown in the Flink UI and by the "tm.Status.JVM.Memory.Heap.Used" metric. At that point the heap usage metric stops increasing and the JVM starts spending most of its time collecting garbage, keeping all CPUs 100% loaded. After some time in this mode the job crashes with "java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_1614821414188_0002_01_000035 timed out."

At all times the reported managed memory usage is 0%, which seems suspicious since RocksDB is supposed to be using it. Also, judging by the absence of an application metric I have in the state processor operator, KeyedStateReaderFunction.readKey never gets called.

I would appreciate it if somebody could help answer some of my questions or suggest a way I could further diagnose/fix this:
1. Is it normal that this overwhelming garbage collection starts long before reaching 100% heap usage? When it happens there's usually 10-15GB of heap still showing as available.
2. Am I correct to assume that even in batch mode Flink implements memory back pressure and is supposed to slow down processing/allocations when it's low on available heap memory?
3. If #2 is true, is it possible that due to some misconfiguration Flink considers more heap space to be available than there actually is and keeps allocating even though there's no more heap?
4. As an alternative to #3, is it possible that there are some unaccounted heap allocations that are not shown in the UI or by the metric, and therefore not taken into account by the memory back pressure mechanism?

Here's a minimal code example that demonstrates the issue:
https://gist.github.com/andreiko/94c675b4f04b40144b4cb4474b2f050f

I'm running this on Flink 1.12.2 (and many earlier versions, too) with parallelism of 80 (I tried lowering that to have more resources available, too) and the following base configuration:
https://gist.github.com/andreiko/305d77c23be605042b85d9d4eb63f025

I tried many things with no success:
- reducing parallelism and making more resources available to each task manager
- enabling object reuse and modifying the tuple mapper to avoid extra tuple allocations
- manipulating memory ratios to allocate more memory to heap and managed memory
- allocating 20% of memory for JVM overhead
- switching to the G1GC garbage collector

Again, I would appreciate any help with this.

--
With regards,
Andrey Bulgakov