Hello everyone,

I am trying to figure out how to set up Flink with Avro for state
management (especially the content of snapshots) to enable state migrations
(e.g. adding a nullable field to a class). So far, in version 1.4.2, I
tried to explicitly provide an instance of "new
AvroTypeInfo(Accumulator.getClass())", where Accumulator is a very simple
Avro-generated SpecificRecordBase with the following schema:

{"namespace": "io.relayr.flink",
 "type": "record",
 "name": "Accumulator",
 "fields": [
     {"name": "accumulator", "type": "int"}
 ]
}
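
For context, this is roughly how the state is declared. It's a simplified
sketch, the function and state names here are just illustrative, not the
real job:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
// from the flink-avro module; the package location may differ between Flink versions
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
import org.apache.flink.util.Collector;

import io.relayr.flink.Accumulator;

public class AccumulatingFlatMap extends RichFlatMapFunction<Integer, Integer> {

    private transient ValueState<Accumulator> state;

    @Override
    public void open(Configuration parameters) {
        // Pass the AvroTypeInfo explicitly so the state is serialized with Avro
        // instead of falling back to Kryo.
        ValueStateDescriptor<Accumulator> descriptor = new ValueStateDescriptor<>(
                "accumulator-state",
                new AvroTypeInfo<>(Accumulator.class));
        state = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Integer value, Collector<Integer> out) throws Exception {
        Accumulator acc = state.value();
        if (acc == null) {
            acc = Accumulator.newBuilder().setAccumulator(0).build();
        }
        acc.setAccumulator(acc.getAccumulator() + value);
        state.update(acc);
        out.collect(acc.getAccumulator());
    }
}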

This successfully saves the state to the snapshot. When I then try to load
the snapshot with an updated schema (adding the nullable field), it fails.
The updated schema looks like this:

{"namespace": "io.relayr.flink",
 "type": "record",
 "name": "Accumulator",
 "fields": [
     {"name": "accumulator", "type": "int"},
     {"name": "newStuff", "type": ["int", "null"]}
 ]
}
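
For what it's worth, this is roughly how I'd expect the evolution itself to
behave with plain Avro, outside of Flink. A standalone sketch, the class
name is made up; note that I added a "default" for the new field here,
since Avro's schema resolution needs one to read records written with the
old schema:

import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class SchemaEvolutionCheck {

    // Old schema, exactly as above.
    private static final String OLD_SCHEMA =
        "{\"namespace\": \"io.relayr.flink\", \"type\": \"record\", \"name\": \"Accumulator\","
            + " \"fields\": [{\"name\": \"accumulator\", \"type\": \"int\"}]}";

    // Updated schema, plus a default for the new field (without a default,
    // plain Avro cannot resolve records written with the old schema either).
    private static final String NEW_SCHEMA =
        "{\"namespace\": \"io.relayr.flink\", \"type\": \"record\", \"name\": \"Accumulator\","
            + " \"fields\": [{\"name\": \"accumulator\", \"type\": \"int\"},"
            + " {\"name\": \"newStuff\", \"type\": [\"int\", \"null\"], \"default\": 0}]}";

    public static void main(String[] args) throws Exception {
        Schema writerSchema = new Schema.Parser().parse(OLD_SCHEMA);
        Schema readerSchema = new Schema.Parser().parse(NEW_SCHEMA);

        // Write one record with the old (writer) schema.
        GenericRecord record = new GenericData.Record(writerSchema);
        record.put("accumulator", 42);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bytes, null);
        new GenericDatumWriter<GenericRecord>(writerSchema).write(record, encoder);
        encoder.flush();

        // Read it back with the new (reader) schema, letting Avro do schema resolution.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes.toByteArray(), null);
        GenericRecord evolved =
            new GenericDatumReader<GenericRecord>(writerSchema, readerSchema).read(null, decoder);

        System.out.println(evolved); // {"accumulator": 42, "newStuff": 0}
    }
}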

When I try to restart the Job from the snapshot, I get the following
exception:
2018-04-17 09:35:23,519 WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  - Deserialization of serializer errored; replacing with null.
java.io.IOException: Unloadable class for type serializer.
...
Caused by: java.io.InvalidClassException: io.relayr.flink.Accumulator;
local class incompatible: stream classdesc serialVersionUID =
-3555733236161157838, local class serialVersionUID = 5291033088112484292

Which is true: the Avro tools do generate a new serialVersionUID for the
bean. I just didn't expect it to be used; I expected the Avro schema to be
used for compatibility instead. Did anyone get this working? What am I
getting wrong?

Best regards,
Petter
