Hi Richard, I've pulled in Gordon who worked on this feature. He should be able to tell you about the current limitations of Flink's schema evolution.
Cheers, Till On Wed, May 29, 2019 at 1:44 PM Richard Deurwaarder <rich...@xeli.eu> wrote: > Hello, > > I am running into the problem where (avro) schema evolution works perfectly > for operator/keyed state but does not work when used with keyBy(). > > For example: > > I have a job like so: > > env.addSource(someSource()) > .keyBy(object -> getMyAvroObject()) > .process(doSomething()) > .addSink(someSink()); > > Where MyAvroObject has the following avdl: > > enum SomeEnum{ > A,B > } > > record SomeKey { > SomeEnum someEnum; > } > > This works fine but when I change my avro object to: > > > enum SomeEnum{ > A,B,C > } > > record SomeKey { > SomeEnum someEnum; > } > > > So with the added "C" in SomeEnum. If I restart my job (from savepoint) > with this new schema I get the following exception: > > Caused by: org.apache.flink.util.StateMigrationException: The new key > serializer must be compatible. > > > Coming from this piece of code > ( > https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L141 > ): > > > if (keySerializerSchemaCompat.isCompatibleAfterMigration() || > keySerializerSchemaCompat.isIncompatible()) { > throw new StateMigrationException("The new key serializer must be > compatible."); > } > > > > My question is: > > > What is the reason key serializer / key state explicitly does not > support state migration? And is there any way to work around this? > > > Regards, > > > Richard >