Iaroslav Zeigerman created FLINK-23953:
------------------------------------------
             Summary: AvroSerializerSnapshot causes state migration even though the old and the new schemas are compatible
                 Key: FLINK-23953
                 URL: https://issues.apache.org/jira/browse/FLINK-23953
             Project: Flink
          Issue Type: Bug
          Components: API / Type Serialization System
    Affects Versions: 1.13.2
            Reporter: Iaroslav Zeigerman

The problematic code is located [here|https://github.com/apache/flink/blob/release-1.13.2/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java#L194]. Returning COMPATIBLE_AFTER_MIGRATION there is unnecessary and causes issues during schema evolution; the COMPATIBLE_AS_IS status should be returned instead (see the sketch at the end of this description). Even the comment right above the linked snippet suggests as much:
{noformat}
// The new serializer would be able to read data persisted with *this*
// serializer, therefore no migration
// is required.
{noformat}
This issue leads to Flink failures when a new optional field is added to a schema (a minimal schema example is included at the end of this description). The following happens in this case:
# Records in state are deserialized successfully using the old serializer (with the old schema).
# The schema change triggers a state migration due to the code path that I shared above.
# RocksDBKeyedStateBackend attempts to serialize a record that was read with the old schema using the new schema.
# The latter operation fails for obvious reasons (the field indexes of the old and the new schemas are incompatible).

The failure produces a stack trace that looks like this:
{noformat}
Caused by: org.apache.flink.util.StateMigrationException: Error while trying to migrate RocksDB list state.
    at org.apache.flink.contrib.streaming.state.RocksDBListState.migrateSerializedValue(RocksDBListState.java:269)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:630)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:559)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:509)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:670)
    at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
    ... 18 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 10
    at org.apache.avro.generic.GenericData$Record.get(GenericData.java:261)
    at org.apache.avro.generic.GenericData.getField(GenericData.java:825)
    at org.apache.avro.generic.GenericData.getField(GenericData.java:844)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:204)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:144)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
    at org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:186)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.migrateSerializedValue(RocksDBListState.java:263)
    ... 28 more
{noformat}
If Flink skipped the migration in this case and simply went ahead using the new schema to deserialize the old records, no such issue would have occurred.
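For reference, here is a minimal sketch of the change being proposed. The surrounding structure is paraphrased, not copied from AvroSerializerSnapshot (the class and method names here are illustrative); only the switch from compatibleAfterMigration() to compatibleAsIs() reflects the actual suggestion:
{code:java}
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;

class AvroCompatibilitySketch {

    // Paraphrased compatibility resolution: decide whether the new (reader)
    // serializer can consume state written with the old (writer) schema.
    static <T> TypeSerializerSchemaCompatibility<T> resolve(
            Schema readerSchema, Schema writerSchema) {

        SchemaPairCompatibility compatibility =
                SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema);

        if (compatibility.getType() == SchemaCompatibilityType.COMPATIBLE) {
            // Current behavior (line 194 in the linked file) forces a migration:
            // return TypeSerializerSchemaCompatibility.compatibleAfterMigration();

            // Proposed behavior: the new serializer can read data persisted with
            // the old schema directly, so no migration is required.
            return TypeSerializerSchemaCompatibility.compatibleAsIs();
        }
        return TypeSerializerSchemaCompatibility.incompatible();
    }
}
{code}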
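And here is a self-contained illustration of the triggering scenario. The Event record, its fields, and the added optional field are made up for this example; it only demonstrates that Avro itself already classifies such a pair of schemas as COMPATIBLE:
{code:java}
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

public class OptionalFieldExample {
    public static void main(String[] args) {
        Schema oldSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                        + "{\"name\":\"id\",\"type\":\"string\"}]}");

        // The new schema only adds an optional (nullable, defaulted) field.
        Schema newSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                        + "{\"name\":\"id\",\"type\":\"string\"},"
                        + "{\"name\":\"tag\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

        // Avro confirms that the new (reader) schema can read records written
        // with the old (writer) schema, so no migration should be necessary.
        System.out.println(
                SchemaCompatibility
                        .checkReaderWriterCompatibility(newSchema, oldSchema)
                        .getType()); // prints COMPATIBLE
    }
}
{code}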