Hi,

While developing a job we mistakenly added flink-avro as a dependency and
later removed it during a cleanup. Unfortunately, it seems that flink-avro
registered some Kryo serializers that are now required to load our
savepoints, even though we do not use any of the functionality offered by
this module.
The error (on Flink 1.12.1) is:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_b1a2a2523a4642215643a6a4e58f0d05_(1/1) from any of the 1 provided restore options.
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:285)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:173)
        ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
        at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
        at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:607)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:276)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
        ... 11 more
Caused by: java.lang.IllegalStateException: Missing value for the key 'org.apache.avro.generic.GenericData$Array'
        at org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:186)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:90)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:550)
        at java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
        at java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:517)
        at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:221)
        at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:80)
        at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:219)
        at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:186)
        at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:161)
        at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.getPartitionStateSerializer(RegisteredOperatorStateBackendMetaInfo.java:112)
        at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:93)
        at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:80)
        ... 15 more
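
If I read the innermost cause correctly, the savepoint remembers a Kryo
registration keyed by the class name 'org.apache.avro.generic.GenericData$Array',
and the restore fails because nothing on the current classpath can back that
key. Below is a minimal, self-contained sketch of that failure mode as I
understand it. It is not Flink's actual code: the class
MissingRegistrationSketch and the helpers tryLoad and unwrapOptionals are
hypothetical stand-ins written only to illustrate the behaviour, and they use
nothing beyond the JDK.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration only; none of this is Flink API.
public class MissingRegistrationSketch {

    // Assumption: at restore time each remembered class name is resolved
    // against the current classpath; an unresolvable name yields "empty".
    static Optional<Class<?>> tryLoad(String className) {
        try {
            return Optional.of(Class.forName(className));
        } catch (ClassNotFoundException | LinkageError e) {
            return Optional.empty();
        }
    }

    // Turns the map of optional values into a plain map, failing on the
    // first key whose value could not be resolved.
    static Map<String, Class<?>> unwrapOptionals(Map<String, Optional<Class<?>>> entries) {
        Map<String, Class<?>> out = new LinkedHashMap<>();
        for (Map.Entry<String, Optional<Class<?>>> e : entries.entrySet()) {
            Class<?> value = e.getValue().orElseThrow(() ->
                new IllegalStateException("Missing value for the key '" + e.getKey() + "'"));
            out.put(e.getKey(), value);
        }
        return out;
    }

    public static void main(String[] args) {
        // Names as they might have been recorded when the savepoint was taken.
        String[] recorded = {
            "java.util.ArrayList",
            "org.apache.avro.generic.GenericData$Array"
        };
        Map<String, Optional<Class<?>>> restored = new LinkedHashMap<>();
        for (String name : recorded) {
            restored.put(name, tryLoad(name));
        }
        try {
            unwrapOptionals(restored);
            System.out.println("all registrations resolved");
        } catch (IllegalStateException e) {
            // Reached when Avro is not on the classpath.
            System.out.println(e.getMessage());
        }
    }
}
```

If this reading is right, the registration is stored in the savepoint
itself, which would explain why removing the dependency from the job is not
enough on its own.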

Is there a way to work around this without rewriting the savepoints with
the State Processor API?

Thanks!

--David
