Okay, I see. Thanks for the explanation.

Br,
Bjarke



On Wed, Aug 21, 2024 at 10:17 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> There is no golden egg in the RocksDB tuning area but it can help a
> significantly.
>
> As a general rule the RocksDB state backend requires mainly 2 things to
> perform well, memory and fast local storage.
> You can tune these depending on the bottleneck and make measurements.
>
> Memory is used as a cache and the available amount will directly impact
> read/write throughput.
> RocksDB however does not use JVM Heap memory but instead it requires Flink
> managed memory.
> There are different ways to configure it but the recommended way is as a
> percentage of total TM memory.
>
> Adding more ephemeral storage is an obvious k8s config.
>
> G
>
>
> On Mon, Aug 19, 2024 at 4:21 PM Bjarke Tornager <bjarketorna...@gmail.com>
> wrote:
>
>> Hi Gabor,
>>
>> Thanks for the suggestions.
>>
>> For the rocksDB config tweaks would you be able to make some concrete
>> suggestions? I have looked into the official Tuning Checkpoints and Large
>> State guide documentation, however tuning rocksDB does not seem
>> straightforward tbh.
>>
>> My entire flinkConfiguration looks like this:
>>
>> spec:
>>   flinkConfiguration:
>>     env.java.opts.all: --add-opens java.base/java.lang=ALL-UNNAMED
>> --add-opens java.base/java.util=ALL-UNNAMED
>>       --add-opens java.base/java.time=ALL-UNNAMED --add-opens
>> java.base/java.nio=ALL-UNNAMED
>>     execution.checkpointing.file-merging.enabled: "true"
>>     execution.checkpointing.interval: "120000"
>>     execution.checkpointing.min-pause: "60000"
>>     execution.checkpointing.timeout: "900000"
>>     execution.checkpointing.unaligned: "true"
>>     fs.default-scheme: abfss://<container-name>@<storage-account-name>.
>> dfs.core.windows.net
>>     high-availability.storageDir:
>> abfss://<container-name>@<storage-account-name>.
>> dfs.core.windows.net/test/ha
>>     high-availability.type: kubernetes
>>     job.autoscaler.enabled: "true"
>>     job.autoscaler.memory.tuning.enabled: "true"
>>     job.autoscaler.metrics.window: 10m
>>     job.autoscaler.stabilization.interval: 5m
>>     job.autoscaler.target.utilization: "0.6"
>>     job.autoscaler.target.utilization.boundary: "0.2"
>>     pipeline.max-parallelism: "4"
>>     rest.profiling.enabled: "true"
>>     state.backend: rocksdb
>>     state.backend.incremental: "true"
>>     state.backend.rocksdb.checkpoint.transfer.thread.num: "8"
>>     state.backend.rocksdb.compaction.style: UNIVERSAL
>>     state.backend.rocksdb.localdir: /mnt/flink
>>     state.backend.rocksdb.thread.num: "8"
>>     state.backend.rocksdb.writebuffer.size: 128MB
>>     state.checkpoints.dir:
>> abfss://<container-name>@<storage-account-name>.
>> dfs.core.windows.net/test/checkpointing
>>     state.savepoints.dir: abfss://<container-name>@<storage-account-name>.
>> dfs.core.windows.net/test/savepoints
>>     taskmanager.network.memory.buffer-debloat.enabled: "true"
>>     taskmanager.numberOfTaskSlots: "1"
>>
>> Best regards,
>> Bjarke
>>
>>
>> On Mon, Aug 19, 2024 at 1:18 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>> wrote:
>>
>>> Please see my comments inline.
>>>
>>> G
>>>
>>>
>>> On Thu, Aug 8, 2024 at 3:41 PM Bjarke Tornager <bjarketorna...@gmail.com>
>>> wrote:
>>>
>>>> Hi Gabor,
>>>>
>>>> What kind of development work - is it the Disaggregated State Storage
>>>> and Management FLIP (
>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855)
>>>> that you are referring to?
>>>>
>>> Yes.
>>>
>>>>
>>>> Thanks for the suggestion. I am not familiar with the state processor
>>>> api but I will have a look.
>>>>
>>>> My state is not even that large(10-50 Gb). It seems like people are
>>>> using RocksDB with Tb of state but I am a bit puzzled as to how they handle
>>>> restarts from savepoint when it takes so long. Do you know?
>>>>
>>> Yeah, we know users with Tb of states and not easy to handle such jobs
>>> because of the long restore times.
>>> Practically the cluster must be strong enough to catch up the
>>> accumulated events after the long restore time + some config tweaks which
>>> I've listed in the next bullet point.
>>>
>>>>
>>>> Are there some flink configurations that I might be able to try out to
>>>> alleviate the slow restart that might be causing the exception?
>>>>
>>> What helps is "state.backend.local-recovery=true", some rocksDB tuning
>>> and reducing state size if possible (better eviction, etc...).
>>>
>>>>
>>>> Best regards,
>>>> Bjarke
>>>>
>>>> On Thu, Aug 8, 2024 at 11:16 AM Gabor Somogyi <
>>>> gabor.g.somo...@gmail.com> wrote:
>>>>
>>>>> Hi Bjarke,
>>>>>
>>>>> It's normal to see longer recovery time as the state size grows. There
>>>>> are developments in progress to mitigate this problem.
>>>>>
>>>>> > Any ideas as to what could be causing this?
>>>>> I think there is no easy way to tell it, I can give just some pointers.
>>>>>
>>>>> First I would take a look at the state files with the State Processor
>>>>> API[1], I would be really surprised though since ADLS is consistent,
>>>>> only the rename with overwrite operation is not atomic what we don't
>>>>> do in Flink.
>>>>>
>>>>> When the checkpoint files are not corrupt then I would add some debug
>>>>> printouts to the exception catch area and try to find out the
>>>>> exact root cause (since several Gb state can fit into memory maybe
>>>>> local repro can be possible with debugger).
>>>>>
>>>>> [1]
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
>>>>>
>>>>> BR,
>>>>> G
>>>>>
>>>>>
>>>>> On Wed, Aug 7, 2024 at 9:46 PM Bjarke Tornager <
>>>>> bjarketorna...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am doing stateful streaming (keyed coprocessing) using rocksDB.
>>>>>> When my job restarts to adjust parallelism, and restores from savepoint, 
>>>>>> I
>>>>>> quite often get the following error:
>>>>>>
>>>>>> 2024-08-07 19:43:09
>>>>>> java.lang.Exception: Exception while creating
>>>>>> StreamOperatorStateContext.
>>>>>> at org.apache.flink.streaming.api.operators.
>>>>>> StreamTaskStateInitializerImpl.streamOperatorStateContext(
>>>>>> StreamTaskStateInitializerImpl.java:330)
>>>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator
>>>>>> .initializeState(AbstractStreamOperator.java:275)
>>>>>> at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain
>>>>>> .initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>>>>>> .restoreStateAndGates(StreamTask.java:858)
>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>>>>>> .lambda$restoreInternal$5(StreamTask.java:812)
>>>>>> at org.apache.flink.streaming.runtime.tasks.
>>>>>> StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>>>>>> .restoreInternal(StreamTask.java:812)
>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(
>>>>>> StreamTask.java:771)
>>>>>> at org.apache.flink.runtime.taskmanager.Task
>>>>>> .runWithSystemExitMonitoring(Task.java:970)
>>>>>> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task
>>>>>> .java:939)
>>>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>>>>>> at java.base/java.lang.Thread.run(Unknown Source)
>>>>>> Caused by: org.apache.flink.util.FlinkException: Could not restore
>>>>>> keyed state backend for
>>>>>> KeyedCoProcessOperator_30465cf1c06f2daf053ed603d38be4b6_(1/2) from
>>>>>> any of the 1 provided restore options.
>>>>>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
>>>>>> .createAndRestore(BackendRestorerProcedure.java:165)
>>>>>> at org.apache.flink.streaming.api.operators.
>>>>>> StreamTaskStateInitializerImpl.keyedStatedBackend(
>>>>>> StreamTaskStateInitializerImpl.java:457)
>>>>>> at org.apache.flink.streaming.api.operators.
>>>>>> StreamTaskStateInitializerImpl.streamOperatorStateContext(
>>>>>> StreamTaskStateInitializerImpl.java:203)
>>>>>> ... 12 more
>>>>>> Caused by: org.apache.flink.runtime.state.BackendBuildingException:
>>>>>> Caught unexpected exception.
>>>>>> at org.apache.flink.contrib.streaming.state.
>>>>>> RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder
>>>>>> .java:459)
>>>>>> at org.apache.flink.contrib.streaming.state.
>>>>>> EmbeddedRocksDBStateBackend.createKeyedStateBackend(
>>>>>> EmbeddedRocksDBStateBackend.java:533)
>>>>>> at org.apache.flink.contrib.streaming.state.
>>>>>> EmbeddedRocksDBStateBackend.createKeyedStateBackend(
>>>>>> EmbeddedRocksDBStateBackend.java:96)
>>>>>> at org.apache.flink.streaming.api.operators.
>>>>>> StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(
>>>>>> StreamTaskStateInitializerImpl.java:446)
>>>>>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
>>>>>> .attemptCreateAndRestore(BackendRestorerProcedure.java:173)
>>>>>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
>>>>>> .createAndRestore(BackendRestorerProcedure.java:137)
>>>>>> ... 14 more
>>>>>> Caused by: java.io.EOFException
>>>>>> at java.base/java.io.DataInputStream.readFully(Unknown Source)
>>>>>> at java.base/java.io.DataInputStream.readFully(Unknown Source)
>>>>>> at org.apache.flink.api.common.typeutils.base.array.
>>>>>> BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer
>>>>>> .java:82)
>>>>>> at org.apache.flink.runtime.state.restore.
>>>>>> FullSnapshotRestoreOperation$KeyGroupEntriesIterator.next(
>>>>>> FullSnapshotRestoreOperation.java:298)
>>>>>> at org.apache.flink.runtime.state.restore.
>>>>>> FullSnapshotRestoreOperation$KeyGroupEntriesIterator.next(
>>>>>> FullSnapshotRestoreOperation.java:273)
>>>>>> at org.apache.flink.contrib.streaming.state.restore.
>>>>>> RocksDBFullRestoreOperation.restoreKVStateData(
>>>>>> RocksDBFullRestoreOperation.java:148)
>>>>>> at org.apache.flink.contrib.streaming.state.restore.
>>>>>> RocksDBFullRestoreOperation.applyRestoreResult(
>>>>>> RocksDBFullRestoreOperation.java:128)
>>>>>> at org.apache.flink.contrib.streaming.state.restore.
>>>>>> RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:
>>>>>> 102)
>>>>>> at org.apache.flink.contrib.streaming.state.
>>>>>> RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder
>>>>>> .java:376)
>>>>>> ... 19 more
>>>>>>
>>>>>> However, most often the job is able to start up after several
>>>>>> retries. It seems to happen more often with higher parallelism (and 
>>>>>> higher
>>>>>> cpu) and as such with more pods.
>>>>>> Additionally, as the state grows into the Gb size it takes longer for
>>>>>> the job to start up and as such my job has longer downtime due to the 
>>>>>> many
>>>>>> retries before it successfully restores state.
>>>>>>
>>>>>> Any ideas as to what could be causing this?
>>>>>>
>>>>>> I am using Apache Flink Operator 1.8, Flink 1.20 using the official
>>>>>> Apache Flink image, Java 17, and Adls Gen2 for durable storage.
>>>>>>
>>>>>> Best regards,
>>>>>> Bjarke
>>>>>>
>>>>>

Reply via email to