> One thing that I’m not completely certain with yet, is where in your
> demonstrated code a anonymous-classed serializer is generated for some type.
> From what I see, there shouldn’t be any anonymous-class serializers for
> the code. Is the code you provided a “simplified” version of the actual
> code in which you observed the restore error?

I had the same problem: I couldn't easily figure out where that anonymous serializer is coming from. That also makes it rather difficult to package an old version of this class along with a new job in case of refactorings.

While I initially observed this problem in a more complex job, the behavior can be 100% reproduced with just the example job in the gist.

On 29 January 2018 at 12:33, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Hi,
>
> In the Scala API, type serializers may be anonymous classes generated by
> Scala macros, and would therefore contain a reference to the wrapping class
> (i.e., your `Operators` class).
> Since Flink currently serializes serializers into the savepoint to be used
> for deserialization on restore, and the fact that they must be present at
> restore time, changing the `Operators` classname would result in the
> previous anonymous class serializer to no longer be in the classpath and
> therefore fails the deserialization of the written serializer.
> This is a limitation caused by how registering serializers for written
> state currently works in Flink.
>
> Generally speaking, to overcome this, you would need to have the previous
> serializer class still around in the classpath when restoring, and can only
> be completely removed from user code once the migration is completed.
>
> One thing that I’m not completely certain with yet, is where in your
> demonstrated code a anonymous-classed serializer is generated for some type.
> From what I see, there shouldn’t be any anonymous-class serializers for
> the code. Is the code you provided a “simplified” version of the actual
> code in which you observed the restore error?
>
> Cheers,
> Gordon
>
>
> On 28 January 2018 at 6:00:32 PM, jelmer (jkupe...@gmail.com) wrote:
>
> Changing the class operators are nested in can break compatibility with
> existing savepoints.
> The following piece of code demonstrates this
>
> https://gist.github.com/jelmerk/e55abeb0876539975cd32ad0ced8141a
>
> If I change Operators in this file to Operators2 i will not be able to
> recover from a savepoint that was made when this class still had its old
> name.
>
> The error in the flink ui will be
>
> java.lang.IllegalStateException: Could not initialize keyed state backend.
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
>     at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1216)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
>     ... 6 more
>
> But the real reason is found in the task manager logs
>
> 2018-01-28 17:03:58,830 WARN org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil - Deserialization of serializer errored; replacing with null.
> java.io.IOException: Unloadable class for type serializer.
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:463)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:189)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:162)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:282)
>     at org.apache.flink.runtime.state.KeyedBackendStateMetaInfoSnapshotReaderWriters$KeyedBackendStateMetaInfoReaderV3.readStateMetaInfo(KeyedBackendStateMetaInfoSnapshotReaderWriters.java:200)
>     at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:152)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1175)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.InvalidClassException: failed to read class descriptor
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1611)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:454)
>     ... 17 more
> Caused by: java.lang.ClassNotFoundException: com.ecg.foo.Main$Operators$$anon$3$$anon$1
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:348)
>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$FailureTolerantObjectInputStream.readClassDescriptor(TypeSerializerSerializationUtil.java:110)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1609)
>     ... 22 more
>
> Is there any way to make this code more robust ? Using java serialization
> in this way feels very brittle in the face of refactorings.
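
For anyone following along, the mechanism behind the ClassNotFoundException can be reproduced without Flink. The following is only a minimal sketch in plain Java, not the code from the gist: `AnonymousClassNameDemo`, `newSerializer`, `serialize` and `streamContains` are hypothetical names, and the nested `Operators` class is just a stand-in for the wrapping class. It shows that an anonymous class gets a binary name derived from its enclosing class, and that Java serialization writes that name into the stream.

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class AnonymousClassNameDemo {

    // Hypothetical stand-in for the wrapping `Operators` class.
    static class Operators {
        // Returns an instance of an anonymous class, playing the role of a
        // generated serializer. This is NOT a Flink TypeSerializer.
        static Serializable newSerializer() {
            return new Serializable() { };
        }
    }

    // Java-serializes the given object and returns the raw bytes.
    static byte[] serialize(Serializable value) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    // True if the serialized form embeds the given class name.
    static boolean streamContains(byte[] stream, String className) {
        return new String(stream, StandardCharsets.ISO_8859_1).contains(className);
    }

    public static void main(String[] args) throws Exception {
        Serializable serializer = Operators.newSerializer();
        String name = serializer.getClass().getName();
        // The name is built from the enclosing class, so it contains "Operators$".
        System.out.println(name);
        // The same name is written into the serialized bytes.
        System.out.println(streamContains(serialize(serializer), name));
    }
}
```

Renaming `Operators` to `Operators2` changes the name of the anonymous class as well, so bytes written before the rename refer to a class that no longer exists. The Scala compiler follows a similar scheme with `$$anon$N` suffixes, which matches the `com.ecg.foo.Main$Operators$$anon$3$$anon$1` in the trace above.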