Hi Ganti,

If you can ensure that the new class stays backward compatible with the previous one, you can try explicitly setting the serialVersionUID of the current class to -7317586767482317266.
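As a rough sketch of what pinning the UID could look like: the class name ConnectionFactory and its endpoint field are hypothetical stand-ins here, since the real class name is masked in the stack trace. The helper only reads back the UID that Java serialization will use for the class, so you can check it against the "stream classdesc" value reported in the exception.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionUidExample {

    // Hypothetical stand-in for the changed class; the real one is masked in the trace.
    public static class ConnectionFactory implements Serializable {
        // Pinned to the "stream classdesc serialVersionUID" reported by the exception,
        // so the serialized form in the savepoint passes the UID check.
        private static final long serialVersionUID = -7317586767482317266L;

        private final String endpoint; // assumed field, for illustration only

        public ConnectionFactory(String endpoint) {
            this.endpoint = endpoint;
        }

        public String getEndpoint() {
            return endpoint;
        }
    }

    // Returns the serialVersionUID that Java serialization uses for the given class.
    public static long declaredUid(Class<?> clazz) {
        return ObjectStreamClass.lookup(clazz).getSerialVersionUID();
    }

    public static void main(String[] args) {
        System.out.println(declaredUid(ConnectionFactory.class));
    }
}
```

Note that pinning the UID only gets you past the UID comparison. Restoring still relies on the new class being compatible with the serialized form stored in the savepoint.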
If you want to avoid this issue in the future, you must set serialVersionUID explicitly from the start for any class that does not use a customized serializer. A better solution is to ensure backward compatibility of the class with a customized serializer, or to leverage Apache Avro. You can refer to [1] for more details.

[1] https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html

Best
Yun Tang

________________________________
From: Yashwant Ganti <yashwan...@gmail.com>
Sent: Thursday, May 27, 2021 1:14
To: user@flink.apache.org <user@flink.apache.org>
Subject: Error restarting job from Savepoint

Hello,

We are facing an error restarting a job from a savepoint. We believe it is because one of the common classes used across all of our jobs was changed but there was no serialVersionUID assigned to the class. The error we are facing is

java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for SplittableDoFnOperator_60af72bbf6b3989cb3e849280faa23d8_(2/4) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:115)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:559)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:101)
    at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:181)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 11 more
Caused by: java.io.InvalidClassException: com.****.******.*******; local class incompatible: stream classdesc serialVersionUID = -7317586767482317266, local class serialVersionUID = -8797204481428423223
    at java.base/java.io.ObjectStreamClass.initNonProxy(Unknown Source)
    at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
    at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at org.apache.beam.sdk.coders.SerializableCoder.decode(SerializableCoder.java:194)
    at org.apache.beam.sdk.coders.SerializableCoder.decode(SerializableCoder.java:54)
    at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceRestrictionCoder.decode(Read.java:669)
    at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceRestrictionCoder.decode(Read.java:642)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:118)
    at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
    at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:289)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:323)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:285)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:172)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:112)
    ... 17 more

The change was to have the failing class implement Serializable. My questions are

* What are our options now to get this job to restart? In the non-production environments we can delete the savepoint, but we really don't want to do that in production.
* Are there any best practices or guidance to follow to avoid such issues in the future? Should we have just implemented a serialVersionUID for the class? Should we, or can we, write custom serializers/deserializers for this class? The class is just a factory which creates connection objects, so we normally don't think twice about this in standard Java applications.

Any help is appreciated.

Thanks!