Hi,

I am doing stateful streaming (keyed co-processing) using RocksDB. When my
job restarts from a savepoint to adjust its parallelism, I quite often get
the following error:

2024-08-07 19:43:09
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:330)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedCoProcessOperator_30465cf1c06f2daf053ed603d38be4b6_(1/2) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:165)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:457)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:203)
... 12 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:459)
at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:533)
at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:96)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:446)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
... 14 more
Caused by: java.io.EOFException
at java.base/java.io.DataInputStream.readFully(Unknown Source)
at java.base/java.io.DataInputStream.readFully(Unknown Source)
at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:82)
at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$KeyGroupEntriesIterator.next(FullSnapshotRestoreOperation.java:298)
at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$KeyGroupEntriesIterator.next(FullSnapshotRestoreOperation.java:273)
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateData(RocksDBFullRestoreOperation.java:148)
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.applyRestoreResult(RocksDBFullRestoreOperation.java:128)
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:102)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:376)
... 19 more
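For reference, the innermost cause is a java.io.EOFException thrown by DataInputStream.readFully inside BytePrimitiveArraySerializer.deserialize, meaning the restore asked for more bytes than the savepoint stream delivered. The minimal JDK-only sketch below illustrates that mechanism. The class and method names are hypothetical (this is not Flink code), and the 4-byte length prefix is an assumption about how a byte[] entry is laid out. It reproduces the symptom only and says nothing about why the stream would end early in this job.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical demo class (plain JDK, not Flink code): shows how reading a
// length-prefixed byte[] from a stream that ends early yields EOFException.
class TruncatedEntryDemo {

    // Assumed layout: 4-byte big-endian length, then the payload bytes.
    static byte[] serialize(byte[] payload) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(payload.length);
            out.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Reads the length, then demands exactly that many bytes via readFully,
    // which throws EOFException if the stream runs out first.
    static byte[] deserialize(DataInputStream in) throws IOException {
        int length = in.readInt();
        byte[] payload = new byte[length];
        in.readFully(payload);
        return payload;
    }

    // Returns true if one entry could be read, false if the input ended early.
    static boolean readSucceeds(byte[] serialized) {
        try (DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(serialized))) {
            deserialize(in);
            return true;
        } catch (EOFException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] complete = serialize(new byte[] {1, 2, 3, 4, 5, 6, 7, 8}); // 4 + 8 = 12 bytes
        byte[] truncated = Arrays.copyOf(complete, complete.length - 3);  // last 3 bytes missing

        System.out.println("complete entry readable:  " + readSucceeds(complete));  // true
        System.out.println("truncated entry readable: " + readSucceeds(truncated)); // false
    }
}
```

Running main prints true for the complete entry and false for the copy missing its last three bytes, where readFully fails the same way as in the trace above.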

However, the job usually manages to start after several retries. The error
seems to occur more often with higher parallelism (and more CPU), and
therefore with more pods.
Additionally, as the state grows into the GB range, each restore attempt
takes longer, so the many retries before a successful restore add up to
significant downtime for my job.

Any ideas as to what could be causing this?

I am using Apache Flink Operator 1.8, Flink 1.20 with the official Apache
Flink image, Java 17, and ADLS Gen2 for durable storage.

Best regards,
Bjarke
