Hi!

Schema evolution is a bit tricky at the moment. There is a short-term and a
long-term answer to this:

  - Long term: We store the serializer configuration in the snapshots and
want to use it in the future to offer a migration path that converts the old
format to the new format (read with the old serializer, pass through a
user-specified converter function, serialize with the new serializer). Two
of the three parts are in place, but it is not fully working at this point.

  - Short term: To be able to evolve state, I would recommend using
something like Avro, which has schema evolution built in. Kryo is
unfortunately particularly bad at class/schema evolution. To use Avro with
your types, pass "new AvroTypeInfo<>(MyClass.class)" when creating your
state. You need to add "flink-avro" as a dependency in your application.
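
To make the long-term idea more concrete, here is a minimal sketch of the
"read with old serializer, convert, write with new serializer" path. It uses
only the JDK and is not Flink's API: the Serializer interface, the EventV1 and
EventV2 classes, their field names, and the migrate helper are all
hypothetical names made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class SchemaMigrationSketch {

    /** Hypothetical minimal serializer contract; NOT Flink's TypeSerializer. */
    interface Serializer<T> {
        void write(T value, DataOutputStream out) throws IOException;
        T read(DataInputStream in) throws IOException;
    }

    /** Old layout (assumed): one String and one Map<String, String>. */
    static final class EventV1 {
        final String id;
        final Map<String, String> params;
        EventV1(String id, Map<String, String> params) { this.id = id; this.params = params; }
    }

    /** New layout (assumed): the old fields plus a nullable appId. */
    static final class EventV2 {
        final String id;
        final String appId; // null for records converted from the old layout
        final Map<String, String> params;
        EventV2(String id, String appId, Map<String, String> params) {
            this.id = id; this.appId = appId; this.params = params;
        }
    }

    static void writeMap(Map<String, String> map, DataOutputStream out) throws IOException {
        out.writeInt(map.size());
        for (Map.Entry<String, String> e : map.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeUTF(e.getValue());
        }
    }

    static Map<String, String> readMap(DataInputStream in) throws IOException {
        int size = in.readInt();
        Map<String, String> map = new HashMap<>();
        for (int i = 0; i < size; i++) {
            String key = in.readUTF();
            map.put(key, in.readUTF());
        }
        return map;
    }

    static final Serializer<EventV1> V1 = new Serializer<EventV1>() {
        public void write(EventV1 v, DataOutputStream out) throws IOException {
            out.writeUTF(v.id);
            writeMap(v.params, out);
        }
        public EventV1 read(DataInputStream in) throws IOException {
            return new EventV1(in.readUTF(), readMap(in));
        }
    };

    static final Serializer<EventV2> V2 = new Serializer<EventV2>() {
        public void write(EventV2 v, DataOutputStream out) throws IOException {
            out.writeUTF(v.id);
            out.writeBoolean(v.appId != null); // presence flag for the nullable field
            if (v.appId != null) { out.writeUTF(v.appId); }
            writeMap(v.params, out);
        }
        public EventV2 read(DataInputStream in) throws IOException {
            String id = in.readUTF();
            String appId = in.readBoolean() ? in.readUTF() : null;
            return new EventV2(id, appId, readMap(in));
        }
    };

    static <T> byte[] toBytes(T value, Serializer<T> ser) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        ser.write(value, out);
        out.flush();
        return buffer.toByteArray();
    }

    static <T> T fromBytes(byte[] bytes, Serializer<T> ser) throws IOException {
        return ser.read(new DataInputStream(new ByteArrayInputStream(bytes)));
    }

    /** Read with the old serializer, apply the converter, write with the new serializer. */
    static <A, B> byte[] migrate(byte[] oldBytes, Serializer<A> oldSer,
                                 Function<A, B> converter, Serializer<B> newSer) throws IOException {
        A oldValue = fromBytes(oldBytes, oldSer);
        B newValue = converter.apply(oldValue);
        return toBytes(newValue, newSer);
    }

    public static void main(String[] args) throws IOException {
        Map<String, String> params = new HashMap<>();
        params.put("k", "v");
        byte[] oldBytes = toBytes(new EventV1("e1", params), V1);
        // The converter decides what the new field holds for old records (here: null).
        byte[] newBytes = migrate(oldBytes, V1, old -> new EventV2(old.id, null, old.params), V2);
        EventV2 restored = fromBytes(newBytes, V2);
        System.out.println(restored.id + " " + restored.appId + " " + restored.params);
    }
}
```

The converter function is the piece a user would supply: it decides what a
newly added field holds for records written in the old format (null in this
sketch), and it can simply not copy a field that has been removed.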

Best,
Stephan


On Fri, Mar 16, 2018 at 11:19 AM, Juho Autio <juho.au...@rovio.com> wrote:

> Is it possible to add new fields to the object type of a stream, and then
> restore from savepoint?
>
> I tried to add a new field "private String" to my Java class. It
> previously had a "private String" and a "private final Map<String, String>".
> When trying to restore an old savepoint after this code change, it failed
> with "KryoException: Unable to find class".
>
> Is it possible to evolve the stream classes and restore old state after
> such changes? For me it would work if the new fields are set to null when
> restoring state with such objects. And if a field has been deleted,
> restored values could be ignored.
>
> Here's a full stack trace:
>
> 2018-03-07 08:49:03,072 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph
>       - EnrichIdFunction -> AppIdFilter([appsimulator_236e5fb7]) ->
> DiscardBeforeDateFunction(null) -> DiscardedEventsFunction ->
> (LateDataLabelFunction, EventMapper -> ThreadPoolGateway (capacity=10) ->
> Sink: ResponseKafkaSink) (7/8) (66b47839cefef8518605ece669709c65)
> switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize operator state
> backend.
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.
> initOperatorState(AbstractStreamOperator.java:330)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.
> initializeState(AbstractStreamOperator.java:241)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> initializeOperators(StreamTask.java:676)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> initializeState(StreamTask.java:663)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:252)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class:
> �mo
> Serialization trace:
> params (com.rovio.ds.flink.http.Event)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.
> readName(DefaultClassResolver.java:138)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(
> DefaultClassResolver.java:115)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(
> ObjectField.java:99)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.
> read(FieldSerializer.java:528)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.
> KryoSerializer.deserialize(KryoSerializer.java:250)
> at org.apache.flink.streaming.runtime.streamrecord.
> StreamElementSerializer.deserialize(StreamElementSerializer.java:203)
> at org.apache.flink.streaming.runtime.streamrecord.
> StreamElementSerializer.deserialize(StreamElementSerializer.java:48)
> at org.apache.flink.runtime.state.DefaultOperatorStateBackend.
> deserializeStateValues(DefaultOperatorStateBackend.java:552)
> at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(
> DefaultOperatorStateBackend.java:368)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> createOperatorStateBackend(StreamTask.java:737)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.
> initOperatorState(AbstractStreamOperator.java:328)
> ... 6 more
> Caused by: java.lang.ClassNotFoundException: �mo
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.
> readName(DefaultClassResolver.java:136)
> ... 18 more
>
> I used Flink 1.5-SNAPSHOT for this.
>
>
>
> FYI, how I solved this for now:
>
> As a work-around for my case, I happened to have a Map<String, String> in
> the object that I was able to use as a bit of a hack for this. I inserted
> the new field into the map and removed it afterwards to clean it up. This
> way I was able to get null values with map.get for objects that had been
> restored from the old savepoint, and non-null values for any new instances.
>
>
>
> FYI, failed attempt to do this:
>
> The only related things I could find were these (not much help, but they
> mention the Optional annotation):
>
> https://groups.google.com/forum/#!topic/kryo-users/F0FA4GkDg0M
> https://stackoverflow.com/questions/39105113/kryo-deserialize-old-version-of-class
>
> If I set @FieldSerializer.Optional("appId") on the new field, then Flink
> was able to restore from the old savepoint, but the new field apparently
> got ignored entirely: it stayed null for new instances as well.
>
>
