There is no silver bullet in the RocksDB tuning area, but tuning can help
significantly.

As a general rule, the RocksDB state backend mainly needs two things to
perform well: memory and fast local storage.
Which one to tune depends on where the bottleneck is, so make measurements
before and after each change.
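For measuring, one option is to enable a few of RocksDB's native metrics in
the Flink configuration (just a sketch; keep in mind native metrics add some
overhead):

    state.backend.rocksdb.metrics.block-cache-usage: "true"
    state.backend.rocksdb.metrics.estimate-num-keys: "true"
    state.backend.rocksdb.metrics.num-running-compactions: "true"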

Memory is used as a cache, and the available amount directly impacts
read/write throughput.
Note that RocksDB does not use JVM heap memory; it uses Flink managed memory
instead.
There are different ways to configure it, but the recommended way is as a
percentage of the total TaskManager memory.
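For example (only a sketch; the fraction below is an illustrative starting
point, not a recommendation for your workload):

    # Fraction of total Flink memory reserved as managed memory (default 0.4).
    taskmanager.memory.managed.fraction: "0.6"
    # Keep RocksDB within the managed memory budget (this is already the default).
    state.backend.rocksdb.memory.managed: "true"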

Adding more ephemeral storage is a straightforward Kubernetes configuration
change.
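A minimal sketch using the operator's podTemplate (the volume name and size
limit are placeholders; the mount path should match your
state.backend.rocksdb.localdir):

    spec:
      podTemplate:
        spec:
          containers:
            - name: flink-main-container
              volumeMounts:
                - name: rocksdb-local
                  mountPath: /mnt/flink
          volumes:
            - name: rocksdb-local
              emptyDir:
                sizeLimit: 50Gi

If the nodes have local SSDs, backing this volume with them tends to help
RocksDB the most.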

G


On Mon, Aug 19, 2024 at 4:21 PM Bjarke Tornager <bjarketorna...@gmail.com>
wrote:

> Hi Gabor,
>
> Thanks for the suggestions.
>
> For the RocksDB config tweaks, would you be able to make some concrete
> suggestions? I have looked into the official Tuning Checkpoints and Large
> State guide in the documentation, but tuning RocksDB does not seem
> straightforward, to be honest.
>
> My entire flinkConfiguration looks like this:
>
> spec:
>   flinkConfiguration:
>     env.java.opts.all: --add-opens java.base/java.lang=ALL-UNNAMED
>       --add-opens java.base/java.util=ALL-UNNAMED
>       --add-opens java.base/java.time=ALL-UNNAMED
>       --add-opens java.base/java.nio=ALL-UNNAMED
>     execution.checkpointing.file-merging.enabled: "true"
>     execution.checkpointing.interval: "120000"
>     execution.checkpointing.min-pause: "60000"
>     execution.checkpointing.timeout: "900000"
>     execution.checkpointing.unaligned: "true"
>     fs.default-scheme: abfss://<container-name>@<storage-account-name>.dfs.core.windows.net
>     high-availability.storageDir: abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/test/ha
>     high-availability.type: kubernetes
>     job.autoscaler.enabled: "true"
>     job.autoscaler.memory.tuning.enabled: "true"
>     job.autoscaler.metrics.window: 10m
>     job.autoscaler.stabilization.interval: 5m
>     job.autoscaler.target.utilization: "0.6"
>     job.autoscaler.target.utilization.boundary: "0.2"
>     pipeline.max-parallelism: "4"
>     rest.profiling.enabled: "true"
>     state.backend: rocksdb
>     state.backend.incremental: "true"
>     state.backend.rocksdb.checkpoint.transfer.thread.num: "8"
>     state.backend.rocksdb.compaction.style: UNIVERSAL
>     state.backend.rocksdb.localdir: /mnt/flink
>     state.backend.rocksdb.thread.num: "8"
>     state.backend.rocksdb.writebuffer.size: 128MB
>     state.checkpoints.dir: abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/test/checkpointing
>     state.savepoints.dir: abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/test/savepoints
>     taskmanager.network.memory.buffer-debloat.enabled: "true"
>     taskmanager.numberOfTaskSlots: "1"
>
> Best regards,
> Bjarke
>
>
> On Mon, Aug 19, 2024 at 1:18 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> Please see my comments inline.
>>
>> G
>>
>>
>> On Thu, Aug 8, 2024 at 3:41 PM Bjarke Tornager <bjarketorna...@gmail.com>
>> wrote:
>>
>>> Hi Gabor,
>>>
>>> What kind of development work - is it the Disaggregated State Storage
>>> and Management FLIP (
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855)
>>> that you are referring to?
>>>
>> Yes.
>>
>>>
>>> Thanks for the suggestion. I am not familiar with the State Processor
>>> API, but I will have a look.
>>>
>>> My state is not even that large (10-50 GB). It seems like people are
>>> using RocksDB with TBs of state, but I am a bit puzzled as to how they
>>> handle restarts from a savepoint when it takes so long. Do you know?
>>>
>> Yes, we know of users with TBs of state, and such jobs are not easy to
>> handle because of the long restore times.
>> In practice the cluster must be strong enough to catch up on the events
>> that accumulate during the long restore, combined with the config tweaks
>> I've listed in the next bullet point.
>>
>>>
>>> Are there some Flink configurations that I might be able to try out to
>>> alleviate the slow restart that might be causing the exception?
>>>
>> What helps is "state.backend.local-recovery=true", some RocksDB tuning,
>> and reducing the state size if possible (better eviction, etc.).
>>
>>>
>>> Best regards,
>>> Bjarke
>>>
>>> On Thu, Aug 8, 2024 at 11:16 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>>> Hi Bjarke,
>>>>
>>>> It's normal to see longer recovery time as the state size grows. There
>>>> are developments in progress to mitigate this problem.
>>>>
>>>> > Any ideas as to what could be causing this?
>>>> I don't think there is an easy way to tell; I can only give some pointers.
>>>>
>>>> First I would take a look at the state files with the State Processor
>>>> API [1]. I would be really surprised if they were corrupt, though, since
>>>> ADLS is consistent; only the rename-with-overwrite operation is not
>>>> atomic, and Flink does not use it.
>>>>
>>>> If the checkpoint files are not corrupt, then I would add some debug
>>>> printouts around the exception catch area and try to find the exact root
>>>> cause (since several GB of state can fit into memory, a local repro with
>>>> a debugger may be possible).
>>>>
>>>> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
>>>>
>>>> BR,
>>>> G
>>>>
>>>>
>>>> On Wed, Aug 7, 2024 at 9:46 PM Bjarke Tornager <
>>>> bjarketorna...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am doing stateful streaming (keyed coprocessing) using RocksDB. When
>>>>> my job restarts to adjust parallelism and restores from a savepoint, I
>>>>> quite often get the following error:
>>>>>
>>>>> 2024-08-07 19:43:09
>>>>> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>>>>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:330)
>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275)
>>>>> at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
>>>>> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>>>>> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
>>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>>>>> at java.base/java.lang.Thread.run(Unknown Source)
>>>>> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedCoProcessOperator_30465cf1c06f2daf053ed603d38be4b6_(1/2) from any of the 1 provided restore options.
>>>>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:165)
>>>>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:457)
>>>>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:203)
>>>>> ... 12 more
>>>>> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>>>>> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:459)
>>>>> at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:533)
>>>>> at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:96)
>>>>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:446)
>>>>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
>>>>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>>>>> ... 14 more
>>>>> Caused by: java.io.EOFException
>>>>> at java.base/java.io.DataInputStream.readFully(Unknown Source)
>>>>> at java.base/java.io.DataInputStream.readFully(Unknown Source)
>>>>> at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:82)
>>>>> at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$KeyGroupEntriesIterator.next(FullSnapshotRestoreOperation.java:298)
>>>>> at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$KeyGroupEntriesIterator.next(FullSnapshotRestoreOperation.java:273)
>>>>> at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateData(RocksDBFullRestoreOperation.java:148)
>>>>> at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.applyRestoreResult(RocksDBFullRestoreOperation.java:128)
>>>>> at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:102)
>>>>> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:376)
>>>>> ... 19 more
>>>>>
>>>>> However, most often the job is able to start up after several retries.
>>>>> The error seems to happen more often with higher parallelism (and higher
>>>>> CPU), and therefore with more pods.
>>>>> Additionally, as the state grows into the GB range it takes longer for
>>>>> the job to start up, so the job has longer downtime due to the many
>>>>> retries before it successfully restores state.
>>>>>
>>>>> Any ideas as to what could be causing this?
>>>>>
>>>>> I am using Apache Flink Kubernetes Operator 1.8, Flink 1.20 with the
>>>>> official Apache Flink image, Java 17, and ADLS Gen2 for durable storage.
>>>>>
>>>>> Best regards,
>>>>> Bjarke
>>>>>
>>>>
