Hi Yashwant Ganti,

> Caused by: java.io.InvalidClassException: com.****.******.*******; local
> class incompatible: stream classdesc serialVersionUID = -7317586767482317266,
> local class serialVersionUID = -8797204481428423223

1. Please try this: in com.****.******.*******, add `private static final long serialVersionUID = -7317586767482317266L;` (the old serialVersionUID reported as "stream classdesc serialVersionUID" in the error message above; the `L` suffix is required for a Java long literal of this size). Then repackage your jar and restart the job with the new jar.

2. Serializable classes must define a serialVersionUID; please see https://flink.apache.org/contributing/code-style-and-quality-java.html#java-serialization. Please add a serialVersionUID to all Serializable classes, especially those that take part in checkpoints or savepoints.
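To make point 1 concrete, here is a minimal sketch. The class names `ConnectionFactory` and `UnpinnedFactory` and the `endpoint` field are hypothetical stand-ins for the redacted class; only the UID value is taken from the error message. It pins the UID and then asks Java serialization which UID it will actually use, which may be a handy check before repackaging:

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class SerialVersionUidExample {

    // Hypothetical stand-in for the redacted com.****.******.******* class.
    static class ConnectionFactory implements Serializable {
        // Old UID copied from "stream classdesc serialVersionUID" in the error message.
        private static final long serialVersionUID = -7317586767482317266L;

        private final String endpoint; // hypothetical field, for illustration only

        ConnectionFactory(String endpoint) {
            this.endpoint = endpoint;
        }

        String endpoint() {
            return endpoint;
        }
    }

    // Hypothetical Serializable class without an explicit UID: the JVM derives
    // one from the class structure, so it can change when the class changes.
    static class UnpinnedFactory implements Serializable {
    }

    // Returns the UID that Java serialization will use for the given class.
    static long effectiveUid(Class<?> type) {
        ObjectStreamClass desc = ObjectStreamClass.lookup(type);
        if (desc == null) {
            throw new IllegalArgumentException(type.getName() + " is not Serializable");
        }
        return desc.getSerialVersionUID();
    }

    // Returns true if the class itself declares a static final long serialVersionUID.
    static boolean declaresExplicitUid(Class<?> type) {
        try {
            Field field = type.getDeclaredField("serialVersionUID");
            int modifiers = field.getModifiers();
            return Modifier.isStatic(modifiers)
                    && Modifier.isFinal(modifiers)
                    && field.getType() == long.class;
        } catch (NoSuchFieldException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(effectiveUid(ConnectionFactory.class));
        System.out.println(declaresExplicitUid(ConnectionFactory.class));
        System.out.println(declaresExplicitUid(UnpinnedFactory.class));
    }
}
```

A check like `declaresExplicitUid` could also be run in a unit test over the classes that end up in state, as one way to follow point 2. Note that pinning the UID only gets past this particular check; the restore can still fail if the class has otherwise changed in a way that is incompatible with the serialized form.
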
Best,
JING ZHANG

Yashwant Ganti <yashwan...@gmail.com> wrote on Thu, May 27, 2021 at 1:15 AM:

> Hello,
>
> We are facing an error restarting a job from a savepoint. We believe it is
> because one of the common classes used across all of our jobs was changed, but
> no serialVersionUID was assigned to the class. The error we are
> facing is:
>
>> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>>   at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
>>   at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
>>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>   at java.base/java.lang.Thread.run(Unknown Source)
>> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for SplittableDoFnOperator_60af72bbf6b3989cb3e849280faa23d8_(2/4) from any of the 1 provided restore options.
>>   at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>>   at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
>>   at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>>   ... 9 more
>> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
>>   at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:115)
>>   at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:559)
>>   at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:101)
>>   at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:181)
>>   at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
>>   at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>>   at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>>   ... 11 more
>> Caused by: java.io.InvalidClassException: com.****.******.*******; local class incompatible: stream classdesc serialVersionUID = -7317586767482317266, local class serialVersionUID = -8797204481428423223
>>   at java.base/java.io.ObjectStreamClass.initNonProxy(Unknown Source)
>>   at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>>   at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
>>   at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>>   at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
>>   at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>>   at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
>>   at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>>   at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
>>   at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
>>   at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
>>   at org.apache.beam.sdk.coders.SerializableCoder.decode(SerializableCoder.java:194)
>>   at org.apache.beam.sdk.coders.SerializableCoder.decode(SerializableCoder.java:54)
>>   at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceRestrictionCoder.decode(Read.java:669)
>>   at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceRestrictionCoder.decode(Read.java:642)
>>   at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:118)
>>   at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
>>   at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:289)
>>   at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:323)
>>   at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:285)
>>   at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:172)
>>   at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:112)
>>   ... 17 more
>
> The change was to have the failing class implement Serializable. My questions
> are:
>
> 1. What are our options now to get this job to restart? In the non-production
>    environments we can delete the savepoint, but we really don't want to do that
>    in production.
> 2. Are there any best practices/guidance to follow to avoid such issues in the future?
>    Should we have just defined a serialVersionUID for the class? Should
>    we/can we write custom serializers/deserializers for this class? The class is
>    just a factory which creates connection objects, so we normally don't think
>    twice about this in standard Java applications.
>
> Any help is appreciated.
>
> Thanks!