Hi!
We use flink-1.7.1 and have some problems with restoring from savepoint. We use custom kryo serializer which relies on protobuf representation of our model classes. It had been working fine but when we made some change in our model class it broke because of changed serialVersionUID. We can see this message in the log:
Caused by: java.lang.IllegalStateException: Could not Java-deserialize TypeSerializer while restoring checkpoint metadata for serializer snapshot 'org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer$KryoSerializerConfigSnapshot'. Please update to the TypeSerializerSnapshot interface that removes Java Serialization to avoid this problem in the future.
I found that method snapshotConfiguration of org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer returns instance of KryoSerializerConfigSnapshot. And this class for some reason extends deprecated TypeSerializerConfigSnapshot which relies on java serialization.
Of course we have fixed our problem just by adding special serialVersionUID to our class. But it seems strange to have problems with java serialization while our serializer doesn't use this mechanism. Do you plan to fix this problem?
Full stack trace below:
java.lang.Exception: Exception while creating StreamOperatorStateContext. at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for StreamGroupedReduce_5d41c2bc0b6f18591a40bd21a3e516cd_(2/2) from any of the 1 provided restore options. at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:284) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135) ... 5 more Caused by: java.lang.IllegalStateException: Could not Java-deserialize TypeSerializer while restoring checkpoint metadata for serializer snapshot 'org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer$KryoSerializerConfigSnapshot'. Please update to the TypeSerializerSnapshot interface that removes Java Serialization to avoid this problem in the future. at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.restoreSerializer(TypeSerializerConfigSnapshot.java:138) at org.apache.flink.runtime.state.StateSerializerProvider$RestoredStateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:212) at org.apache.flink.runtime.state.StateSerializerProvider$RestoredStateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:188) at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.getStateSerializer(RegisteredKeyValueStateBackendMetaInfo.java:135) at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.getStateSerializer(CopyOnWriteStateTable.java:541) at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.createV2PlusReader(StateTableByKeyGroupReaders.java:69) at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.readerForVersion(StateTableByKeyGroupReaders.java:60) at org.apache.flink.runtime.state.heap.StateTable.keyGroupReader(StateTable.java:199) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:491) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:453) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:410) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:358) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:104) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123) ... 7 more Caused by: java.io.InvalidClassException: ru.yandex.vertis.moderation.model.ModerationRequest$UpsertMetadata; local class incompatible: stream classdesc serialVersionUID = -30736003445323259, local class serialVersionUID = -2856495280913794838 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:687) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1880) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1746) at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1711) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1551) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) at java.util.HashMap.readObject(HashMap.java:1409) at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2173) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:558) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.readObject(KryoSerializer.java:583) at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2173) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:289) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:116) at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.readSnapshot(TypeSerializerConfigSnapshot.java:113) at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:161) at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219) at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:169) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:392) ... 11 more
---
Best regards,
Pavel Potseluev
Software developer, Yandex.Classifieds LLC