Hi Dominik,

I am not sure which documentation you are referring to when you say: "According to the docs the schema resolution is compatible with the Avro docs", but I assume it is this one [1]. If so, then AvroDeserializationSchema plays no role in this process. That page describes the evolution of the schema of Flink's state. What that means is: if you use an Avro class for objects in your state, take a savepoint, and then restore your job with an updated schema for those Avro objects, Flink should migrate the state to the new schema. AvroDeserializationSchema, on the other hand, is used when reading records from external systems, e.g. from Kafka.
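For reference, this is roughly how AvroDeserializationSchema is wired into a Kafka source (a minimal sketch against the 1.9-era APIs; the topic name, the connection properties, and the schema itself are made-up placeholders):

    import java.util.Properties;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.formats.avro.AvroDeserializationSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class KafkaAvroJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

            // The schema given here is used as both the reader's AND the
            // assumed writer's schema -- no schema resolution takes place.
            Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                    + "{\"name\":\"id\",\"type\":\"string\"},"
                    + "{\"name\":\"value\",\"type\":\"long\"}]}");

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
            props.setProperty("group.id", "demo");                    // placeholder

            DataStream<GenericRecord> stream = env.addSource(
                new FlinkKafkaConsumer<>(
                    "events",                                   // placeholder topic
                    AvroDeserializationSchema.forGeneric(schema),
                    props));

            stream.print();
            env.execute("Avro from Kafka");
        }
    }

Note that the schema passed to forGeneric acts as both the reader's and the assumed writer's schema, which is exactly why the problems you describe appear.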
A general remark on Avro's schema migration (not Flink's per se): the Avro reader needs both the schema with which a record was written (the writer's schema) and the current schema (the reader's schema) to perform the migration. In the case of AvroDeserializationSchema both are always equal, therefore you cannot read records written with a different schema (the writer's schema is missing). If you want to support reading records written with different schemas, you need to use RegistryAvroDeserializationSchema, which can provide the writer's schema for every record. Out of the box, Flink provides an implementation (ConfluentRegistryAvroDeserializationSchema) that integrates with Confluent's schema registry to obtain the writer's schema, but you are free to provide your own (a runnable sketch of the underlying schema resolution follows below the quoted message).

Coming back to the state migration: the way it works is that Flink writes the Avro schema as part of the snapshot. Therefore it is possible to migrate the whole state to the changed schema upon restore. The new snapshot will then be written entirely with the new, updated Avro schema.

Hope this clarifies how the integration with Avro in Flink works.

Best,

Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/schema_evolution.html#avro-types

On 04/11/2019 10:58, Dominik Wosiński wrote:
> Hey,
> I have a question regarding Avro types and schema evolution. According to
> the docs, the schema resolution is compatible with the Avro docs [1].
>
> But I have done some testing. For example, I created a record, wrote it
> to Kafka, and then changed the order of the fields in the schema and
> tried to read the data with the changed schema using
> AvroDeserializationSchema from Flink. It was failing with an
> IndexOutOfBounds during deserialization, but according to the docs
> fields are resolved by name during deserialization, so changed ordering
> should not really be a problem.
>
> I have also found some other inconsistencies, e.g. when the writer is
> missing a field that is optional for the reader (meaning it has a
> default value). According to the Avro docs this should be resolved, but
> I only managed to resolve it if the missing field is at the end, not if
> it is in the middle.
>
> Is there anything that I don't understand here, or is something wrong?
>
> Best regards,
> Dom.
>
> [1] http://avro.apache.org/docs/current/spec.html#Schema+Resolution
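As mentioned above, here is a minimal sketch of the reader's/writer's schema resolution in plain Avro, without Flink involved (both schema strings and all field names are invented for illustration):

    import java.io.ByteArrayOutputStream;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;

    public class SchemaResolutionDemo {
        public static void main(String[] args) throws Exception {
            Schema writerSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                    + "{\"name\":\"id\",\"type\":\"string\"},"
                    + "{\"name\":\"value\",\"type\":\"long\"}]}");

            // Reader's schema: fields reordered and one defaulted field added.
            Schema readerSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                    + "{\"name\":\"value\",\"type\":\"long\"},"
                    + "{\"name\":\"id\",\"type\":\"string\"},"
                    + "{\"name\":\"source\",\"type\":\"string\",\"default\":\"n/a\"}]}");

            // Write a record using the writer's schema.
            GenericRecord record = new GenericData.Record(writerSchema);
            record.put("id", "abc");
            record.put("value", 42L);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            new GenericDatumWriter<GenericRecord>(writerSchema).write(record, encoder);
            encoder.flush();

            // Read it back: the datum reader gets BOTH schemas and resolves
            // fields by name, so the reordering and the defaulted field work.
            BinaryDecoder decoder =
                DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
            GenericRecord resolved =
                new GenericDatumReader<GenericRecord>(writerSchema, readerSchema)
                    .read(null, decoder);
            System.out.println(resolved);
            // prints: {"value": 42, "id": "abc", "source": "n/a"}
        }
    }

ConfluentRegistryAvroDeserializationSchema does essentially the same inside Flink, except that the writer's schema is fetched from the schema registry for every record instead of being known upfront.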