Please see my comments inline. G
On Thu, Aug 8, 2024 at 3:41 PM Bjarke Tornager <bjarketorna...@gmail.com> wrote:

> Hi Gabor,
>
> What kind of development work - is it the Disaggregated State Storage and
> Management FLIP (
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855)
> that you are referring to?

Yes.

> Thanks for the suggestion. I am not familiar with the State Processor API
> but I will have a look.
>
> My state is not even that large (10-50 GB). It seems like people are using
> RocksDB with TBs of state, but I am a bit puzzled as to how they handle
> restarts from savepoint when it takes so long. Do you know?

Yeah, we know of users with TBs of state, and such jobs are not easy to handle
because of the long restore times. In practice the cluster must be strong enough
to catch up on the events that accumulate during the long restore, plus you need
the config tweaks I've listed under your next question.

> Are there some Flink configurations that I might be able to try out to
> alleviate the slow restart that might be causing the exception?

What helps is "state.backend.local-recovery=true", some RocksDB tuning, and
reducing the state size if possible (better eviction, etc.). See the config
sketch at the bottom of this mail.

> Best regards,
> Bjarke
>
> On Thu, Aug 8, 2024 at 11:16 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> Hi Bjarke,
>>
>> It's normal to see longer recovery times as the state size grows. There
>> are developments in progress to mitigate this problem.
>>
>> > Any ideas as to what could be causing this?
>> There is no easy way to tell; I can only give some pointers.
>>
>> First I would take a look at the state files with the State Processor
>> API [1]. I would be really surprised if they were corrupt, though, since
>> ADLS is consistent; only the rename-with-overwrite operation is not
>> atomic, and Flink doesn't use it.
>>
>> If the checkpoint files are not corrupt, then I would add some debug
>> printouts around the exception catch area and try to find the exact root
>> cause (since several GB of state can fit into memory, a local repro with
>> a debugger may be possible).
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
>>
>> BR,
>> G
>>
>>
>> On Wed, Aug 7, 2024 at 9:46 PM Bjarke Tornager <bjarketorna...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I am doing stateful streaming (keyed coprocessing) using RocksDB. When
>>> my job restarts to adjust parallelism and restores from savepoint, I
>>> quite often get the following error:
>>>
>>> 2024-08-07 19:43:09
>>> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:330)
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275)
>>> at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
>>> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>>> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>>> at java.base/java.lang.Thread.run(Unknown Source)
>>> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for
>>> KeyedCoProcessOperator_30465cf1c06f2daf053ed603d38be4b6_(1/2) from any of the 1 provided restore options.
>>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:165)
>>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:457)
>>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:203)
>>> ... 12 more
>>> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>>> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:459)
>>> at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:533)
>>> at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:96)
>>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:446)
>>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
>>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>>> ... 14 more
>>> Caused by: java.io.EOFException
>>> at java.base/java.io.DataInputStream.readFully(Unknown Source)
>>> at java.base/java.io.DataInputStream.readFully(Unknown Source)
>>> at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:82)
>>> at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$KeyGroupEntriesIterator.next(FullSnapshotRestoreOperation.java:298)
>>> at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$KeyGroupEntriesIterator.next(FullSnapshotRestoreOperation.java:273)
>>> at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateData(RocksDBFullRestoreOperation.java:148)
>>> at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.applyRestoreResult(RocksDBFullRestoreOperation.java:128)
>>> at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:102)
>>> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:376)
>>> ... 19 more
>>>
>>> However, most often the job is able to start up after several retries.
>>> It seems to happen more often with higher parallelism (and higher CPU)
>>> and as such with more pods.
>>> Additionally, as the state grows into the GB range it takes longer for
>>> the job to start up, so the job has longer downtime due to the many
>>> retries before it successfully restores state.
>>>
>>> Any ideas as to what could be causing this?
>>>
>>> I am using Apache Flink Operator 1.8, Flink 1.20 with the official
>>> Apache Flink image, Java 17, and ADLS Gen2 for durable storage.
>>>
>>> Best regards,
>>> Bjarke
>>>
>>
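
To make the config part above a bit more concrete, here is a rough
flink-conf.yaml sketch. Only state.backend.local-recovery comes from my comment
above; the other keys and values are just illustrative examples of "some RocksDB
tuning" and state handling, so please check them against the Flink 1.20 docs and
size them for your hardware before relying on them:

# flink-conf.yaml (sketch, values are placeholders)
state.backend.local-recovery: true
# directory for local state copies; ideally a fast local disk
taskmanager.state.local.root-dirs: /mnt/local-ssd/flink/local-recovery
# incremental checkpoints avoid re-writing the full state on every checkpoint
state.backend.incremental: true
# illustrative RocksDB tuning knobs
state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM
state.backend.rocksdb.checkpoint.transfer.thread.num: 8

Keep in mind that, as far as I know, local recovery only helps when a task comes
back on the same TaskManager after a failure; it doesn't speed up the first
restore from a savepoint after a rescale.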
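Regarding the State Processor API pointer [1] quoted above, here is a minimal
Java sketch of how one could read back the keyed state of the failing operator
to sanity-check the savepoint. This is not from your job: the operator uid
("my-co-process-uid"), the key type, the state name/type ("my-state", Long) and
the abfss path are placeholders that must match what your job actually
registers, and it needs the flink-state-processor-api dependency on the
classpath.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SavepointSanityCheck {

    // Reads one keyed state of the suspect operator and emits a line per key.
    // Key type, state name and state type are placeholders.
    static class StateReader extends KeyedStateReaderFunction<String, String> {
        private transient ValueState<Long> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("my-state", Types.LONG));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
            out.collect(key + " -> " + state.value());
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder path: point it at the savepoint that fails to restore.
        SavepointReader savepoint = SavepointReader.read(
                env,
                "abfss://<container>@<account>.dfs.core.windows.net/savepoints/savepoint-xxxx",
                new HashMapStateBackend());

        // Placeholder uid: the uid assigned to the KeyedCoProcessFunction operator.
        savepoint.readKeyedState("my-co-process-uid", new StateReader()).print();

        env.execute("savepoint-sanity-check");
    }
}

If reading a particular state or key group blows up here as well, that points at
the savepoint data itself; if it reads cleanly, the problem is more likely on
the restore/transfer side.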