Indeed, when flink-avro is on the classpath, we automatically register one serializer with Kryo.

There is no switch to ignore this error or to exclude the Avro serializer. You will therefore need to rewrite the savepoint, either with the state-processor-api or by building a slightly modified version of Flink. There are several ways to implement the latter, but each one means restoring once on the custom version with Avro on the classpath, taking a savepoint, and then going back to the original version without Avro.
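
To illustrate why the restore fails, here is a minimal, self-contained sketch of the mechanism. It is not Flink's actual code: the class KryoRestoreSketch and its methods resolve and unwrap are hypothetical names made up for this example. The assumption is that the savepoint records the names of the classes registered with Kryo, and that on restore every recorded name must resolve on the current classpath.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration only; this does not use or reproduce Flink classes.
public class KryoRestoreSketch {

    // Try to load every class name recorded in the (imaginary) snapshot.
    // A name that is not on the current classpath maps to Optional.empty().
    public static Map<String, Optional<Class<?>>> resolve(List<String> recordedClassNames) {
        Map<String, Optional<Class<?>>> resolved = new LinkedHashMap<>();
        for (String name : recordedClassNames) {
            Optional<Class<?>> clazz;
            try {
                clazz = Optional.<Class<?>>of(Class.forName(name));
            } catch (ClassNotFoundException e) {
                clazz = Optional.empty();
            }
            resolved.put(name, clazz);
        }
        return resolved;
    }

    // Require that every recorded class was found; fail on the first missing one.
    public static Map<String, Class<?>> unwrap(Map<String, Optional<Class<?>>> resolved) {
        Map<String, Class<?>> unwrapped = new LinkedHashMap<>();
        for (Map.Entry<String, Optional<Class<?>>> entry : resolved.entrySet()) {
            if (!entry.getValue().isPresent()) {
                throw new IllegalStateException(
                        "Missing value for the key '" + entry.getKey() + "'");
            }
            unwrapped.put(entry.getKey(), entry.getValue().get());
        }
        return unwrapped;
    }

    public static void main(String[] args) {
        // The second name is only resolvable when Avro is on the classpath.
        List<String> recorded = Arrays.asList(
                "java.util.ArrayList",
                "org.apache.avro.generic.GenericData$Array");
        try {
            unwrap(resolve(recorded));
            System.out.println("restore ok");
        } catch (IllegalStateException e) {
            System.out.println("restore failed: " + e.getMessage());
        }
    }
}
```

When this is run without Avro on the classpath, the second name cannot be resolved and the failure branch is taken. That is the same shape as the IllegalStateException in the trace below, and it is why the savepoint has to be rewritten from a setup where the class is still available.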

On 11/02/2022 10:01, David Causse wrote:
Hi,

While developing a job, we mistakenly imported flink-avro as a dependency and later removed it during a cleanup. Sadly, it seems that flink-avro registered some Kryo serializers that are now required to load the savepoints, even though we do not use the functionality offered by this module.
The error is (this is using Flink 1.12.1):

java.lang.Exception: Exception while creating StreamOperatorStateContext.
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_b1a2a2523a4642215643a6a4e58f0d05_(1/1) from any of the 1 provided restore options.
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:285)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:173)
        ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
        at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
        at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:607)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:276)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
        ... 11 more
Caused by: java.lang.IllegalStateException: Missing value for the key 'org.apache.avro.generic.GenericData$Array'
        at org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:186)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:90)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:550)
        at java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
        at java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:517)
        at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:221)
        at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:80)
        at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:219)
        at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:186)
        at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:161)
        at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.getPartitionStateSerializer(RegisteredOperatorStateBackendMetaInfo.java:112)
        at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:93)
        at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:80)
        ... 15 more

Are there ways to work around this without rewriting the savepoints with the state-processor-api?

Thanks!

--David

