> One thing that I’m not completely certain about yet is where, in your
> demonstrated code, an anonymous-class serializer is generated for some type.
> From what I see, there shouldn’t be any anonymous-class serializers for
> the code. Is the code you provided a “simplified” version of the actual
> code in which you observed the restore error?

I had the same problem: I couldn't easily figure out where that anonymous
serializer was coming from. That also makes it rather difficult to package
an old version of this class along with a new job in case of refactorings.

While I initially observed this problem in a more complex job, the behavior
can be reproduced 100% of the time with just the example job in the gist.
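
For reference, this is roughly the shape of the job in the gist (a simplified,
hypothetical reconstruction, not the actual gist code; class and field names are
made up). The point is that the keyed operator lives in a nested class, so the
Scala `createTypeInformation` macro expands inside it and the serializers it
generates end up as anonymous classes nested in `Operators`:

import org.apache.flink.streaming.api.scala._

object Main {

  case class Counter(key: String, count: Long)

  // The TypeInformation for `Counter` is materialized by the implicit
  // `createTypeInformation` macro at the call sites inside this class, so the
  // generated TypeInformation/serializer classes are anonymous classes nested
  // in `Operators` (names like Main$Operators$$anon$N$$anon$M, compare the
  // ClassNotFoundException in the quoted log below). Renaming `Operators`
  // therefore changes the name of the serializer class written into the savepoint.
  class Operators {
    def build(env: StreamExecutionEnvironment): Unit = {
      env
        .socketTextStream("localhost", 9999)
        .map(word => Counter(word, 1L))
        .keyBy(_.key)
        .mapWithState[Counter, Long] { (counter, state) =>
          val total = state.getOrElse(0L) + counter.count
          (counter.copy(count = total), Some(total))
        }
        .print()
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    new Operators().build(env)
    env.execute("anonymous-serializer-demo")
  }
}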


On 29 January 2018 at 12:33, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi,
>
> In the Scala API, type serializers may be anonymous classes generated by
> Scala macros, and would therefore contain a reference to the wrapping class
> (i.e., your `Operators` class).
> Since Flink currently serializes the serializers into the savepoint so that
> they can be used for deserialization on restore, they must be present on the
> classpath at restore time. Changing the `Operators` classname means the
> previously generated anonymous serializer class is no longer on the classpath,
> which fails the deserialization of the written serializer.
> This is a limitation caused by how registering serializers for written
> state currently works in Flink.
>
> Generally speaking, to overcome this, you would need to keep the previous
> serializer class around on the classpath when restoring; it can only be
> completely removed from the user code once the migration is completed.
>
> One thing that I’m not completely certain about yet is where, in your
> demonstrated code, an anonymous-class serializer is generated for some type.
> From what I see, there shouldn’t be any anonymous-class serializers for
> the code. Is the code you provided a “simplified” version of the actual
> code in which you observed the restore error?
>
> Cheers,
> Gordon
>
>
> On 28 January 2018 at 6:00:32 PM, jelmer (jkupe...@gmail.com) wrote:
>
> Changing the class that operators are nested in can break compatibility with
> existing savepoints. The following piece of code demonstrates this:
>
> https://gist.github.com/jelmerk/e55abeb0876539975cd32ad0ced8141a
>
> If I change `Operators` in this file to `Operators2`, I will not be able to
> recover from a savepoint that was made when this class still had its old
> name.
>
> The error in the Flink UI will be:
>
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
> at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1216)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
> ... 6 more
>
> But the real cause is found in the task manager logs:
>
>
> 2018-01-28 17:03:58,830 WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  - Deserialization of serializer errored; replacing with null.
> java.io.IOException: Unloadable class for type serializer.
> at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:463)
> at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:189)
> at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:162)
> at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:282)
> at org.apache.flink.runtime.state.KeyedBackendStateMetaInfoSnapshotReaderWriters$KeyedBackendStateMetaInfoReaderV3.readStateMetaInfo(KeyedBackendStateMetaInfoSnapshotReaderWriters.java:200)
> at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:152)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1175)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.InvalidClassException: failed to read class descriptor
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1611)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
> at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:454)
> ... 17 more
> Caused by: java.lang.ClassNotFoundException: com.ecg.foo.Main$Operators$$anon$3$$anon$1
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
> at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$FailureTolerantObjectInputStream.readClassDescriptor(TypeSerializerSerializationUtil.java:110)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1609)
> ... 22 more
>
>
>
> Is there any way to make this code more robust? Using Java serialization
> in this way feels very brittle in the face of refactorings.
>
>
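
For what it's worth, one mitigation I am considering (an untested sketch with
made-up names; I am assuming an explicit implicit `TypeInformation` value in
scope takes precedence over the `createTypeInformation` macro, since it is the
more specific implicit) is to pin the type information in a small object that
never gets refactored, so the macro expands there and the generated anonymous
serializer classes are no longer nested in the operator class:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._

// Illustrative only: a small, stable object that owns the type information.
object Model {
  case class Counter(key: String, count: Long)

  // `createTypeInformation` expands here, so the anonymous TypeInformation/
  // serializer classes it generates are nested in `Model` (Model$$anon$...)
  // rather than in whatever class holds the operators. Renaming the operator
  // class then no longer changes the serializer's class name; renaming `Model`
  // itself of course still would.
  implicit val counterTypeInfo: TypeInformation[Counter] =
    createTypeInformation[Counter]
}

With `import Model._` at the operator call sites, the job should keep using the
serializer generated in `Model` no matter how the surrounding operator classes
are renamed. I haven't verified this against an existing savepoint, though.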
