Hi Dominik,

I am not sure which documentation you refer to when saying: "According
to the docs the schema resolution is compatible with the Avro docs",
but I assume it is this one[1]. If that is the case, then the
AvroDeserializationSchema plays no role in this process. That page
describes evolution of the schema of Flink's state. What that means is
that if you use an Avro class for objects in your state, take a
savepoint and then restore your job with an updated schema of those
Avro objects, Flink should migrate the state to the new schema.
AvroDeserializationSchema is used when reading records from external
systems, e.g. from Kafka.

A general remark on Avro's schema resolution (not Flink's per se): the
Avro reader needs both the schema the record was written with (the
writer's schema) and the current schema (the reader's schema) to
perform the resolution; see the sketch below.
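
To illustrate with plain Avro (outside of Flink), here is a minimal
sketch of performing that resolution manually; readWithResolution is
just an illustrative helper name, not an existing API:

    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.Decoder;
    import org.apache.avro.io.DecoderFactory;

    static GenericRecord readWithResolution(
            byte[] recordBytes, Schema writerSchema, Schema readerSchema)
            throws IOException {
        // Avro matches fields by name and applies reader-side defaults,
        // but only because it knows both schemas here.
        GenericDatumReader<GenericRecord> datumReader =
                new GenericDatumReader<>(writerSchema, readerSchema);
        Decoder decoder =
                DecoderFactory.get().binaryDecoder(recordBytes, null);
        return datumReader.read(null, decoder);
    }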

In the case of AvroDeserializationSchema both are always equal,
therefore you cannot read records written with a different schema (we
are missing the writer's schema). If you want to support reading
records written with different schemas you need to use
RegistryAvroDeserializationSchema. That DeserializationSchema can
provide the writer's schema for every record. Out of the box, Flink
provides an implementation of RegistryAvroDeserializationSchema that
integrates with Confluent's schema registry to obtain the writer's
schema, but you are free to provide your own implementation.
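
As a rough, untested sketch of what that could look like for
GenericRecords (the MyEvent record, the my-topic topic, the registry
URL and the Kafka properties below are made up for illustration):

    import java.util.Properties;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    // The reader's schema your job expects; the writer's schema of every
    // record is looked up in the schema registry.
    Schema readerSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"MyEvent\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"string\"}]}");

    DeserializationSchema<GenericRecord> deserializer =
            ConfluentRegistryAvroDeserializationSchema.forGeneric(
                    readerSchema, "http://schema-registry:8081");

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka:9092");
    props.setProperty("group.id", "my-group");

    FlinkKafkaConsumer<GenericRecord> consumer =
            new FlinkKafkaConsumer<>("my-topic", deserializer, props);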

Coming back to the state migration: the way it works is that Flink
writes the Avro schema as part of the snapshot. Therefore it is
possible to migrate the whole state to the changed schema upon
restoring. The new snapshot will then be written entirely with the new,
updated Avro schema.
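
For example (a minimal sketch; MyEvent stands for a hypothetical
Avro-generated SpecificRecord class kept in keyed state):

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class LastEventFunction
            extends RichFlatMapFunction<MyEvent, MyEvent> {

        private transient ValueState<MyEvent> lastEvent;

        @Override
        public void open(Configuration parameters) {
            // MyEvent is an Avro type, so Flink stores its schema in the
            // snapshot. If MyEvent is later regenerated from a compatible,
            // updated schema, the state is migrated when restoring from a
            // savepoint.
            lastEvent = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("lastEvent", MyEvent.class));
        }

        @Override
        public void flatMap(MyEvent value, Collector<MyEvent> out)
                throws Exception {
            lastEvent.update(value);
            out.collect(value);
        }
    }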

Hope this clarifies how the integration with Avro in Flink works.

Best,

Dawid


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/schema_evolution.html#avro-types

On 04/11/2019 10:58, Dominik Wosiński wrote:
> Hey,
> I have a question regarding Avro Types and schema evolution. According to
> the docs the schema resolution is compatible with the Avro docs [1].
>
> But I have done some testing. For example, I have created a record, written
> it to Kafka, and then changed the order of the fields in the schema and
> tried to read the data with the changed schema using
> AvroDeserializationSchema from Flink. It was failing with an
> IndexOutOfBounds exception during deserialization, but according to the
> docs fields are resolved by name during deserialization, so changed
> ordering should not really be a problem.
>
> I have also found some other inconsistencies, e.g. when the writer is
> missing a field but it is optional for the reader, which means that it
> has a default value. According to the Avro docs this should be
> resolved, but I only managed to resolve it if the missing field is at
> the end, not if it's in the middle.
>
> Is there anything that I don't understand here, or is something wrong?
>
> Best Regards,
> Dom.
>
> [1]http://avro.apache.org/docs/current/spec.html#Schema+Resolution
>
