If anyone is interested, I realized that the State Processor API was not the right tool for this, since it spends a lot of time rebuilding RocksDB tables and then a lot of memory reading from them. All I really needed was the operator keys.
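For context, the State Processor API version of "give me all the keys" looks roughly like this. This is a simplified sketch of the general pattern, not the exact code from the gist linked in my original message below; the operator uid, state name, key type and paths are placeholders:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ExtractKeysWithStateProcessorApi {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ExistingSavepoint savepoint = Savepoint.load(
                env,
                "hdfs:///savepoints/savepoint-xxxxxx",         // placeholder path
                new RocksDBStateBackend("file:///tmp/rocksdb"));

        // Before readKey is ever called, every parallel task has to rebuild the
        // RocksDB tables for its key groups -- that's where the time and memory go.
        DataSet<String> keys = savepoint.readKeyedState("my-operator-uid", new KeyReader());

        keys.writeAsText("hdfs:///output/keys");
        env.execute("extract-keys");
    }

    public static class KeyReader extends KeyedStateReaderFunction<String, String> {

        private transient ValueState<Long> unused;

        @Override
        public void open(Configuration parameters) {
            // At least one of the operator's states has to be declared so the reader
            // can enumerate keys; "my-state" and its type are placeholders.
            unused = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("my-state", Types.LONG));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) {
            // Emit only the key; the state value is never read.
            out.collect(key);
        }
    }
}

Even though readKey never touches the state value, each task still has to restore all of the RocksDB state for its key groups before a single key comes out.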
So I used SavepointLoader.loadSavepointMetadata to get the KeyGroupsStateHandle objects and built an InputFormat heavily based on the code I found in RocksDBFullRestoreOperation.java. It ended up working extremely quickly while keeping memory and CPU usage to a minimum.
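In case it's useful to anyone, the metadata/handle part is tiny. A minimal sketch (the savepoint path is a placeholder and error handling is omitted); the actual InputFormat treats each handle as a split and decodes the keys from its stream the way RocksDBFullRestoreOperation does, which I'm not reproducing here:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.state.api.runtime.SavepointLoader;

public class ListKeyGroupHandles {

    public static void main(String[] args) throws Exception {
        // Only the savepoint's _metadata file is read here; no state is downloaded.
        CheckpointMetadata metadata =
                SavepointLoader.loadSavepointMetadata("hdfs:///savepoints/savepoint-xxxxxx");

        List<KeyGroupsStateHandle> handles = new ArrayList<>();
        for (OperatorState operatorState : metadata.getOperatorStates()) {
            for (OperatorSubtaskState subtask : operatorState.getStates()) {
                for (KeyedStateHandle handle : subtask.getManagedKeyedState()) {
                    // Full (non-incremental) savepoints store keyed state as
                    // KeyGroupsStateHandle, one per subtask, each covering a
                    // contiguous range of key groups.
                    if (handle instanceof KeyGroupsStateHandle) {
                        handles.add((KeyGroupsStateHandle) handle);
                    }
                }
            }
        }

        // Each handle becomes one input split; the InputFormat opens the handle's
        // stream and walks the key groups via getGroupRangeOffsets(), decoding the
        // serialized keys the same way RocksDBFullRestoreOperation does on restore.
        for (KeyGroupsStateHandle handle : handles) {
            System.out.printf(
                    "key groups %d..%d, %d bytes%n",
                    handle.getKeyGroupRange().getStartKeyGroup(),
                    handle.getKeyGroupRange().getEndKeyGroup(),
                    handle.getStateSize());
        }
    }
}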
On Tue, Mar 9, 2021 at 1:51 PM Andrey Bulgakov <m...@andreiko.ru> wrote:

> Hi all,
>
> I'm trying to use the State Processor API to extract all keys from a
> RocksDB savepoint produced by an operator in a Flink streaming job into
> CSV files.
>
> The problem is that the storage size of the savepoint is 30TB and I'm
> running into garbage collection issues no matter how much memory in
> different proportions or CPU cores I allocate to task managers. (I tried
> allocating up to 120GB and 16 cores to each task.)
>
> The same program and hardware configuration works with no problems for a
> smaller savepoint (300GB), so it's some sort of scalability issue.
>
> At the beginning the tasks spend a couple of hours in what I call "the
> download phase". During that phase heap usage as indicated by metrics and
> the Flink UI is at about 10% and everything is going great.
>
> But at a certain point heap usage for tasks coming out of the download
> phase starts to go up, climbing to about 87% as indicated in the Flink UI
> and by the "tm.Status.JVM.Memory.Heap.Used" metric. At that point the
> heap usage metric doesn't increase anymore and the JVM starts spending a
> lot of time collecting garbage, keeping all CPUs 100% loaded. After some
> time in this mode the job crashes with
> "java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id
> container_1614821414188_0002_01_000035 timed out."
>
> At all times the indicated managed memory usage is 0%, which seems
> suspicious since RocksDB is supposed to be using it.
>
> Also, judging by the lack of an application metric I have in the state
> processor operator, KeyedStateReaderFunction.readKey never gets called.
>
> I would appreciate it if somebody helped answer some of my questions or
> suggested a way I could further diagnose/fix this:
>
> 1. Is it normal that this overwhelming garbage collection starts long
> before reaching 100% heap usage? At the time it happens there's usually
> 10-15GB of heap showing up as available.
>
> 2. Am I correct to assume that even in batch mode Flink implements memory
> back pressure and is supposed to slow down processing/allocations when
> it's low on available heap memory?
>
> 3. If #2 is true, is it possible that due to some misconfiguration Flink
> considers more heap space to be available than there actually is and
> keeps allocating even though there's no more heap?
>
> 4. As an alternative to #3, is it possible that there are some unaccounted
> heap allocations that are not shown in the UI and by the metric and
> therefore not taken into account by the memory back pressure mechanism?
>
> Here's the minimal code example that demonstrates the issue:
> https://gist.github.com/andreiko/94c675b4f04b40144b4cb4474b2f050f
>
> I'm running this on Flink 1.12.2 (and many earlier versions, too) with
> the following base configuration and parallelism of 80 (I tried lowering
> that to have more resources available, too):
> https://gist.github.com/andreiko/305d77c23be605042b85d9d4eb63f025
>
> I tried many things with no success:
> - reducing parallelism and making more resources available to each task
> manager
> - enabling object reuse and modifying the tuple mapper to avoid extra
> tuple allocations
> - manipulating memory ratios to allocate more memory to be used as heap
> or managed memory
> - allocating 20% of memory for JVM overhead
> - switching to the G1GC garbage collector
>
> Again, would appreciate any help with this.
>
> --
> With regards,
> Andrey Bulgakov
>

--
With regards,
Andrey Bulgakov