Hello,

We are facing an error when restarting a job from a savepoint. We believe the cause is that one of the common classes used across all of our jobs was changed, but the class had no *serialVersionUID* assigned. The error we are facing is:
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for SplittableDoFnOperator_60af72bbf6b3989cb3e849280faa23d8_(2/4) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:115)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:559)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:101)
    at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:181)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 11 more
Caused by: java.io.InvalidClassException: com.****.******.*******; local class incompatible: stream classdesc serialVersionUID = -7317586767482317266, local class serialVersionUID = -8797204481428423223
    at java.base/java.io.ObjectStreamClass.initNonProxy(Unknown Source)
    at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
    at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at org.apache.beam.sdk.coders.SerializableCoder.decode(SerializableCoder.java:194)
    at org.apache.beam.sdk.coders.SerializableCoder.decode(SerializableCoder.java:54)
    at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceRestrictionCoder.decode(Read.java:669)
    at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceRestrictionCoder.decode(Read.java:642)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:118)
    at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
    at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:289)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:323)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:285)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:172)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:112)
    ... 17 more

The change was to have the failing class implement *Serializable*. My questions are:

- What are our options now to get this job to restart? In the non-production environments we can delete the savepoint, but we really don't want to do that in production.
- Are there any best practices or guidance to follow to avoid such issues in the future? Should we simply have declared a serialVersionUID for the class? Should we, or can we, write custom serializers/deserializers for this class? The class is just a factory that creates connection objects, so in standard Java applications we normally wouldn't think twice about it.

Any help is appreciated.

Thanks!
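
For context on the serialVersionUID question, here is a minimal sketch of what pinning the UID might look like. `ConnectionFactory`, its `endpoint` field, and the `SerialVersionUidCheck` helper are hypothetical stand-ins (our real class name is masked above); the UID is the "stream classdesc" value from the exception. We are assuming, without having verified it against the production savepoint, that the rest of the class is still serialization-compatible with what was written.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Hypothetical stand-in for our masked factory class.
class ConnectionFactory implements Serializable {
    // Pinned to the "stream classdesc" value reported in the InvalidClassException,
    // so the local class advertises the same UID as the bytes in the savepoint.
    private static final long serialVersionUID = -7317586767482317266L;

    private final String endpoint; // hypothetical field, for illustration only

    ConnectionFactory(String endpoint) {
        this.endpoint = endpoint;
    }

    String endpoint() {
        return endpoint;
    }
}

// Hypothetical helper to inspect the UID and exercise plain Java serialization.
class SerialVersionUidCheck {
    // Returns the UID that Java serialization will use for the given class.
    static long declaredUid(Class<?> type) {
        return ObjectStreamClass.lookup(type).getSerialVersionUID();
    }

    // Serializes and deserializes the object in memory.
    static ConnectionFactory roundTrip(ConnectionFactory in) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(in);
        }
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (ConnectionFactory) ois.readObject();
        }
    }
}
```

As far as we understand, a matching UID only gets past the version check; if the field layout changed incompatibly, deserialization could presumably still fail or silently drop data.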