Hi David,

I can confirm that I'm able to reproduce this behaviour. I've tried
profiling/flame graphs and I was not able to make much sense out of those
results. There are no IO/Memory bottlenecks that I could notice, it looks
indeed like the Job is stuck inside RocksDB itself. This might be an issue
with for example memory configuration. Streaming jobs and State Processor
API are running in very different environments as the latter one is using
DataSet API under the hood, so maybe that can explain this? However I'm no
expert in neither DataSet API nor the RocksDB, so it's hard for me to make
progress here.

Maybe someone else can help here?

Piotrek


śr., 8 wrz 2021 o 14:45 David Causse <dcau...@wikimedia.org> napisał(a):

> Hi,
>
> I'm investigating why a job we use to inspect a flink state is a lot
> slower than the bootstrap job used to generate it.
>
> I use RocksdbDB with a simple keyed value state mapping a string key to a
> long value. Generating the bootstrap state from a CSV file with 100M
> entries takes a couple minutes over 12 slots spread over 3 TM (4Gb
> allowed). But another job that does the opposite (converts this state into
> a CSV file) takes several hours. I would have expected these two job
> runtimes to be in the same ballpark.
>
> I wrote a simple test case[1] to reproduce the problem. This program has 3
> jobs:
> - CreateState: generate a keyed state (string->long) using the state
> processor api
> - StreamJob: reads all the keys using a StreamingExecutionEnvironment
> - ReadState: reads all the keys using the state processor api
>
> Running with 30M keys and (12 slots/3TM with 4Gb each) CreateState &
> StreamJob are done in less than a minute.
> ReadState is much slower (> 30minutes) on my system. The RocksDB state
> appears to be restored relatively quickly but after that the slots are
> performing at very different speeds. Some slots finish quickly but some
> others struggle to advance.
> Looking at the thread dumps I always see threads in
> org.rocksdb.RocksDB.get:
>
> "DataSource (at readKeyedState(ExistingSavepoint.java:314)
> (org.apache.flink.state.api.input.KeyedStateInputFormat)) (12/12)#0" Id=371
> RUNNABLE
> at org.rocksdb.RocksDB.get(Native Method)
> at org.rocksdb.RocksDB.get(RocksDB.java:2084)
> at
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:83)
> at org.wikimedia.flink.StateReader.readKey(ReadState.scala:38)
> at org.wikimedia.flink.StateReader.readKey(ReadState.scala:32)
> at
> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:76)
> at
> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.processElement(KeyedStateReaderOperator.java:51)
> at
> org.apache.flink.state.api.input.KeyedStateInputFormat.nextRecord(KeyedStateInputFormat.java:228)
>
> It seems suspiciously slow to me and I'm wondering if I'm missing
> something in the way the state processor api works.
>
> Thanks for your help!
>
> David.
>
> 1: https://github.com/nomoa/rocksdb-state-processor-test
>

Reply via email to