Okay, I see. Thanks for the explanation.

Br,
Bjarke
On Wed, Aug 21, 2024 at 10:17 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> There is no silver bullet in the RocksDB tuning area, but tuning can help
> significantly.
>
> As a general rule, the RocksDB state backend needs mainly two things to
> perform well: memory and fast local storage.
> You can tune these depending on where the bottleneck is and measure the effect.
>
> Memory is used as a cache, and the available amount directly impacts
> read/write throughput.
> RocksDB, however, does not use JVM heap memory; it uses Flink managed
> memory instead.
> There are different ways to configure it, but the recommended way is as a
> percentage of total TM memory.
>
> Adding more ephemeral storage is a straightforward k8s configuration change.
>
> G
>
> On Mon, Aug 19, 2024 at 4:21 PM Bjarke Tornager <bjarketorna...@gmail.com> wrote:
>
>> Hi Gabor,
>>
>> Thanks for the suggestions.
>>
>> For the RocksDB config tweaks, would you be able to make some concrete
>> suggestions? I have looked into the official Tuning Checkpoints and Large
>> State documentation; however, tuning RocksDB does not seem
>> straightforward, to be honest.
>>
>> My entire flinkConfiguration looks like this:
>>
>> spec:
>>   flinkConfiguration:
>>     env.java.opts.all: --add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.time=ALL-UNNAMED --add-opens java.base/java.nio=ALL-UNNAMED
>>     execution.checkpointing.file-merging.enabled: "true"
>>     execution.checkpointing.interval: "120000"
>>     execution.checkpointing.min-pause: "60000"
>>     execution.checkpointing.timeout: "900000"
>>     execution.checkpointing.unaligned: "true"
>>     fs.default-scheme: abfss://<container-name>@<storage-account-name>.dfs.core.windows.net
>>     high-availability.storageDir: abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/test/ha
>>     high-availability.type: kubernetes
>>     job.autoscaler.enabled: "true"
>>     job.autoscaler.memory.tuning.enabled: "true"
>>     job.autoscaler.metrics.window: 10m
>>     job.autoscaler.stabilization.interval: 5m
>>     job.autoscaler.target.utilization: "0.6"
>>     job.autoscaler.target.utilization.boundary: "0.2"
>>     pipeline.max-parallelism: "4"
>>     rest.profiling.enabled: "true"
>>     state.backend: rocksdb
>>     state.backend.incremental: "true"
>>     state.backend.rocksdb.checkpoint.transfer.thread.num: "8"
>>     state.backend.rocksdb.compaction.style: UNIVERSAL
>>     state.backend.rocksdb.localdir: /mnt/flink
>>     state.backend.rocksdb.thread.num: "8"
>>     state.backend.rocksdb.writebuffer.size: 128MB
>>     state.checkpoints.dir: abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/test/checkpointing
>>     state.savepoints.dir: abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/test/savepoints
>>     taskmanager.network.memory.buffer-debloat.enabled: "true"
>>     taskmanager.numberOfTaskSlots: "1"
>>
>> Best regards,
>> Bjarke
>>
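For reference, a minimal sketch of what Gabor's "managed memory as a percentage of TM memory" advice could look like on top of the flinkConfiguration above. The option names are standard Flink options, but the value is only illustrative and would need to be sized against the actual TaskManager memory and verified by measurement:

spec:
  flinkConfiguration:
    # Give RocksDB a larger share of TM memory; Flink managed memory is what
    # RocksDB uses for its block cache and write buffers (the default fraction is 0.4).
    taskmanager.memory.managed.fraction: "0.6"
    # Keep RocksDB memory accounted through Flink managed memory (already the default).
    state.backend.rocksdb.memory.managed: "true"

Whether a value like 0.6 is appropriate depends on how much heap the job itself needs; the managed fraction is just the main knob for RocksDB memory. It is also worth checking how this interacts with job.autoscaler.memory.tuning.enabled, which is already enabled in the configuration above.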
>> On Mon, Aug 19, 2024 at 1:18 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>
>>> Please see my comments inline.
>>>
>>> G
>>>
>>> On Thu, Aug 8, 2024 at 3:41 PM Bjarke Tornager <bjarketorna...@gmail.com> wrote:
>>>
>>>> Hi Gabor,
>>>>
>>>> What kind of development work - is it the Disaggregated State Storage
>>>> and Management FLIP (
>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855)
>>>> that you are referring to?
>>>>
>>> Yes.
>>>
>>>> Thanks for the suggestion. I am not familiar with the State Processor
>>>> API, but I will have a look.
>>>>
>>>> My state is not even that large (10-50 GB). It seems like people are
>>>> using RocksDB with TBs of state, but I am a bit puzzled as to how they
>>>> handle restarts from savepoint when it takes so long. Do you know?
>>>>
>>> Yeah, we know of users with TBs of state, and such jobs are not easy to
>>> handle because of the long restore times.
>>> In practice the cluster must be strong enough to catch up on the events
>>> accumulated during the long restore, plus the config tweaks I list under
>>> the next question.
>>>
>>>> Are there some Flink configurations that I might be able to try out to
>>>> alleviate the slow restart that might be causing the exception?
>>>>
>>> What helps is "state.backend.local-recovery=true", some RocksDB tuning,
>>> and reducing the state size if possible (better eviction, etc.).
>>>
>>>> Best regards,
>>>> Bjarke
>>>>
>>>> On Thu, Aug 8, 2024 at 11:16 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>
>>>>> Hi Bjarke,
>>>>>
>>>>> It's normal to see longer recovery times as the state size grows. There
>>>>> are developments in progress to mitigate this problem.
>>>>>
>>>>> > Any ideas as to what could be causing this?
>>>>> I think there is no easy way to tell; I can only give some pointers.
>>>>>
>>>>> First, I would take a look at the state files with the State Processor
>>>>> API [1]. I would be really surprised if they were corrupt, though, since
>>>>> ADLS is consistent; only rename-with-overwrite is not atomic, and Flink
>>>>> does not use that operation.
>>>>>
>>>>> If the checkpoint files are not corrupt, then I would add some debug
>>>>> printouts around the exception catch area and try to find out the exact
>>>>> root cause (since several GB of state can fit into memory, a local
>>>>> reproduction with a debugger may be possible).
>>>>>
>>>>> [1]
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
>>>>>
>>>>> BR,
>>>>> G
>>>>>
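To make the State Processor API suggestion above more concrete, here is a rough, hypothetical sketch of how the keyed state of the failing operator could be read back and sanity-checked. It is only a sketch: it assumes the flink-state-processor-api dependency and Flink 1.20 APIs (SavepointReader, KeyedStateReaderFunction, OperatorIdentifier), and the key type, state name ("my-state"), state type, and savepoint path are placeholders that must be replaced with what the keyed coprocess function actually registers. If iterating over every key completes without an EOFException, the savepoint files themselves are most likely not corrupt.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SavepointSanityCheck {

    // Hypothetical reader: key type (String), state name ("my-state") and state type
    // (String) must match what the keyed coprocess function actually registers.
    static class ReadMyState extends KeyedStateReaderFunction<String, String> {
        private transient ValueState<String> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("my-state", String.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
            // Touching every key/value forces the snapshot entries to be fully deserialized.
            out.collect(key + " -> " + state.value());
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SavepointReader savepoint = SavepointReader.read(
                env,
                "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/test/savepoints/<savepoint-dir>",
                new EmbeddedRocksDBStateBackend());

        // If the job does not set an explicit uid, the uid hash from the exception message
        // ("KeyedCoProcessOperator_30465cf1c06f2daf053ed603d38be4b6_...") can be used as here;
        // with an explicit uid, OperatorIdentifier.forUid("...") is the simpler choice.
        DataStream<String> entries = savepoint.readKeyedState(
                OperatorIdentifier.forUidHash("30465cf1c06f2daf053ed603d38be4b6"),
                new ReadMyState());

        entries.print();
        env.execute("savepoint-sanity-check");
    }
}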
>>>>> On Wed, Aug 7, 2024 at 9:46 PM Bjarke Tornager <bjarketorna...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am doing stateful streaming (keyed coprocessing) using RocksDB.
>>>>>> When my job restarts to adjust parallelism and restores from savepoint,
>>>>>> I quite often get the following error:
>>>>>>
>>>>>> 2024-08-07 19:43:09
>>>>>> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>>>>>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:330)
>>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>>>>>>     at java.base/java.lang.Thread.run(Unknown Source)
>>>>>> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedCoProcessOperator_30465cf1c06f2daf053ed603d38be4b6_(1/2) from any of the 1 provided restore options.
>>>>>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:165)
>>>>>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:457)
>>>>>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:203)
>>>>>>     ... 12 more
>>>>>> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>>>>>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:459)
>>>>>>     at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:533)
>>>>>>     at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:96)
>>>>>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:446)
>>>>>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
>>>>>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>>>>>>     ... 14 more
>>>>>> Caused by: java.io.EOFException
>>>>>>     at java.base/java.io.DataInputStream.readFully(Unknown Source)
>>>>>>     at java.base/java.io.DataInputStream.readFully(Unknown Source)
>>>>>>     at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:82)
>>>>>>     at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$KeyGroupEntriesIterator.next(FullSnapshotRestoreOperation.java:298)
>>>>>>     at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$KeyGroupEntriesIterator.next(FullSnapshotRestoreOperation.java:273)
>>>>>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateData(RocksDBFullRestoreOperation.java:148)
>>>>>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.applyRestoreResult(RocksDBFullRestoreOperation.java:128)
>>>>>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:102)
>>>>>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:376)
>>>>>>     ... 19 more
>>>>>>
>>>>>> However, most often the job is able to start up after several
>>>>>> retries. It seems to happen more often with higher parallelism (and
>>>>>> higher CPU) and as such with more pods.
>>>>>> Additionally, as the state grows into the GB range it takes longer
>>>>>> for the job to start up, so my job has longer downtime due to the
>>>>>> many retries before it successfully restores state.
>>>>>>
>>>>>> Any ideas as to what could be causing this?
>>>>>>
>>>>>> I am using Apache Flink Operator 1.8, Flink 1.20 with the official
>>>>>> Apache Flink image, Java 17, and ADLS Gen2 for durable storage.
>>>>>>
>>>>>> Best regards,
>>>>>> Bjarke
>>>>>>
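Finally, the "fast local storage" and "state.backend.local-recovery=true" suggestions from earlier in the thread both depend on local disk being available in the pods. A sketch of how that could look with the Flink Kubernetes Operator's podTemplate is below; the volume name, size limit, and local-recovery directory are illustrative assumptions, and the mount path simply matches the state.backend.rocksdb.localdir already set above. How much local recovery shortens restores after a rescale from savepoint would need to be measured:

spec:
  flinkConfiguration:
    # The config tweak quoted earlier in the thread.
    state.backend.local-recovery: "true"
    # Optional (assumed option name from the Flink docs): keep local recovery
    # copies on the same fast local volume as the RocksDB working directory.
    taskmanager.state.local.root-dirs: /mnt/flink/local-recovery
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - name: rocksdb-local
              mountPath: /mnt/flink
      volumes:
        - name: rocksdb-local
          emptyDir:
            sizeLimit: 100Gi   # illustrative; size to the expected local state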