Hi David,

I can confirm that I'm able to reproduce this behaviour. I've tried profiling and flame graphs, but I wasn't able to make much sense of the results. I couldn't spot any I/O or memory bottlenecks; it does look like the job is stuck inside RocksDB itself. This might be an issue with, for example, the memory configuration. Streaming jobs and the State Processor API run in very different environments, as the latter uses the DataSet API under the hood, so maybe that explains it? However, I'm not an expert in either the DataSet API or RocksDB, so it's hard for me to make progress here.
Maybe someone else can help here?

Piotrek

On Wed, 8 Sep 2021 at 14:45, David Causse <dcau...@wikimedia.org> wrote:

> Hi,
>
> I'm investigating why a job we use to inspect a Flink state is a lot
> slower than the bootstrap job used to generate it.
>
> I use RocksDB with a simple keyed value state mapping a string key to a
> long value. Generating the bootstrap state from a CSV file with 100M
> entries takes a couple of minutes over 12 slots spread over 3 TMs (4 GB
> allowed). But another job that does the opposite (converts this state into
> a CSV file) takes several hours. I would have expected the runtimes of
> these two jobs to be in the same ballpark.
>
> I wrote a simple test case[1] to reproduce the problem. This program has 3
> jobs:
> - CreateState: generates a keyed state (string->long) using the State
>   Processor API
> - StreamJob: reads all the keys using a StreamingExecutionEnvironment
> - ReadState: reads all the keys using the State Processor API
>
> Running with 30M keys (12 slots/3 TMs with 4 GB each), CreateState and
> StreamJob finish in less than a minute.
> ReadState is much slower (> 30 minutes) on my system. The RocksDB state
> appears to be restored relatively quickly, but after that the slots
> perform at very different speeds. Some slots finish quickly, but others
> struggle to advance.
>
> Looking at the thread dumps, I always see threads in
> org.rocksdb.RocksDB.get:
>
> "DataSource (at readKeyedState(ExistingSavepoint.java:314) (org.apache.flink.state.api.input.KeyedStateInputFormat)) (12/12)#0" Id=371 RUNNABLE
>     at org.rocksdb.RocksDB.get(Native Method)
>     at org.rocksdb.RocksDB.get(RocksDB.java:2084)
>     at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:83)
>     at org.wikimedia.flink.StateReader.readKey(ReadState.scala:38)
>     at org.wikimedia.flink.StateReader.readKey(ReadState.scala:32)
>     at org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:76)
>     at org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:51)
>     at org.apache.flink.state.api.input.KeyedStateInputFormat.nextRecord(KeyedStateInputFormat.java:228)
>
> It seems suspiciously slow to me, and I'm wondering if I'm missing
> something in the way the State Processor API works.
>
> Thanks for your help!
>
> David.
>
> 1: https://github.com/nomoa/rocksdb-state-processor-test
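For readers unfamiliar with the State Processor API, a ReadState-style job can be sketched roughly as follows. This is not the code from the linked repository; it assumes Flink 1.13 with the embedded RocksDB state backend, and the operator uid "my-operator-uid" and state name "value-state" are hypothetical placeholders that would have to match whatever the bootstrap job used. The state.value() call inside readKey is the kind of per-key lookup that appears as RocksDBValueState.value -> RocksDB.get in the thread dump above.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReadStateSketch {

    // Hypothetical identifiers (assumption): they must match the operator uid
    // and the state name used when the state was bootstrapped.
    static final String OPERATOR_UID = "my-operator-uid";
    static final String STATE_NAME = "value-state";

    /** Emits one (key, value) pair for every key found in the savepoint. */
    public static class StateReader
            extends KeyedStateReaderFunction<String, Tuple2<String, Long>> {

        private transient ValueState<Long> state;

        @Override
        public void open(Configuration parameters) {
            // Register the same String -> Long value state the bootstrap job wrote.
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>(STATE_NAME, Types.LONG));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Tuple2<String, Long>> out)
                throws Exception {
            // Point lookup for the current key; with RocksDB this goes through RocksDB.get.
            out.collect(Tuple2.of(key, state.value()));
        }
    }

    public static void main(String[] args) throws Exception {
        String savepointPath = args[0];
        String outputPath = args[1];

        // In Flink 1.13 the State Processor API runs on the DataSet API.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint =
                Savepoint.load(env, savepointPath, new EmbeddedRocksDBStateBackend());

        DataSet<Tuple2<String, Long>> entries =
                savepoint.readKeyedState(OPERATOR_UID, new StateReader());
        entries.writeAsCsv(outputPath);
        env.execute("ReadState");
    }
}
```

The sketch would be launched with the savepoint path and an output path as its two arguments.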