Hi!

Schema evolution is a bit tricky at the moment. There is a short-term and a long-term answer to this:

- Long term: We store the serializer configuration in the snapshots, and in the future we want to use it to offer a path that converts the old format to the new format (read with the old serializer, pass the value through a user-specified converter function, serialize with the new serializer). Two of those three parts are in place, but the path does not fully work yet.

- Short term: To be able to evolve state, I would recommend using a format like Avro that has schema evolution built in. Kryo is unfortunately particularly bad at class/schema evolution. To use Avro with your types, pass "new AvroTypeInfo<>(MyClass.class)" as the type information when creating your state. You need to add "flink-avro" as a dependency of your application.

Best,
Stephan

On Fri, Mar 16, 2018 at 11:19 AM, Juho Autio <juho.au...@rovio.com> wrote:

> Is it possible to add new fields to the object type of a stream, and then
> restore from a savepoint?
>
> I tried to add a new field "private String" to my Java class. It
> previously had "private String" and a "private final Map<String, String>".
> When trying to restore an old savepoint after this code change, it failed
> with "KryoException: Unable to find class".
>
> Is it possible to evolve the stream classes and restore old state after
> such changes? For me it would work if the new fields are set to null when
> restoring state with such objects. And if a field has been deleted, the
> restored values could be ignored.
>
> Here's a full stack trace:
>
> 2018-03-07 08:49:03,072 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - EnrichIdFunction -> AppIdFilter([appsimulator_236e5fb7]) -> DiscardBeforeDateFunction(null) -> DiscardedEventsFunction -> (LateDataLabelFunction, EventMapper -> ThreadPoolGateway (capacity=10) -> Sink: ResponseKafkaSink) (7/8) (66b47839cefef8518605ece669709c65) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize operator state backend.
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:330)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:241)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: �mo
> Serialization trace:
> params (com.rovio.ds.flink.http.Event)
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:203)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:48)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.deserializeStateValues(DefaultOperatorStateBackend.java:552)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:368)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:737)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:328)
>     ... 6 more
> Caused by: java.lang.ClassNotFoundException: �mo
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:348)
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>     ... 18 more
>
> I used Flink 1.5-SNAPSHOT for this.
>
> FYI, how I solved this for now:
>
> As a workaround for my case, I happened to have a Map<String, String> in
> the object that I was able to use as a bit of a hack for this. I inserted
> the new field into the map and removed it afterwards to clean it up. This
> way I was able to get null values with map.get for objects that had been
> restored from the old savepoint, and non-null values for any new instances.
>
> FYI, failed attempt to do this:
>
> The only related things I could find were these (not much help, but they
> mention the Optional annotation):
>
> https://groups.google.com/forum/#!topic/kryo-users/F0FA4GkDg0M
> https://stackoverflow.com/questions/39105113/kryo-deserialize-old-version-of-class
>
> If I set @FieldSerializer.Optional("appId") on the new field, then Flink
> was able to restore from the old savepoint, but the new field was
> apparently ignored entirely: it stayed null even for new instances.
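The long-term path Stephan describes (read with the old serializer, pass the value through a user-specified converter function, write with the new serializer) can be sketched in plain Java. This is a hypothetical illustration, not Flink's API: `StateSerializer`, `OldEvent`, `NewEvent`, `migrate` and the byte layouts are all assumptions made for the sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Function;

// Hypothetical sketch of "read with old serializer -> convert -> write with new serializer".
// StateSerializer, OldEvent, NewEvent and migrate() are illustrative names, not Flink APIs.
public class StateMigrationSketch {

    // Minimal serializer abstraction (an assumption; far simpler than Flink's TypeSerializer).
    interface StateSerializer<T> {
        void serialize(T value, DataOutputStream out) throws IOException;
        T deserialize(DataInputStream in) throws IOException;
    }

    // Old state type, as found in existing snapshots.
    static final class OldEvent {
        final String id;
        OldEvent(String id) { this.id = id; }
    }

    // New state type with an added field.
    static final class NewEvent {
        final String id;
        final String appId; // null when migrated from old state
        NewEvent(String id, String appId) { this.id = id; this.appId = appId; }
    }

    static final StateSerializer<OldEvent> OLD_SERIALIZER = new StateSerializer<OldEvent>() {
        @Override public void serialize(OldEvent v, DataOutputStream out) throws IOException {
            out.writeUTF(v.id);
        }
        @Override public OldEvent deserialize(DataInputStream in) throws IOException {
            return new OldEvent(in.readUTF());
        }
    };

    static final StateSerializer<NewEvent> NEW_SERIALIZER = new StateSerializer<NewEvent>() {
        @Override public void serialize(NewEvent v, DataOutputStream out) throws IOException {
            out.writeUTF(v.id);
            out.writeBoolean(v.appId != null); // presence flag, so a null appId round-trips
            if (v.appId != null) out.writeUTF(v.appId);
        }
        @Override public NewEvent deserialize(DataInputStream in) throws IOException {
            String id = in.readUTF();
            String appId = in.readBoolean() ? in.readUTF() : null;
            return new NewEvent(id, appId);
        }
    };

    static <T> byte[] toBytes(T value, StateSerializer<T> serializer) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            serializer.serialize(value, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    static <T> T fromBytes(byte[] bytes, StateSerializer<T> serializer) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return serializer.deserialize(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The migration path: old serializer -> user-supplied converter -> new serializer.
    static <O, N> byte[] migrate(byte[] oldBytes, StateSerializer<O> oldSerializer,
                                 Function<O, N> converter, StateSerializer<N> newSerializer) {
        O oldValue = fromBytes(oldBytes, oldSerializer);
        return toBytes(converter.apply(oldValue), newSerializer);
    }

    public static void main(String[] args) {
        byte[] snapshot = toBytes(new OldEvent("e-1"), OLD_SERIALIZER);
        byte[] migrated = migrate(snapshot, OLD_SERIALIZER,
                old -> new NewEvent(old.id, null), NEW_SERIALIZER);
        NewEvent restored = fromBytes(migrated, NEW_SERIALIZER);
        System.out.println(restored.id + " " + restored.appId); // prints "e-1 null"
    }
}
```

Keeping the converter a plain function leaves the choice of value for a new field (here `null`) with the user rather than with the serialization framework.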
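A toy model can illustrate why adding a field broke the restore, and what the question asks for instead. Kryo's default FieldSerializer (the one in the trace) writes an object's fields in a fixed order without field names, so a reader whose class has a different set of fields reads the old bytes out of step. In the toy, the reader runs past the end of the data. In the real trace, Kryo misread bytes as a class name, which is consistent with the same misalignment. Matching fields by name gives the behavior the question asks for: missing fields become null and deleted fields are ignored. Both encodings below are simplified assumptions, not Kryo's or Avro's wire formats. Avro reaches a similar result by resolving the writer's schema against the reader's schema, where new fields need a default value.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model, not Kryo's or Avro's wire format: positional vs. name-tagged field encoding.
public class FieldEvolutionSketch {

    // Positional: values in a fixed order, no field names.
    static byte[] writePositional(List<String> values) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            for (String v : values) out.writeUTF(v);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    // Reads as many values as the reader's class has fields; fails if the writer had fewer.
    static List<String> readPositional(byte[] bytes, int readerFieldCount) {
        List<String> values = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            for (int i = 0; i < readerFieldCount; i++) values.add(in.readUTF());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return values;
    }

    // Name-tagged: (name, value) pairs; null values are simply not written.
    static byte[] writeTagged(Map<String, String> fields) {
        List<Map.Entry<String, String>> present = new ArrayList<>();
        for (Map.Entry<String, String> entry : fields.entrySet()) {
            if (entry.getValue() != null) present.add(entry);
        }
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeInt(present.size());
            for (Map.Entry<String, String> entry : present) {
                out.writeUTF(entry.getKey());
                out.writeUTF(entry.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    // Reads by name: missing fields become null, fields the reader does not know are ignored.
    static Map<String, String> readTagged(byte[] bytes, List<String> readerFields) {
        Map<String, String> written = new HashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                String name = in.readUTF();
                written.put(name, in.readUTF());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> result = new LinkedHashMap<>();
        for (String name : readerFields) result.put(name, written.get(name));
        return result;
    }

    public static void main(String[] args) {
        // Old class had fields "id" and "label" (hypothetical names).
        Map<String, String> oldFields = new LinkedHashMap<>();
        oldFields.put("id", "e-1");
        oldFields.put("label", "x");

        // New class drops "label" and adds "appId".
        System.out.println(readTagged(writeTagged(oldFields), List.of("id", "appId")));
        // prints {id=e-1, appId=null}

        // Positional: a new class that adds one field expects three values; the old bytes hold two.
        try {
            readPositional(writePositional(List.of("e-1", "x")), 3);
        } catch (UncheckedIOException e) {
            System.out.println("positional read failed: " + e.getCause());
        }
    }
}
```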
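The Map-based workaround can be sketched as follows. It works because entries in the existing map (named `params` in the serialization trace) are data, not class fields, so the field layout Kryo uses for the `Event` class does not change. The original code was not posted. The `id` field, the `APP_ID_KEY` constant, the `stashAppId`/`takeAppId` helpers, and the choice to remove the entry when reading it are hypothetical, based on one possible reading of "removed it afterwards to clean it up".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the Map-based workaround; names are illustrative only.
public class MapWorkaroundSketch {

    static final class Event {
        private final String id;                  // existing field (hypothetical name)
        private final Map<String, String> params; // existing map, already present in old savepoints

        Event(String id, Map<String, String> params) {
            this.id = id;
            this.params = params;
        }
    }

    // Hypothetical key under which the "new field" travels inside the map.
    static final String APP_ID_KEY = "appId";

    // New instances carry the value as a map entry instead of as a new class field.
    static void stashAppId(Event e, String appId) {
        e.params.put(APP_ID_KEY, appId);
    }

    // Map.remove returns the value (or null if absent) and cleans up the entry in one step.
    static String takeAppId(Event e) {
        return e.params.remove(APP_ID_KEY);
    }

    public static void main(String[] args) {
        Event restored = new Event("old", new HashMap<>()); // stands in for an object from the old savepoint
        Event fresh = new Event("new", new HashMap<>());
        stashAppId(fresh, "app-1");
        System.out.println(takeAppId(restored));    // prints null
        System.out.println(takeAppId(fresh));       // prints app-1
        System.out.println(fresh.params.isEmpty()); // prints true: the entry was cleaned up
    }
}
```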