Hello,

I am running into a problem where (Avro) schema evolution works perfectly
for operator and keyed state, but fails when the Avro type is used as the
key in keyBy().

For example:

I have a job like so:

    env.addSource(someSource())
        .keyBy(object -> object.getMyAvroObject())
        .process(doSomething())
        .addSink(someSink());

where MyAvroObject is defined by the following Avro IDL (.avdl):

enum SomeEnum {
  A, B
}

record SomeKey {
  SomeEnum someEnum;
}

This works fine, but then I change my Avro object to:


enum SomeEnum {
  A, B, C
}

record SomeKey {
  SomeEnum someEnum;
}


That is, with "C" added to SomeEnum. If I restart my job (from a savepoint)
with this new schema, I get the following exception:

Caused by: org.apache.flink.util.StateMigrationException: The new key
serializer must be compatible.


This comes from the following piece of code in HeapRestoreOperation
(https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L141):


if (keySerializerSchemaCompat.isCompatibleAfterMigration()
        || keySerializerSchemaCompat.isIncompatible()) {
    throw new StateMigrationException("The new key serializer must be compatible.");
}
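To make sure I read the check correctly, I modeled it in plain Java (the enum and classes below are my own hypothetical stand-ins, not Flink's real types). If my reading is right, value state tolerates a serializer that is compatible after migration, while key state only accepts an identical serialized form, presumably because keys index the state backend entries and so cannot be rewritten lazily:

```java
// Hypothetical stand-in for Flink's TypeSerializerSchemaCompatibility result.
enum Compat { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

class RestoreCheck {
    // Value state: only INCOMPATIBLE is fatal; migration is allowed.
    static boolean valueStateOk(Compat c) {
        return c != Compat.INCOMPATIBLE;
    }

    // Key state: COMPATIBLE_AFTER_MIGRATION is also rejected, matching the
    // quoted condition above -- only an unchanged serialized form passes.
    static boolean keyStateOk(Compat c) {
        return c == Compat.COMPATIBLE_AS_IS;
    }
}
```

So the same schema change that migrates cleanly as value state is rejected outright as key state.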



My question is:

What is the reason the key serializer / keyed state explicitly does not
support state migration? And is there any way to work around this?
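One workaround I am considering (an untested sketch on my side) is to key on a stable String derived from the record, e.g. the enum constant's name, so keyBy only ever uses Flink's String serializer and the key never needs to evolve with the Avro schema:

```java
// Hypothetical plain-Java sketch: derive a stable String key from the
// enum, so adding constants (like C) does not change existing keys.
enum SomeEnum { A, B, C }

class KeyExtractor {
    // Would be used as: .keyBy(obj -> KeyExtractor.stableKey(obj.getSomeEnum()))
    static String stableKey(SomeEnum e) {
        return e.name();
    }
}
```

I have not verified that this restores cleanly from an old savepoint, since the key type itself would change from SomeKey to String.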


Regards,


Richard
