Hi Richard,

Schema evolution is not allowed for data types that are used as keys
because, if the schema of the key changes, the hash codes of the keys may
change as well, which would break the partitioning of the internal state
managed by Flink.
There are of course some evolution scenarios that would not change the
hashes (e.g. evolving only the serialization format), but at the moment
Flink has no way to determine that through the current serializer
compatibility interface.
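
To illustrate the partitioning concern, here is a minimal sketch in plain Java
(no Flink dependency). The class and method names are hypothetical, and the
assignment is simplified to "hash modulo max parallelism"; as far as I know,
Flink's real assignment additionally scrambles the hash code before taking the
modulus, which the sketch leaves out. The key hash is assumed to be derived
from the key's field values, so adding a field changes it.

```java
import java.util.Objects;

// Illustrative sketch only: not Flink's actual implementation.
class KeyGroupSketch {

    // Simplified key-group assignment: hash code modulo max parallelism.
    static int keyGroupFor(int keyHash, int maxParallelism) {
        return Math.floorMod(keyHash, maxParallelism);
    }

    // Maps a key group to the parallel subtask that owns its state.
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;

        // Hypothetical key with one field, hashed from its field values.
        int hashBefore = Objects.hash("A");      // 96
        // The "same" logical key after a schema change added a defaulted field.
        int hashAfter = Objects.hash("A", 0);    // 2976

        int groupBefore = keyGroupFor(hashBefore, maxParallelism); // 96
        int groupAfter = keyGroupFor(hashAfter, maxParallelism);   // 32

        // State written under the old hash lives on subtask 3, but records
        // keyed with the new hash are routed to subtask 1.
        System.out.println("before: subtask "
                + subtaskFor(groupBefore, maxParallelism, parallelism));
        System.out.println("after:  subtask "
                + subtaskFor(groupAfter, maxParallelism, parallelism));
    }
}
```

In this sketch, the state restored from the savepoint would sit on one subtask
while new records for the same logical key arrive at another, so lookups would
silently miss the existing state.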

For the workaround, I would need a bit more information:
Could you let me know if this job is already in production and you are
trying to restore from a savepoint? Would it be possible for you to give me
a snippet of the generated SpecificRecord from your Avro schema?

Cheers,
Gordon

On Mon, Jun 3, 2019 at 6:15 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Richard,
>
> I've pulled in Gordon who worked on this feature. He should be able to
> tell you about the current limitations of Flink's schema evolution.
>
> Cheers,
> Till
>
> On Wed, May 29, 2019 at 1:44 PM Richard Deurwaarder <rich...@xeli.eu>
> wrote:
>
>> Hello,
>>
>> I am running into the problem where (Avro) schema evolution works
>> perfectly for operator/keyed state but does not work when used with
>> keyBy().
>>
>> For example:
>>
>> I have a job like so:
>>
>>     env.addSource(someSource())
>>           .keyBy(object -> getMyAvroObject())
>>           .process(doSomething())
>>           .addSink(someSink());
>>
>> Where MyAvroObject has the following avdl:
>>
>> enum SomeEnum{
>>   A,B
>> }
>>
>> record SomeKey {
>>   SomeEnum someEnum;
>> }
>>
>> This works fine but when I change my avro object to:
>>
>>
>> enum SomeEnum{
>>   A,B,C
>> }
>>
>> record SomeKey {
>>   SomeEnum someEnum;
>> }
>>
>>
>> That is, with "C" added to SomeEnum. If I restart my job (from a
>> savepoint) with this new schema, I get the following exception:
>>
>> Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible.
>>
>>
>> Coming from this piece of code
>> (
>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java#L141
>> ):
>>
>>
>> if (keySerializerSchemaCompat.isCompatibleAfterMigration()
>>         || keySerializerSchemaCompat.isIncompatible()) {
>>     throw new StateMigrationException("The new key serializer must be compatible.");
>> }
>>
>>
>>
>> My question is:
>>
>>
>> What is the reason key serializer / key state explicitly does not
>> support state migration? And is there any way to work around this?
>>
>>
>> Regards,
>>
>>
>> Richard
>>
>
