Hello,

We are facing an error when restarting a job from a savepoint. We believe
it is because one of the common classes used across all of our jobs was
changed, but no *serialVersionUID* had been assigned to the class. The
error we are facing is:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for SplittableDoFnOperator_60af72bbf6b3989cb3e849280faa23d8_(2/4) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:115)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:559)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:101)
    at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:181)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 11 more
Caused by: java.io.InvalidClassException: com.****.******.*******; local class incompatible: stream classdesc serialVersionUID = -7317586767482317266, local class serialVersionUID = -8797204481428423223
    at java.base/java.io.ObjectStreamClass.initNonProxy(Unknown Source)
    at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
    at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at org.apache.beam.sdk.coders.SerializableCoder.decode(SerializableCoder.java:194)
    at org.apache.beam.sdk.coders.SerializableCoder.decode(SerializableCoder.java:54)
    at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceRestrictionCoder.decode(Read.java:669)
    at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceRestrictionCoder.decode(Read.java:642)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:118)
    at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
    at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:289)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:323)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:285)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:172)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:112)
    ... 17 more
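
To compare the two UIDs in the trace (-7317586767482317266 recorded in the savepoint versus -8797204481428423223 computed by the current jar), a small diagnostic along these lines could print what a given build of the class computes. This is only a sketch: `SerialUidCheck`, `PinnedExample`, `NotSerializable` and the class name in the usage comment are hypothetical. `ObjectStreamClass.lookup` is the standard JDK call (it returns null for classes that are not Serializable), and as far as we know it is what the JDK's `serialver` tool reports as well.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

/** Prints the serialVersionUID the local JVM would use for a class. */
public class SerialUidCheck {

    /** Returns the local serialVersionUID, or null if the class is not Serializable. */
    static Long localSerialVersionUid(Class<?> cls) {
        ObjectStreamClass desc = ObjectStreamClass.lookup(cls); // null for non-Serializable classes
        return desc == null ? null : desc.getSerialVersionUID();
    }

    // Hypothetical stand-ins for illustration, not our real class.
    static class PinnedExample implements Serializable {
        private static final long serialVersionUID = 42L; // explicit UID is reported as-is
    }

    static class NotSerializable { }

    public static void main(String[] args) throws ClassNotFoundException {
        if (args.length > 0) {
            // Hypothetical usage against the old or new jar:
            //   java -cp old.jar:. SerialUidCheck com.example.ConnectionFactory
            // initialize=false so the class's static initializers do not run.
            Class<?> cls = Class.forName(args[0], false, SerialUidCheck.class.getClassLoader());
            System.out.println(localSerialVersionUid(cls));
        } else {
            System.out.println(localSerialVersionUid(PinnedExample.class));   // 42
            System.out.println(localSerialVersionUid(NotSerializable.class)); // null
        }
    }
}
```

Running it with the old jar on the classpath should, if our reading is right, print the "stream classdesc" value from the trace.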


The change was to make the failing class implement *Serializable*. My
questions are:

   - What are our options now for getting this job to restart? In the
   non-production environments we can delete the savepoint, but we really
   don't want to do that in production.
   - Are there best practices or guidance to follow to avoid such issues in
   the future? Should we have simply declared a serialVersionUID for the
   class? Should we (or can we) write a custom serializer/deserializer for
   it? The class is just a factory that creates connection objects, so in
   standard Java applications we normally wouldn't think twice about this.
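
For the second question, our understanding of the two options is sketched below. `ConnectionFactory`, `FakeConnection` and the URL are hypothetical stand-ins for our masked class, not its real code. The serialVersionUID is pinned to the "stream classdesc" value from the trace so that the existing savepoint could, in principle, be read. As we read the Java Object Serialization Specification, adding `Serializable` counts as a compatible change, but pinning only helps if the class's fields are otherwise compatible with the old version. The live connection sits in a `transient` field and is recreated lazily, so only configuration would end up in Flink state.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

/** Hypothetical stand-in for our masked factory class. */
public class ConnectionFactory implements Serializable {
    // Pinned to the "stream classdesc serialVersionUID" value from the trace.
    private static final long serialVersionUID = -7317586767482317266L;

    private final String url;                    // configuration: safe to serialize
    private transient FakeConnection connection; // never written to state

    public ConnectionFactory(String url) { this.url = url; }

    public String url() { return url; }

    /** Creates the connection on first use, including after deserialization. */
    public synchronized FakeConnection connection() {
        if (connection == null) {
            connection = new FakeConnection(url);
        }
        return connection;
    }

    boolean hasOpenConnection() { return connection != null; }

    /** Hypothetical placeholder for a real, non-serializable connection type. */
    public static final class FakeConnection {
        final String target;
        FakeConnection(String target) { this.target = target; }
    }

    /** Plain Java-serialization round trip (the mechanism behind SerializableCoder). */
    static ConnectionFactory roundTrip(ConnectionFactory f) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(f);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (ConnectionFactory) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ConnectionFactory original = new ConnectionFactory("jdbc:example://host/db");
        original.connection(); // open a connection before serializing
        ConnectionFactory restored = roundTrip(original);
        System.out.println(restored.url());               // jdbc:example://host/db
        System.out.println(restored.hasOpenConnection()); // false: transient field was skipped
        System.out.println(restored.connection().target); // jdbc:example://host/db
        System.out.println(ObjectStreamClass.lookup(ConnectionFactory.class).getSerialVersionUID());
    }
}
```

The lazy `transient` field avoids writing a custom serializer at all: Java serialization only ever sees the URL string, and each restored instance builds its own connection on first use.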

Any help is appreciated.

Thanks!
