Hello, I am running into the problem where (avro) schema evolution works perfectly for operator/keyed state but does not work when used with keyBy().
For example: I have a job like so: env.addSource(someSource()) .keyBy(object -> getMyAvroObject()) .process(doSomething()) .addSink(someSink()); Where MyAvroObject has the following avdl: enum SomeEnum{ A,B } record SomeKey { SomeEnum someEnum; } This works fine but when I change my avro object to: enum SomeEnum{ A,B,C } record SomeKey { SomeEnum someEnum; } So with the added "C" in SomeEnum. If I restart my job (from savepoint) with this new schema I get the following exception: Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible. Coming from this piece of code (https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L141): if (keySerializerSchemaCompat.isCompatibleAfterMigration() || keySerializerSchemaCompat.isIncompatible()) { throw new StateMigrationException("The new key serializer must be compatible."); } My question is: What is the reason key serializer / key state explicitly does not support state migration? And is there any way to work around this? Regards, Richard