[ https://issues.apache.org/jira/browse/FLINK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16638834#comment-16638834 ]
Elias Levy commented on FLINK-10493: ------------------------------------ This issue was noted [~tzulitai] back in June 2017 [here|https://github.com/apache/flink/pull/4090#issuecomment-307109692]. > Macro generated CaseClassSerializer considered harmful > ------------------------------------------------------ > > Key: FLINK-10493 > URL: https://issues.apache.org/jira/browse/FLINK-10493 > Project: Flink > Issue Type: Bug > Components: Scala API, State Backends, Checkpointing, Type > Serialization System > Affects Versions: 1.4.0, 1.4.1, 1.4.2, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.6.1, > 1.5.4 > Reporter: Elias Levy > Priority: Major > > The Flink Scala API uses implicits and macros to generate {{TypeInformation}} > and {{TypeSerializer}} objects for types. In the case of Scala tuple and > case classes, the macro generates an [anonymous {{CaseClassSerializer}} > class|https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala#L148-L161]. > > The Scala compiler will generate a name for the anonymous class that depends > on the relative position in the code of the macro invocation to other > anonymous classes. If the code is changed such that the anonymous class > relative position changes, even if the overall logic of the code or the type > in question do not change, the name of the serializer class will change. > That will result in errors, such as the one below, if the job is restored > from a savepoint, as the serializer to read the data in the savepoint will no > longer be found, as its name will have changed. > At the very least, there should be a prominent warning in the documentation > about this issue. Minor code changes can result in jobs that can't restore > previous state. Ideally, the use of anonymous classes should be deprecated > if possible. > {noformat} > WARN org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil > - Deserialization of serializer errored; replacing with null. > java.io.IOException: Unloadable class for type serializer. > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203) > at > org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207) > at > org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Unknown Source) > Caused by: java.io.InvalidClassException: failed to read class descriptor > at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) > at java.io.ObjectInputStream.readClassDesc(Unknown Source) > at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) > at java.io.ObjectInputStream.readObject0(Unknown Source) > at java.io.ObjectInputStream.defaultReadFields(Unknown Source) > at java.io.ObjectInputStream.readSerialData(Unknown Source) > at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) > at java.io.ObjectInputStream.readObject0(Unknown Source) > at java.io.ObjectInputStream.readObject(Unknown Source) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375) > ... 14 more > Caused by: java.lang.ClassNotFoundException: > com.somewhere.TestJob$$anon$13$$anon$3 > at java.net.URLClassLoader.findClass(Unknown Source) > at java.lang.ClassLoader.loadClass(Unknown Source) > at > org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128) > at java.lang.ClassLoader.loadClass(Unknown Source) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Unknown Source) > at > org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73) > at > org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204) > ... 24 more > WARN org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil > - Deserialization of serializer errored; replacing with null. > java.io.IOException: Unloadable class for type serializer. > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203) > at > org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:445) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:250) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:206) > at > org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207) > at > org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Unknown Source) > Caused by: java.io.InvalidClassException: failed to read class descriptor > at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) > at java.io.ObjectInputStream.readClassDesc(Unknown Source) > at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) > at java.io.ObjectInputStream.readObject0(Unknown Source) > at java.io.ObjectInputStream.readObject(Unknown Source) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375) > ... 18 more > Caused by: java.lang.ClassNotFoundException: > com.somewhere.TestJob$$anon$13$$anon$3 > at java.net.URLClassLoader.findClass(Unknown Source) > at java.lang.ClassLoader.loadClass(Unknown Source) > at > org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128) > at java.lang.ClassLoader.loadClass(Unknown Source) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Unknown Source) > at > org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73) > at > org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204) > ... 24 more > ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error > during disposal of stream operator. > java.lang.NullPointerException > at > org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.stopResources(AsyncWaitOperator.java:353) > at > org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.dispose(AsyncWaitOperator.java:330) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Unknown Source) > INFO org.apache.flink.runtime.taskmanager.Task - Source: > Kafka Topic -> Async Function (1/1) (1de078fb77acdd16b7e021fb3e70339f) > switched from RUNNING to FAILED. > java.lang.IllegalStateException: Could not initialize operator state backend. > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Unknown Source) > Caused by: java.io.IOException: Unable to restore operator state > [_async_wait_operator_state_]. The previous serializer of the operator state > must be present; the serializer could have been removed from the classpath, > or its implementation have changed and could not be loaded. This is a > temporary restriction that will be fixed in future versions. > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:367) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299) > ... 6 more > {noformat} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)