Is it possible to add new fields to the type of the objects in a stream and
then restore from a savepoint?

I tried adding a new "private String" field to my Java class, which
previously had a "private String" field and a "private final Map<String,
String>". When I tried to restore an old savepoint after this code change,
the job failed with "KryoException: Unable to find class".
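Kryo's default FieldSerializer writes an object's fields one after another without storing their names, so it does not support adding or removing fields while keeping previously serialized bytes readable. The toy model below imitates that positional layout with plain java.io streams (it is NOT Kryo's wire format, and the field names, including a hypothetical appId, are assumptions) to show how reading old bytes with a class that has one extra field misaligns everything after it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Toy imitation of positional (untagged) field encoding; not Kryo itself.
class PositionalLayoutDemo {

    // Old class layout: one String field, then a map with a single entry.
    static byte[] writeOld(String name, String paramKey, String paramValue) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(name);
            out.writeInt(1);          // number of map entries
            out.writeUTF(paramKey);
            out.writeUTF(paramValue);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // New class layout: an extra String field (hypothetical "appId") precedes the map.
    static String readWithNewLayout(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            String name = in.readUTF();
            String appId = in.readUTF(); // actually consumes the first 2 bytes of the entry count
            int entries = in.readInt();  // misaligned from here on, so this count is garbage
            return "name=" + name + ", appId=" + appId + ", entries=" + entries;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readWithNewLayout(writeOld("n", "k", "v")));
    }
}
```

Here the reader ends up expecting 65537 map entries. Presumably something similar happened inside Kryo, which would fit the stack trace failing while reading "params" with a garbled class name.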

Is it possible to evolve the classes used in a stream and still restore old
state after such changes? For my use case it would be fine if new fields
were simply set to null when restoring such objects, and if values of
deleted fields were ignored.
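The behaviour described above (new fields become null, values of deleted fields are dropped) is what a name-tagged encoding provides: each value is stored together with its field name and the reader matches by name instead of by position. The toy model below shows that idea in plain Java; it represents an encoded record as a name-to-value map rather than real bytes, and the field names name, appId and legacyFlag are hypothetical. Kryo's CompatibleFieldSerializer is built on a similar idea (it writes field names), and Flink's ExecutionConfig.addDefaultKryoSerializer can register a serializer class for a type, but that combination is an untested assumption here, and it would presumably only help for state written after the switch, not for a savepoint already written with the default serializer.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of name-tagged encoding; NOT the wire format of Kryo's CompatibleFieldSerializer.
// Field names (name, appId, legacyFlag) are hypothetical.
class TaggedFieldsDemo {

    // New class version: "appId" was added, "legacyFlag" was removed.
    static final class EventV2 {
        String name;
        String appId;
        Map<String, String> params = new HashMap<>();
    }

    // A record written by the OLD class version, modeled as field name -> value.
    static Map<String, Object> encodedByOldVersion() {
        Map<String, String> params = new HashMap<>();
        params.put("k", "v");
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("name", "click");
        fields.put("legacyFlag", "true"); // field that no longer exists in EventV2
        fields.put("params", params);
        return fields;
    }

    // Matches stored values to fields by name instead of by position.
    @SuppressWarnings("unchecked")
    static EventV2 decode(Map<String, Object> fields) {
        EventV2 event = new EventV2();
        for (Map.Entry<String, Object> field : fields.entrySet()) {
            switch (field.getKey()) {
                case "name":
                    event.name = (String) field.getValue();
                    break;
                case "appId":
                    event.appId = (String) field.getValue();
                    break;
                case "params":
                    event.params = (Map<String, String>) field.getValue();
                    break;
                default:
                    // Unknown name, e.g. a deleted field: skip the value.
                    break;
            }
        }
        // Fields missing from the old data (here: appId) keep their default, null.
        return event;
    }

    public static void main(String[] args) {
        EventV2 e = decode(encodedByOldVersion());
        System.out.println(e.name + " " + e.appId + " " + e.params);
    }
}
```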

Here's a full stack trace:

2018-03-07 08:49:03,072 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph        - EnrichIdFunction -> AppIdFilter([appsimulator_236e5fb7]) -> DiscardBeforeDateFunction(null) -> DiscardedEventsFunction -> (LateDataLabelFunction, EventMapper -> ThreadPoolGateway (capacity=10) -> Sink: ResponseKafkaSink) (7/8) (66b47839cefef8518605ece669709c65) switched from RUNNING to FAILED.
java.lang.IllegalStateException: Could not initialize operator state backend.
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:330)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:241)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: �mo
Serialization trace:
params (com.rovio.ds.flink.http.Event)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:203)
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:48)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:552)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:368)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:737)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:328)
	... 6 more
Caused by: java.lang.ClassNotFoundException: �mo
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
	... 18 more

I used Flink 1.5-SNAPSHOT for this.



FYI, here's how I worked around this for now:

As a workaround for my case, I could (somewhat hackily) use the
Map<String, String> that the object already had. Instead of adding a new
field, I inserted the new value into the map and removed it again afterwards
to clean up. This way map.get returned null for objects restored from the
old savepoint, and non-null values for all new instances.
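A minimal sketch of that workaround, assuming a hypothetical key "appId" and a hypothetical name for the existing String field. Because no field is added to the class, its serialized layout stays the same as in the old savepoint.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the workaround: keep the new value inside the existing map instead of
// adding a field, so the serialized layout of the class does not change.
class MapWorkaroundDemo {

    static final String APP_ID_KEY = "appId"; // hypothetical key name

    static final class Event {
        private String name;                                        // existing String field (name hypothetical)
        private final Map<String, String> params = new HashMap<>(); // existing map

        Event(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }

        // New instances store the value as a map entry.
        void setAppId(String appId) {
            params.put(APP_ID_KEY, appId);
        }

        // Objects restored from the old savepoint never had the entry, so this returns null.
        String getAppId() {
            return params.get(APP_ID_KEY);
        }

        // Reads the value and removes the entry again to clean up the map.
        String takeAppId() {
            return params.remove(APP_ID_KEY);
        }
    }

    public static void main(String[] args) {
        Event restored = new Event("old"); // stands in for an object from the old savepoint
        Event fresh = new Event("new");
        fresh.setAppId("app-1");
        System.out.println(restored.getAppId() + " / " + fresh.takeAppId() + " / " + fresh.getAppId());
    }
}
```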



FYI, an attempt that didn't work:

The only related material I could find was the following (not much help,
but it mentions the Optional annotation):

https://groups.google.com/forum/#!topic/kryo-users/F0FA4GkDg0M
https://stackoverflow.com/questions/39105113/kryo-deserialize-old-version-of-class

If I set @FieldSerializer.Optional("appId") on the new field, Flink was
able to restore from the old savepoint, but the new field was apparently
ignored entirely: it stayed null even for new instances.
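If I read Kryo's FieldSerializer correctly, a field annotated with @FieldSerializer.Optional("appId") is only included when the Kryo context (kryo.getContext()) contains the key "appId"; otherwise the field is neither written nor read. Assuming nothing in Flink puts that key into the context, this would explain the result above: the old savepoint no longer clashes with the new field, but the value is also dropped whenever a record is serialized or copied, so even new instances end up with null. The toy model below mimics only that selection rule; it is not Kryo's implementation, and the field names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the @Optional selection rule; NOT Kryo's implementation.
class OptionalFieldDemo {

    static final class FieldSpec {
        final String name;
        final String optionalKey; // null means "always serialized"

        FieldSpec(String name, String optionalKey) {
            this.name = name;
            this.optionalKey = optionalKey;
        }
    }

    // Hypothetical field list of the Event class after the change.
    static final List<FieldSpec> EVENT_FIELDS = Arrays.asList(
            new FieldSpec("name", null),
            new FieldSpec("params", null),
            new FieldSpec("appId", "appId"));

    // Returns the fields that would actually be written and read.
    static List<String> serializedFields(List<FieldSpec> fields, Map<String, Object> context) {
        List<String> result = new ArrayList<>();
        for (FieldSpec f : fields) {
            if (f.optionalKey != null && !context.containsKey(f.optionalKey)) {
                continue; // skipped: the value is never written, so it can only come back as null
            }
            result.add(f.name);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(serializedFields(EVENT_FIELDS, new HashMap<>()));
        Map<String, Object> context = new HashMap<>();
        context.put("appId", Boolean.TRUE);
        System.out.println(serializedFields(EVENT_FIELDS, context));
    }
}
```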
