Hello,
I am running into a problem where (Avro) schema evolution works perfectly
for operator/keyed state, but fails when the Avro type is used as the key in keyBy().
For example:
I have a job like so:
env.addSource(someSource())
   .keyBy(object -> getMyAvroObject())
   .process(doSomething())
   .addSink(someSink());
Where MyAvroObject has the following avdl:
enum SomeEnum {
  A, B
}

record SomeKey {
  SomeEnum someEnum;
}
This works fine, but when I change my Avro object to:
enum SomeEnum {
  A, B, C
}

record SomeKey {
  SomeEnum someEnum;
}
So the only change is the added "C" in SomeEnum. If I restart my job (from
a savepoint) with this new schema, I get the following exception:
Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible.
This is thrown from this piece of code
(https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L141):
if (keySerializerSchemaCompat.isCompatibleAfterMigration()
        || keySerializerSchemaCompat.isIncompatible()) {
    throw new StateMigrationException("The new key serializer must be compatible.");
}
My questions are:
What is the reason that the key serializer / keyed state explicitly does not
support state migration? And is there any way to work around this?
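For context, the only workaround I have come up with so far (untested, and the names below are just for illustration) is to not key by the Avro record at all, but by a stable primitive projection of it, e.g. the enum's name as a String, so that the key serializer is Flink's plain String serializer and is unaffected by Avro schema changes. A minimal sketch of the idea in plain Java, with the Flink wiring left out so it compiles standalone:

```java
// Sketch only: key by a stable String projection instead of the Avro record.
// SomeEnum / SomeKey mirror the avdl above; in the real job SomeKey would be
// the Avro-generated class.
public class StableKeySketch {
    enum SomeEnum { A, B, C }

    static class SomeKey {
        final SomeEnum someEnum;
        SomeKey(SomeEnum e) { this.someEnum = e; }
    }

    // Would be used as the key selector:
    //   env.addSource(someSource()).keyBy(k -> stableKey(k)) ...
    // Adding new enum symbols does not change how existing keys serialize.
    static String stableKey(SomeKey key) {
        return key.someEnum.name();
    }

    public static void main(String[] args) {
        System.out.println(stableKey(new SomeKey(SomeEnum.C))); // prints "C"
    }
}
```

I am not sure whether this is the intended approach, or whether it has other consequences for savepoint compatibility, hence the question.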
Regards,
Richard