Hi Bjarke,

It's normal to see longer recovery times as the state size grows. There are developments in progress to mitigate this problem.
> Any ideas as to what could be causing this?

There is no easy way to tell, I'm afraid, but I can give some pointers.

First, I would take a look at the state files with the State Processor API [1]. I would be really surprised if they turned out to be corrupt, though: ADLS is consistent, and the only non-atomic operation there is rename-with-overwrite, which Flink doesn't use. A rough sketch of such a check is below.

If the checkpoint files are not corrupt, then I would add some debug printouts around the exception catch site and try to find the exact root cause. Since a few GB of state can fit into memory, a local reproduction under a debugger may be possible; see the second sketch further down.
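Reading the savepoint back with the State Processor API forces a full deserialization pass over the keyed state, so a truncated or damaged file should fail with a similar EOFException during the read. Untested sketch: the uid ("my-co-process"), the String key type, and the "my-value-state" descriptor are placeholders that have to match what your KeyedCoProcessFunction actually registers.

// Needs flink-state-processor-api and flink-statebackend-rocksdb on the classpath.
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SavepointInspector {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Point this at the savepoint that fails to restore (placeholder path).
        SavepointReader savepoint = SavepointReader.read(
                env,
                "abfss://<container>@<account>.dfs.core.windows.net/savepoints/savepoint-xxxx",
                new EmbeddedRocksDBStateBackend());

        // Iterating over every key deserializes every state entry, so a
        // corrupt file should surface as an exception here as well.
        savepoint
                .readKeyedState(OperatorIdentifier.forUid("my-co-process"), new Reader())
                .print();

        env.execute("savepoint-inspection");
    }

    static class Reader extends KeyedStateReaderFunction<String, String> {

        private ValueState<Long> state;

        @Override
        public void open(Configuration parameters) {
            // Must mirror the descriptor your production operator registers.
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("my-value-state", Types.LONG));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out)
                throws Exception {
            out.collect(key + " -> " + state.value());
        }
    }
}

I passed EmbeddedRocksDBStateBackend explicitly so the restored state spills to local disk instead of being held entirely on heap, which matters once the state is in the GB range.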
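For the debugger route, the restore can be triggered in a plain IDE run by setting execution.savepoint.path, so a breakpoint in the throwing code (RocksDBFullRestoreOperation#restoreKVStateData in your trace) is hit inside a single debuggable JVM. Rough sketch, assuming the savepoint has first been copied from ADLS to local disk; buildPipeline() is a placeholder for your real topology with the same uids:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalRestoreRepro {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Restore from the locally copied savepoint on startup (placeholder path).
        conf.setString("execution.savepoint.path", "file:///tmp/savepoint-xxxx");
        // Same backend as production so the restore code path is identical.
        conf.setString("state.backend.type", "rocksdb");

        // Embedded mini-cluster inside this JVM; attach the IDE debugger here.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2); // match the parallelism you are rescaling to

        buildPipeline(env);

        env.execute("local-restore-repro");
    }

    private static void buildPipeline(StreamExecutionEnvironment env) {
        // Placeholder: re-create the keyed co-process pipeline here with the
        // same .uid(...) values as the job that wrote the savepoint.
    }
}

If the failures correlate with higher parallelism they may not reproduce in a single process, but with state that fits in memory it is worth a try.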
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/

BR,
G

On Wed, Aug 7, 2024 at 9:46 PM Bjarke Tornager <bjarketorna...@gmail.com> wrote:

> Hi,
>
> I am doing stateful streaming (keyed coprocessing) using RocksDB. When my
> job restarts to adjust parallelism, and restores from savepoint, I quite
> often get the following error:
>
> 2024-08-07 19:43:09
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:330)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>     at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedCoProcessOperator_30465cf1c06f2daf053ed603d38be4b6_(1/2) from any of the 1 provided restore options.
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:165)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:457)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:203)
>     ... 12 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:459)
>     at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:533)
>     at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:96)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:446)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>     ... 14 more
> Caused by: java.io.EOFException
>     at java.base/java.io.DataInputStream.readFully(Unknown Source)
>     at java.base/java.io.DataInputStream.readFully(Unknown Source)
>     at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:82)
>     at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$KeyGroupEntriesIterator.next(FullSnapshotRestoreOperation.java:298)
>     at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$KeyGroupEntriesIterator.next(FullSnapshotRestoreOperation.java:273)
>     at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateData(RocksDBFullRestoreOperation.java:148)
>     at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.applyRestoreResult(RocksDBFullRestoreOperation.java:128)
>     at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:102)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:376)
>     ... 19 more
>
> However, most often the job is able to start up after several retries. It
> seems to happen more often with higher parallelism (and higher CPU) and as
> such with more pods.
> Additionally, as the state grows into the GB range it takes longer for the
> job to start up, and as such my job has longer downtime due to the many
> retries before it successfully restores state.
>
> Any ideas as to what could be causing this?
>
> I am using Apache Flink Operator 1.8, Flink 1.20 using the official Apache
> Flink image, Java 17, and ADLS Gen2 for durable storage.
>
> Best regards,
> Bjarke