Hi,

I am running a stateful streaming job (keyed co-processing) on the RocksDB state backend. When the job restarts to adjust its parallelism and restores from a savepoint, I quite often get the following error:
2024-08-07 19:43:09
java.lang.Exception: Exception while creating StreamOperatorStateContext.
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:330)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedCoProcessOperator_30465cf1c06f2daf053ed603d38be4b6_(1/2) from any of the 1 provided restore options.
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:165)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:457)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:203)
	... 12 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:459)
	at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:533)
	at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:96)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:446)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
	... 14 more
Caused by: java.io.EOFException
	at java.base/java.io.DataInputStream.readFully(Unknown Source)
	at java.base/java.io.DataInputStream.readFully(Unknown Source)
	at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:82)
	at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$KeyGroupEntriesIterator.next(FullSnapshotRestoreOperation.java:298)
	at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$KeyGroupEntriesIterator.next(FullSnapshotRestoreOperation.java:273)
	at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateData(RocksDBFullRestoreOperation.java:148)
	at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.applyRestoreResult(RocksDBFullRestoreOperation.java:128)
	at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:102)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:376)
	... 19 more

However, the job usually does manage to start up after several retries. The failure seems to happen more often at higher parallelism (and with more CPU), and therefore with more pods. In addition, as the state grows into the gigabyte range, each startup attempt takes longer, so the many retries before a successful restore add up to significant downtime for the job.

Any ideas as to what could be causing this?

I am using Apache Flink Operator 1.8, Flink 1.20 with the official Apache Flink image, Java 17, and ADLS Gen2 for durable storage.

Best regards,
Bjarke