Is it possible to add new fields to the object type of a stream, and then restore from savepoint?
I tried to add a new field "private String" to my java class. It previously had "private String" and a "private final Map<String, String>". When trying to restore an old savepoint after this code change, it failed with "KryoException: Unable to find class". Is it possible to evolve the stream classes and restore old state after such changes? For me it would work if the new fields are set to null when restoring state with such objects. And if a field has been deleted, restored values could be ignored. Here's a full stack trace: 2018-03-07 08:49:03,072 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - EnrichIdFunction -> AppIdFilter([appsimulator_236e5fb7]) -> DiscardBeforeDateFunction(null) -> DiscardedEventsFunction -> (LateDataLabelFunction, EventMapper -> ThreadPoolGateway (capacity=10) -> Sink: ResponseKafkaSink) (7/8) (66b47839cefef8518605ece669709c65) switched from RUNNING to FAILED. java.lang.IllegalStateException: Could not initialize operator state backend. at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:330) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:241) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:748) Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: �mo Serialization trace: params (com.rovio.ds.flink.http.Event) at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250) at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:203) at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:48) at org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:552) at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:368) at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:737) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:328) ... 6 more Caused by: java.lang.ClassNotFoundException: �mo at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ... 18 more I used flink 1.5-SNAPSHOT for this. FYI, how I solved this for now: As a work-around for my case, I happened to have a Map<String, String> in the object that I was able to use as a bit of a hack for this. I inserted the new field into the map and removed it afterwards to clean it up. This way I was able to get null values with map.get for objects that had been restored from the old savepoint, and non-null values for any new instances. FYI, failed attempt to do this: Only related things I could find about such thing were (not much help, but mention the Optional annotation): https://groups.google.com/forum/#!topic/kryo-users/F0FA4GkDg0M https://stackoverflow.com/questions/39105113/kryo-deserialize-old-version-of-class If I set @FieldSerializer.Optional("appId") on the new field, then Flink was able to restore from the old savepoint, but apparently the new field got ignored entirely, it just kept being null also for the new instances – apparently totally ignored then.