There is no silver bullet in the RocksDB tuning area, but tuning can help significantly.
As a general rule, the RocksDB state backend needs mainly two things to perform well:
memory and fast local storage. Measure where the bottleneck is and tune accordingly.
Memory is used as a cache, and the amount available directly impacts read/write
throughput. Note that RocksDB does not use JVM heap memory; it uses Flink managed
memory instead. There are several ways to configure it, but the recommended one is as
a fraction of the total TaskManager (TM) memory. Adding more (and faster) ephemeral
local storage is a straightforward Kubernetes-level change. I've appended rough
sketches of both settings, plus one for the State Processor API check from the earlier
mails, below the quoted thread.

G

On Mon, Aug 19, 2024 at 4:21 PM Bjarke Tornager <bjarketorna...@gmail.com> wrote:

> Hi Gabor,
>
> Thanks for the suggestions.
>
> For the rocksDB config tweaks would you be able to make some concrete
> suggestions? I have looked into the official Tuning Checkpoints and Large
> State guide documentation, however tuning rocksDB does not seem
> straightforward tbh.
>
> My entire flinkConfiguration looks like this:
>
> spec:
>   flinkConfiguration:
>     env.java.opts.all: --add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.time=ALL-UNNAMED --add-opens java.base/java.nio=ALL-UNNAMED
>     execution.checkpointing.file-merging.enabled: "true"
>     execution.checkpointing.interval: "120000"
>     execution.checkpointing.min-pause: "60000"
>     execution.checkpointing.timeout: "900000"
>     execution.checkpointing.unaligned: "true"
>     fs.default-scheme: abfss://<container-name>@<storage-account-name>.dfs.core.windows.net
>     high-availability.storageDir: abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/test/ha
>     high-availability.type: kubernetes
>     job.autoscaler.enabled: "true"
>     job.autoscaler.memory.tuning.enabled: "true"
>     job.autoscaler.metrics.window: 10m
>     job.autoscaler.stabilization.interval: 5m
>     job.autoscaler.target.utilization: "0.6"
>     job.autoscaler.target.utilization.boundary: "0.2"
>     pipeline.max-parallelism: "4"
>     rest.profiling.enabled: "true"
>     state.backend: rocksdb
>     state.backend.incremental: "true"
>     state.backend.rocksdb.checkpoint.transfer.thread.num: "8"
>     state.backend.rocksdb.compaction.style: UNIVERSAL
>     state.backend.rocksdb.localdir: /mnt/flink
>     state.backend.rocksdb.thread.num: "8"
>     state.backend.rocksdb.writebuffer.size: 128MB
>     state.checkpoints.dir: abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/test/checkpointing
>     state.savepoints.dir: abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/test/savepoints
>     taskmanager.network.memory.buffer-debloat.enabled: "true"
>     taskmanager.numberOfTaskSlots: "1"
>
> Best regards,
> Bjarke
>
>
> On Mon, Aug 19, 2024 at 1:18 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>
>> Please see my comments inline.
>>
>> G
>>
>>
>> On Thu, Aug 8, 2024 at 3:41 PM Bjarke Tornager <bjarketorna...@gmail.com> wrote:
>>
>>> Hi Gabor,
>>>
>>> What kind of development work - is it the Disaggregated State Storage
>>> and Management FLIP (
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855)
>>> that you are referring to?
>>>
>> Yes.
>>
>>>
>>> Thanks for the suggestion. I am not familiar with the state processor
>>> api but I will have a look.
>>>
>>> My state is not even that large (10-50 Gb). It seems like people are
>>> using RocksDB with Tb of state but I am a bit puzzled as to how they handle
>>> restarts from savepoint when it takes so long. Do you know?
>>>
>> Yeah, we know users with Tb of states and not easy to handle such jobs
>> because of the long restore times.
>> Practically the cluster must be strong enough to catch up the accumulated
>> events after the long restore time + some config tweaks which I've listed
>> in the next bullet point.
>>
>>>
>>> Are there some flink configurations that I might be able to try out to
>>> alleviate the slow restart that might be causing the exception?
>>>
>> What helps is "state.backend.local-recovery=true", some rocksDB tuning
>> and reducing state size if possible (better eviction, etc...).
>>
>>>
>>> Best regards,
>>> Bjarke
>>>
>>> On Thu, Aug 8, 2024 at 11:16 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>
>>>> Hi Bjarke,
>>>>
>>>> It's normal to see longer recovery time as the state size grows. There
>>>> are developments in progress to mitigate this problem.
>>>>
>>>> > Any ideas as to what could be causing this?
>>>> I think there is no easy way to tell it, I can give just some pointers.
>>>>
>>>> First I would take a look at the state files with the State Processor
>>>> API[1], I would be really surprised though since ADLS is consistent,
>>>> only the rename with overwrite operation is not atomic what we don't do
>>>> in Flink.
>>>>
>>>> When the checkpoint files are not corrupt then I would add some debug
>>>> printouts to the exception catch area and try to find out the
>>>> exact root cause (since several Gb state can fit into memory maybe
>>>> local repro can be possible with debugger).
>>>>
>>>> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
>>>>
>>>> BR,
>>>> G
>>>>
>>>>
>>>> On Wed, Aug 7, 2024 at 9:46 PM Bjarke Tornager <bjarketorna...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am doing stateful streaming (keyed coprocessing) using rocksDB. When
>>>>> my job restarts to adjust parallelism, and restores from savepoint, I quite
>>>>> often get the following error:
>>>>>
>>>>> 2024-08-07 19:43:09
>>>>> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>>>>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:330)
>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275)
>>>>> at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
>>>>> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>>>>> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
>>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>>>>> at java.base/java.lang.Thread.run(Unknown Source)
>>>>> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedCoProcessOperator_30465cf1c06f2daf053ed603d38be4b6_(1/2) from any of the 1 provided restore options.
>>>>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:165)
>>>>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:457)
>>>>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:203)
>>>>> ... 12 more
>>>>> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>>>>> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:459)
>>>>> at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:533)
>>>>> at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:96)
>>>>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:446)
>>>>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
>>>>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>>>>> ... 14 more
>>>>> Caused by: java.io.EOFException
>>>>> at java.base/java.io.DataInputStream.readFully(Unknown Source)
>>>>> at java.base/java.io.DataInputStream.readFully(Unknown Source)
>>>>> at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:82)
>>>>> at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$KeyGroupEntriesIterator.next(FullSnapshotRestoreOperation.java:298)
>>>>> at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$KeyGroupEntriesIterator.next(FullSnapshotRestoreOperation.java:273)
>>>>> at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateData(RocksDBFullRestoreOperation.java:148)
>>>>> at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.applyRestoreResult(RocksDBFullRestoreOperation.java:128)
>>>>> at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:102)
>>>>> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:376)
>>>>> ... 19 more
>>>>>
>>>>> However, most often the job is able to start up after several retries.
>>>>> It seems to happen more often with higher parallelism (and higher cpu) and
>>>>> as such with more pods.
>>>>> Additionally, as the state grows into the Gb size it takes longer for
>>>>> the job to start up and as such my job has longer downtime due to the many
>>>>> retries before it successfully restores state.
>>>>>
>>>>> Any ideas as to what could be causing this?
>>>>>
>>>>> I am using Apache Flink Operator 1.8, Flink 1.20 using the official
>>>>> Apache Flink image, Java 17, and Adls Gen2 for durable storage.
>>>>>
>>>>> Best regards,
>>>>> Bjarke
>>>>>
>>>>
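
Here are the sketches I mentioned above. First, a minimal FlinkDeployment snippet
for giving RocksDB more managed memory and more (ideally SSD-backed) local scratch
space via the operator's podTemplate. The fraction, the 20Gi sizes and the volume
name are placeholders for illustration only, not recommendations; measure before
and after changing them. The local-recovery key is the one already suggested
earlier in this thread:

spec:
  flinkConfiguration:
    # RocksDB block cache and write buffers are taken from Flink managed memory.
    # 0.4 is the default fraction of total TM memory; raising it gives RocksDB
    # a bigger cache at the expense of the other memory pools.
    taskmanager.memory.managed.fraction: "0.6"
    # Default is already true; keep it so RocksDB really uses managed memory.
    state.backend.rocksdb.memory.managed: "true"
    # Suggested earlier in the thread: keep a local copy of state for recovery.
    state.backend.local-recovery: "true"
  taskManager:
    podTemplate:
      spec:
        containers:
          - name: flink-main-container
            resources:
              requests:
                ephemeral-storage: 20Gi
              limits:
                ephemeral-storage: 20Gi
            volumeMounts:
              # Matches state.backend.rocksdb.localdir: /mnt/flink
              - name: rocksdb-local
                mountPath: /mnt/flink
        volumes:
          - name: rocksdb-local
            emptyDir:
              sizeLimit: 20Gi

If the nodes have local SSD/NVMe disks, backing /mnt/flink with them (an emptyDir
on such a node, a hostPath or a local PV) is usually what helps the most for
large state.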
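
And a rough sketch of the State Processor API check referenced in the quoted mails
(it needs the flink-state-processor-api dependency). It simply reads the keyed state
of the failing operator back out of the savepoint; if the snapshot files are broken
you should hit the same EOFException here in isolation. The uid "my-co-process", the
String key type, the "my-state" descriptor and the savepoint directory are made-up
placeholders and must match what the KeyedCoProcessFunction actually registers:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SavepointSanityCheck {

    // Reads one ValueState per key; adapt the descriptor(s) to the real job.
    static class ReaderFn extends KeyedStateReaderFunction<String, String> {
        private transient ValueState<String> state;

        @Override
        public void open(Configuration parameters) {
            // Name and type must match the descriptor used in the job.
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("my-state", Types.STRING));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
            out.collect(key + " -> " + state.value());
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Point this at the savepoint (or retained checkpoint) that fails to restore.
        SavepointReader savepoint = SavepointReader.read(
                env,
                "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/test/savepoints/<savepoint-dir>",
                new HashMapStateBackend());

        // "my-co-process" is a placeholder for the uid set on the co-process operator.
        savepoint.readKeyedState("my-co-process", new ReaderFn()).print();

        env.execute("savepoint-sanity-check");
    }
}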