Hi,

In the Scala API, type serializers may be anonymous classes generated by Scala macros, and would therefore contain a reference to the wrapping class (i.e., your `Operators` class). Flink currently writes the serializers themselves into the savepoint so that they can be used for deserialization on restore, which means their classes must be present at restore time. Changing the `Operators` class name means the previous anonymous serializer class is no longer on the classpath, so deserialization of the written serializer fails. This is a limitation of how serializers for written state are currently registered in Flink.
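To illustrate the naming issue outside of Flink, here is a minimal sketch in plain Scala. `DemoSerializer` and `AnonymousNameDemo` are hypothetical names used only for illustration, and the anonymous class is written by hand rather than generated by the Flink macro. It only shows that the JVM name of an anonymous class includes the enclosing object's name, and that Java serialization stores that name in the byte stream.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for a Flink type serializer; not the real Flink API.
trait DemoSerializer extends Serializable

object Operators {
  // Hand-written anonymous class, similar in spirit to a macro-generated one.
  val serializer: DemoSerializer = new DemoSerializer {}
}

object AnonymousNameDemo {
  // Java-serializes an object into a byte array.
  def javaSerialize(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  def main(args: Array[String]): Unit = {
    // The JVM class name embeds the name of the enclosing object.
    val className = Operators.serializer.getClass.getName
    println(className)

    // The serialized stream carries that class name as text, so the class
    // must be resolvable under exactly this name when the stream is read.
    val streamText =
      new String(javaSerialize(Operators.serializer), StandardCharsets.ISO_8859_1)
    println(streamText.contains(className))
  }
}
```

Renaming `Operators` in this sketch changes the printed class name, which is why bytes written before the rename no longer match any class afterwards.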
Generally speaking, to overcome this, you would need to keep the previous serializer class on the classpath when restoring; it can only be completely removed from user code once the migration is completed.

One thing that I’m not completely certain about yet is where in your demonstrated code an anonymous-class serializer is generated for some type. From what I see, there shouldn’t be any anonymous-class serializers in that code. Is the code you provided a “simplified” version of the actual code in which you observed the restore error?

Cheers,
Gordon

On 28 January 2018 at 6:00:32 PM, jelmer (jkupe...@gmail.com) wrote:

Changing the class that operators are nested in can break compatibility with existing savepoints. The following piece of code demonstrates this:

https://gist.github.com/jelmerk/e55abeb0876539975cd32ad0ced8141a

If I change Operators in this file to Operators2, I will not be able to recover from a savepoint that was made when this class still had its old name.

The error in the Flink UI will be:

java.lang.IllegalStateException: Could not initialize keyed state backend.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
    at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1216)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
    ... 6 more

But the real reason is found in the task manager logs:

2018-01-28 17:03:58,830 WARN org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil - Deserialization of serializer errored; replacing with null.
java.io.IOException: Unloadable class for type serializer.
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:463)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:189)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:162)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:282)
    at org.apache.flink.runtime.state.KeyedBackendStateMetaInfoSnapshotReaderWriters$KeyedBackendStateMetaInfoReaderV3.readStateMetaInfo(KeyedBackendStateMetaInfoSnapshotReaderWriters.java:200)
    at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:152)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1175)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: failed to read class descriptor
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1611)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:454)
    ... 17 more
Caused by: java.lang.ClassNotFoundException: com.ecg.foo.Main$Operators$$anon$3$$anon$1
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$FailureTolerantObjectInputStream.readClassDescriptor(TypeSerializerSerializationUtil.java:110)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1609)
    ... 22 more

Is there any way to make this code more robust? Using Java serialization in this way feels very brittle in the face of refactorings.
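
The `ClassNotFoundException` at the bottom of this trace can be reproduced without Flink. The following is a minimal sketch, not how Flink reads savepoints: `SketchSerializer`, `OldOperators`, `MissingClassInputStream` and `RestoreSketch` are hypothetical names, and the rename is only simulated by an input stream that refuses to resolve any class whose name contains a given fragment. It shows that the bytes can be read back while the old class is resolvable, and cannot once it is gone.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream,
  ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

// Hypothetical stand-in for a serializer; not part of Flink.
trait SketchSerializer extends Serializable

object OldOperators {
  // Anonymous class; its JVM name contains the enclosing object's name.
  val serializer: SketchSerializer = new SketchSerializer {}
}

// Simulates a classpath from which every class whose name contains
// `missingFragment` has been removed (for example after a rename).
class MissingClassInputStream(in: InputStream, missingFragment: String)
    extends ObjectInputStream(in) {
  override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
    if (desc.getName.contains(missingFragment))
      throw new ClassNotFoundException(desc.getName)
    else super.resolveClass(desc)
}

object RestoreSketch {
  // Java-serializes an object into a byte array.
  def write(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  // True if the object can be read back, false on ClassNotFoundException.
  def canRestore(bytes: Array[Byte], missingFragment: String): Boolean = {
    val in = new MissingClassInputStream(new ByteArrayInputStream(bytes), missingFragment)
    try { in.readObject(); true }
    catch { case _: ClassNotFoundException => false }
    finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val bytes = write(OldOperators.serializer)
    // Old class still resolvable: restore works.
    println(canRestore(bytes, "NoSuchClass"))
    // Old class no longer resolvable: restore fails.
    println(canRestore(bytes, "OldOperators"))
  }
}
```

In this sketch the first call corresponds to keeping the old class on the classpath during restore, and the second to restoring after the enclosing class has been renamed.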