Hi Gabor,

Thanks for the suggestions.
For the RocksDB config tweaks, would you be able to make some concrete suggestions? I have looked at the official Tuning Checkpoints and Large State documentation, but honestly tuning RocksDB does not seem straightforward. My entire flinkConfiguration looks like this:

spec:
  flinkConfiguration:
    env.java.opts.all: --add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.time=ALL-UNNAMED --add-opens java.base/java.nio=ALL-UNNAMED
    execution.checkpointing.file-merging.enabled: "true"
    execution.checkpointing.interval: "120000"
    execution.checkpointing.min-pause: "60000"
    execution.checkpointing.timeout: "900000"
    execution.checkpointing.unaligned: "true"
    fs.default-scheme: abfss://<container-name>@<storage-account-name>.dfs.core.windows.net
    high-availability.storageDir: abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/test/ha
    high-availability.type: kubernetes
    job.autoscaler.enabled: "true"
    job.autoscaler.memory.tuning.enabled: "true"
    job.autoscaler.metrics.window: 10m
    job.autoscaler.stabilization.interval: 5m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    pipeline.max-parallelism: "4"
    rest.profiling.enabled: "true"
    state.backend: rocksdb
    state.backend.incremental: "true"
    state.backend.rocksdb.checkpoint.transfer.thread.num: "8"
    state.backend.rocksdb.compaction.style: UNIVERSAL
    state.backend.rocksdb.localdir: /mnt/flink
    state.backend.rocksdb.thread.num: "8"
    state.backend.rocksdb.writebuffer.size: 128MB
    state.checkpoints.dir: abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/test/checkpointing
    state.savepoints.dir: abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/test/savepoints
    taskmanager.network.memory.buffer-debloat.enabled: "true"
    taskmanager.numberOfTaskSlots: "1"

Best regards,
Bjarke

On Mon, Aug 19, 2024 at 1:18 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> Please see my comments inline.
>
> G
>
> On Thu, Aug 8, 2024 at 3:41 PM Bjarke Tornager <bjarketorna...@gmail.com> wrote:
>
>> Hi Gabor,
>>
>> What kind of development work - is it the Disaggregated State Storage and Management FLIP (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855) that you are referring to?
>>
> Yes.
>
>> Thanks for the suggestion. I am not familiar with the State Processor API, but I will have a look.
>>
>> My state is not even that large (10-50 GB). It seems like people are using RocksDB with TBs of state, but I am a bit puzzled as to how they handle restarts from savepoint when it takes so long. Do you know?
>>
> Yeah, we know of users with TBs of state, and such jobs are not easy to handle because of the long restore times. Practically, the cluster must be strong enough to catch up on the events accumulated during the long restore time, plus some config tweaks which I've listed in the next bullet point.
>
>> Are there some Flink configurations that I might be able to try out to alleviate the slow restart that might be causing the exception?
>>
> What helps is "state.backend.local-recovery=true", some RocksDB tuning, and reducing state size if possible (better eviction, etc.).
>
>> Best regards,
>> Bjarke
>>
>> On Thu, Aug 8, 2024 at 11:16 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>
>>> Hi Bjarke,
>>>
>>> It's normal to see longer recovery time as the state size grows. There are developments in progress to mitigate this problem.
>>>
>>> > Any ideas as to what could be causing this?
>>> I think there is no easy way to tell; I can only give some pointers.
>>>
>>> First I would take a look at the state files with the State Processor API [1]. I would be really surprised though, since ADLS is consistent; only the rename-with-overwrite operation is not atomic, which we don't do in Flink.
>>>
>>> When the checkpoint files are not corrupt, then I would add some debug printouts around the exception catch area and try to find the exact root cause (since several GB of state can fit into memory, a local repro with a debugger may be possible).
>>>
>>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
>>>
>>> BR,
>>> G
>>>
>>> On Wed, Aug 7, 2024 at 9:46 PM Bjarke Tornager <bjarketorna...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am doing stateful streaming (keyed coprocessing) using RocksDB. When my job restarts to adjust parallelism and restores from savepoint, I quite often get the following error:
>>>>
>>>> 2024-08-07 19:43:09
>>>> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>>>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:330)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275)
>>>>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
>>>>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>>>>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>>>>     at java.base/java.lang.Thread.run(Unknown Source)
>>>> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedCoProcessOperator_30465cf1c06f2daf053ed603d38be4b6_(1/2) from any of the 1 provided restore options.
>>>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:165)
>>>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:457)
>>>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:203)
>>>>     ... 12 more
>>>> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>>>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:459)
>>>>     at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:533)
>>>>     at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:96)
>>>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:446)
>>>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
>>>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>>>>     ... 14 more
>>>> Caused by: java.io.EOFException
>>>>     at java.base/java.io.DataInputStream.readFully(Unknown Source)
>>>>     at java.base/java.io.DataInputStream.readFully(Unknown Source)
>>>>     at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:82)
>>>>     at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$KeyGroupEntriesIterator.next(FullSnapshotRestoreOperation.java:298)
>>>>     at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$KeyGroupEntriesIterator.next(FullSnapshotRestoreOperation.java:273)
>>>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateData(RocksDBFullRestoreOperation.java:148)
>>>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.applyRestoreResult(RocksDBFullRestoreOperation.java:128)
>>>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:102)
>>>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:376)
>>>>     ... 19 more
>>>>
>>>> However, most often the job is able to start up after several retries. It seems to happen more often with higher parallelism (and higher CPU), and as such with more pods. Additionally, as the state grows into the GB range it takes longer for the job to start up, so the job has longer downtime due to the many retries before it successfully restores state.
>>>>
>>>> Any ideas as to what could be causing this?
>>>>
>>>> I am using Apache Flink Kubernetes Operator 1.8, Flink 1.20 with the official Apache Flink image, Java 17, and ADLS Gen2 for durable storage.
>>>>
>>>> Best regards,
>>>> Bjarke
>>>>
>>>
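
For reference, a sketch of the kind of options the RocksDB tuning suggestion above could translate into, added to the same flinkConfiguration block as in the spec. The keys are standard Flink options, but the values are illustrative starting points rather than recommendations and would need to be validated against the actual workload:

    # Local recovery, as suggested above: keep a copy of the state on local disk so that
    # failover restores do not need to re-download everything from ADLS. It does not speed
    # up the first restore from a savepoint on a fresh deployment, only later recoveries.
    state.backend.local-recovery: "true"
    # RocksDB draws its block cache and write buffers from Flink managed memory; a larger
    # fraction than the 0.4 default can reduce flush/compaction pressure (illustrative value).
    taskmanager.memory.managed.fraction: "0.6"
    # Predefined RocksDB option profiles shipped with Flink; which one fits depends on the
    # volume backing /mnt/flink (FLASH_SSD_OPTIMIZED is the other common candidate).
    state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM
    # Bloom filters speed up point lookups at the cost of some extra memory.
    state.backend.rocksdb.use-bloom-filter: "true"

None of these shorten the initial download of the savepoint itself; for that, the transfer threads already configured above and the overall cluster capacity (as Gabor notes) matter more.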
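Regarding the State Processor API that Gabor suggests above for checking whether the savepoint files themselves are readable, a minimal sketch could look like the following. It assumes the flink-state-processor-api dependency is on the classpath; the savepoint path, the operator uid ("coprocess-uid"), and the key/state types are placeholders that would have to match the actual job:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SavepointInspection {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Savepoint path is a placeholder; EmbeddedRocksDBStateBackend avoids pulling the
        // whole savepoint into heap while reading it.
        SavepointReader savepoint = SavepointReader.read(
                env,
                "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/test/savepoints/<savepoint-dir>",
                new EmbeddedRocksDBStateBackend());

        // "coprocess-uid" is a placeholder; it must match the uid of the KeyedCoProcessFunction.
        savepoint
                .readKeyedState(OperatorIdentifier.forUid("coprocess-uid"), new StateReader())
                .print();

        env.execute("savepoint-inspection");
    }

    /** Emits one record per key; key and state types here are placeholders for the real ones. */
    static class StateReader extends KeyedStateReaderFunction<String, String> {

        private transient ValueState<String> state;

        @Override
        public void open(Configuration parameters) {
            // The descriptor must match one of the states declared in the co-process function.
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("state", Types.STRING));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
            out.collect(key + " -> " + state.value());
        }
    }
}

If the state reads back cleanly this way, the EOFException is more likely to originate in the restore/transfer path than in corrupt savepoint files, which would be consistent with Gabor's point about ADLS consistency.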