Iaroslav Zeigerman created FLINK-23953:
------------------------------------------

             Summary: AvroSerializerSnapshot causes state migration even though the old and the new schemas are compatible
                 Key: FLINK-23953
                 URL: https://issues.apache.org/jira/browse/FLINK-23953
             Project: Flink
          Issue Type: Bug
          Components: API / Type Serialization System
    Affects Versions: 1.13.2
            Reporter: Iaroslav Zeigerman


The problematic code is located [here|https://github.com/apache/flink/blob/release-1.13.2/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java#L194]. Returning COMPATIBLE_AFTER_MIGRATION there seems completely unnecessary and causes issues during schema evolution; COMPATIBLE_AS_IS should be returned instead. Even the comment directly above the linked line says as much:
{noformat}
// The new serializer would be able to read data persisted with *this*
// serializer, therefore no migration is required.
{noformat}
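
For context, Avro itself already classifies this kind of evolution as fully compatible. Below is a minimal, self-contained sketch; the {{Rec}} schema and its field names are made up purely for illustration, the point is the result of the {{SchemaCompatibility}} check:

{code:java}
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

public class CompatibilityCheck {
    public static void main(String[] args) {
        // Hypothetical schemas: the new one only adds an optional field with a default.
        Schema oldSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":["
                        + "{\"name\":\"id\",\"type\":\"string\"}]}");
        Schema newSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":["
                        + "{\"name\":\"id\",\"type\":\"string\"},"
                        + "{\"name\":\"tag\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

        // Avro reports the pair as COMPATIBLE: a reader using the new schema
        // can decode data written with the old one, so no migration is needed.
        SchemaCompatibility.SchemaPairCompatibility result =
                SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema);
        System.out.println(result.getType()); // COMPATIBLE
    }
}
{code}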

This issue leads to Flink failures when a new optional field is added to a schema. The following happens in this case (a standalone reproduction of the failing step follows the list):
# Records in state are deserialized successfully using the old serializer (with the old schema).
# The schema change triggers state migration due to the code path linked above.
# RocksDBKeyedStateBackend attempts to serialize a record that still carries the old schema using the new schema.
# The latter operation fails because the field indexes of the old and the new schemas do not match: the writer asks the old record for a field position it does not have.
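
The failing write can be reproduced with plain Avro, outside of Flink. The {{Rec}} schemas below are hypothetical, but the resulting exception mirrors the {{GenericDatumWriter}} frames in the stack trace that follows:

{code:java}
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class MigrationFailureRepro {
    public static void main(String[] args) throws Exception {
        Schema oldSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":["
                        + "{\"name\":\"id\",\"type\":\"string\"}]}");
        Schema newSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":["
                        + "{\"name\":\"id\",\"type\":\"string\"},"
                        + "{\"name\":\"tag\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

        // A record restored from state still carries the *old* schema.
        GenericRecord oldRecord = new GenericData.Record(oldSchema);
        oldRecord.put("id", "abc");

        // Migration serializes that record with the *new* schema: the writer
        // asks for the field at position 1, which the old record does not have.
        GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(newSchema);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(oldRecord, encoder); // ArrayIndexOutOfBoundsException: 1
    }
}
{code}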

In the job, the failure surfaces with a stack trace like the following:

{noformat}
Caused by: org.apache.flink.util.StateMigrationException: Error while trying to migrate RocksDB list state.
        at org.apache.flink.contrib.streaming.state.RocksDBListState.migrateSerializedValue(RocksDBListState.java:269)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:630)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:559)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:509)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:670)
        at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
        at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
        ... 18 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 10
        at org.apache.avro.generic.GenericData$Record.get(GenericData.java:261)
        at org.apache.avro.generic.GenericData.getField(GenericData.java:825)
        at org.apache.avro.generic.GenericData.getField(GenericData.java:844)
        at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:204)
        at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:144)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
        at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
        at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
        at org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:186)
        at org.apache.flink.contrib.streaming.state.RocksDBListState.migrateSerializedValue(RocksDBListState.java:263)
        ... 28 more
{noformat}

If Flink skipped the migration in this case and simply used the new schema (with the old schema as the Avro writer schema) to deserialize old records, no such issue would occur.
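
That path is exactly the schema resolution Avro supports natively. A sketch using the same illustrative {{Rec}} schemas as above: the old bytes stay as they are, and the reader resolves them against the new schema, filling the added optional field from its default:

{code:java}
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class ReadOldWithNewSchema {
    public static void main(String[] args) throws Exception {
        Schema oldSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":["
                        + "{\"name\":\"id\",\"type\":\"string\"}]}");
        Schema newSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":["
                        + "{\"name\":\"id\",\"type\":\"string\"},"
                        + "{\"name\":\"tag\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

        // Bytes as they sit in state: written with the old schema.
        GenericRecord oldRecord = new GenericData.Record(oldSchema);
        oldRecord.put("id", "abc");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(oldSchema).write(oldRecord, encoder);
        encoder.flush();

        // Schema resolution on read: writer schema = old, reader schema = new.
        // The missing optional field is filled from its default; the stored
        // bytes never need to be rewritten.
        GenericDatumReader<GenericRecord> reader =
                new GenericDatumReader<>(oldSchema, newSchema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord resolved = reader.read(null, decoder);
        System.out.println(resolved); // {"id": "abc", "tag": null}
    }
}
{code}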



