Hi Gabor,

Returning to your answer from a while ago. Since writing to you about handling tens of GB of state with Apache Flink, I am now deploying jobs with multiple terabytes of state.
The problems that I outlined in my initial email have been handled by doing some of the tuning that you suggested - thanks a lot for the suggestions! I am now able to take a savepoint of RocksDB state in the range of hundreds of GB and restart the job successfully (the restart takes between 5 and 7 minutes, which is okay).

But I have also started to deploy some jobs that have multiple terabytes of state. Taking a savepoint of state that large times out after 15 minutes (the configured execution.checkpointing.timeout). I am running with parallelism 8 (taskmanager.numberOfTaskSlots: "1") on Kubernetes machines that each have 250 GB of RAM and 28 vCPUs. I could probably increase the timeout much further; however, the job recovers fine from the checkpoints, so I am okay with that for now. That said, if you have good suggestions for what I can do with Flink 1.20 to handle savepoints of this size, it would be greatly appreciated.

I was also thinking about these issues of taking a savepoint and restarting from a savepoint in terms of the disaggregated state management in the upcoming Flink 2.0. Will the disaggregated state management completely solve this problem of restoring from a savepoint, and if so, how? My understanding is that the savepoint still needs to be packaged under the savepoint directory in remote storage, just without the upload/download step - is that correct?

This is my flinkConfig:

flinkConfiguration:
  env.java.opts.all: --add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.time=ALL-UNNAMED --add-opens java.base/java.nio=ALL-UNNAMED
  execution.checkpointing.interval: "120000"
  execution.checkpointing.min-pause: "300000"
  execution.checkpointing.timeout: "900000"
  execution.checkpointing.unaligned: "true"
  fs.default-scheme: abfss://x...@y.dfs.core.windows.net
  high-availability.storageDir: abfss://x...@y.dfs.core.windows.net/some-product/ha
  high-availability.type: kubernetes
  pipeline.force-avro: "false"
  pipeline.max-parallelism: "32"
  state.backend: rocksdb
  state.backend.incremental: "true"
  state.backend.rocksdb.checkpoint.transfer.thread.num: "28"
  state.backend.rocksdb.compaction.style: LEVEL
  state.backend.rocksdb.localdir: /mnt/flink
  state.backend.rocksdb.thread.num: "28"
  state.backend.rocksdb.use-bloom-filter: "true"
  state.backend.rocksdb.writebuffer.size: 128MB
  state.checkpoints.dir: abfss://x...@y.dfs.core.windows.net/some-product/checkpointing
  state.savepoints.dir: abfss://x...@y.dfs.core.windows.net/some-product/savepoints
  taskmanager.memory.framework.off-heap.size: 6g
  taskmanager.memory.task.off-heap.size: 24g
  taskmanager.network.memory.buffer-debloat.enabled: "true"
  taskmanager.numberOfTaskSlots: "1"
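If I give savepoints another try on the multi-terabyte jobs, the minimal change on the savepoint side is presumably just raising that timeout well beyond 15 minutes, and on the restore side adding the state.backend.local-recovery setting you suggested in the earlier thread (quoted below), which is not in the config above. A rough sketch - the one-hour value is picked arbitrarily:

  execution.checkpointing.timeout: "3600000"  # 1 hour instead of the current 15 minutes
  state.backend.local-recovery: "true"        # keep a local state copy for faster failure recovery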
Best,
Bjarke

On Mon, Aug 19, 2024 at 1:18 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> Please see my comments inline.
>
> G
>
> On Thu, Aug 8, 2024 at 3:41 PM Bjarke Tornager <bjarketorna...@gmail.com> wrote:
>
>> Hi Gabor,
>>
>> What kind of development work - is it the Disaggregated State Storage and Management FLIP
>> (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855) that you are referring to?
>>
> Yes.
>
>> Thanks for the suggestion. I am not familiar with the State Processor API, but I will have a look.
>>
>> My state is not even that large (10-50 GB). It seems like people are using RocksDB with TBs of state,
>> but I am a bit puzzled as to how they handle restarts from savepoint when it takes so long. Do you know?
>>
> Yeah, we know users with TBs of state, and it is not easy to handle such jobs because of the long restore times.
> Practically the cluster must be strong enough to catch up on the accumulated events after the long restore time,
> plus some config tweaks which I've listed in the next bullet point.
>
>> Are there some Flink configurations that I might be able to try out to alleviate the slow restart
>> that might be causing the exception?
>>
> What helps is "state.backend.local-recovery=true", some RocksDB tuning, and reducing state size if possible (better eviction, etc...).
>
>> Best regards,
>> Bjarke
>>
>> On Thu, Aug 8, 2024 at 11:16 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>
>>> Hi Bjarke,
>>>
>>> It's normal to see longer recovery times as the state size grows. There are developments in progress
>>> to mitigate this problem.
>>>
>>> > Any ideas as to what could be causing this?
>>> I think there is no easy way to tell; I can just give some pointers.
>>>
>>> First I would take a look at the state files with the State Processor API[1]. I would be really
>>> surprised though, since ADLS is consistent; only the rename-with-overwrite operation is not atomic,
>>> which we don't do in Flink.
>>>
>>> When the checkpoint files are not corrupt, then I would add some debug printouts to the exception
>>> catch area and try to find out the exact root cause (since several GB of state can fit into memory,
>>> a local repro with a debugger may be possible).
>>>
>>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
>>>
>>> BR,
>>> G
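A minimal sketch of the State Processor API check Gabor suggests above: read one keyed state back from the savepoint so that a corrupt or truncated file surfaces as a deserialization error up front rather than only at job restore time. The operator uid "my-operator", the Long key type, the "my-state" ValueState name, and the savepoint path are placeholders and would need to match the actual job; flink-state-processor-api and flink-statebackend-rocksdb must be on the classpath.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.util.Collector;

public class SavepointSanityCheck {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder path: point this at the savepoint (or retained checkpoint) to inspect.
        SavepointReader savepoint = SavepointReader.read(
                env,
                "abfss://x...@y.dfs.core.windows.net/some-product/savepoints/savepoint-xxxx",
                new EmbeddedRocksDBStateBackend());

        // Reading every key forces a full deserialization pass over the state files,
        // so a corrupt or truncated file shows up here instead of at job restore time.
        savepoint
                .readKeyedState(OperatorIdentifier.forUid("my-operator"), new StateReader())
                .addSink(new DiscardingSink<>());

        env.execute("savepoint-sanity-check");
    }

    /** Reads a single ValueState; the descriptor must match the one registered in the job. */
    static class StateReader extends KeyedStateReaderFunction<Long, String> {

        private ValueState<Long> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext()
                    .getState(new ValueStateDescriptor<>("my-state", Types.LONG));
        }

        @Override
        public void readKey(Long key, Context ctx, Collector<String> out) throws Exception {
            // Access the value so it actually gets deserialized; the output is discarded anyway.
            out.collect(key + " -> " + state.value());
        }
    }
}

DiscardingSink is used because for state of this size the point is only to force a full read of every key, not to collect or print the values.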
>>>
>>> On Wed, Aug 7, 2024 at 9:46 PM Bjarke Tornager <bjarketorna...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am doing stateful streaming (keyed coprocessing) using RocksDB. When my job restarts to adjust
>>>> parallelism and restores from a savepoint, I quite often get the following error:
>>>>
>>>> 2024-08-07 19:43:09
>>>> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>>>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:330)
>>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275)
>>>> at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
>>>> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>>>> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>>>> at java.base/java.lang.Thread.run(Unknown Source)
>>>> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedCoProcessOperator_30465cf1c06f2daf053ed603d38be4b6_(1/2) from any of the 1 provided restore options.
>>>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:165)
>>>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:457)
>>>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:203)
>>>> ... 12 more
>>>> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>>>> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:459)
>>>> at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:533)
>>>> at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:96)
>>>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:446)
>>>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
>>>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>>>> ... 14 more
>>>> Caused by: java.io.EOFException
>>>> at java.base/java.io.DataInputStream.readFully(Unknown Source)
>>>> at java.base/java.io.DataInputStream.readFully(Unknown Source)
>>>> at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:82)
>>>> at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$KeyGroupEntriesIterator.next(FullSnapshotRestoreOperation.java:298)
>>>> at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$KeyGroupEntriesIterator.next(FullSnapshotRestoreOperation.java:273)
>>>> at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateData(RocksDBFullRestoreOperation.java:148)
>>>> at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.applyRestoreResult(RocksDBFullRestoreOperation.java:128)
>>>> at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:102)
>>>> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:376)
>>>> ... 19 more
>>>>
>>>> However, most often the job is able to start up after several retries. It seems to happen more often
>>>> with higher parallelism (and higher CPU) and as such with more pods.
>>>> Additionally, as the state grows into the GB range it takes longer for the job to start up, and as
>>>> such my job has longer downtime due to the many retries before it successfully restores state.
>>>>
>>>> Any ideas as to what could be causing this?
>>>>
>>>> I am using Apache Flink Operator 1.8, Flink 1.20 with the official Apache Flink image, Java 17,
>>>> and ADLS Gen2 for durable storage.
>>>>
>>>> Best regards,
>>>> Bjarke