Hi Dan,

It depends which part of the system you have in mind. Generally,
though, Avro itself does need the original schema that a record was
written with. There are a couple of alternatives. For DataStream you
have RegistryAvroDeserializationSchema, which looks up the writer
schema in a schema registry (it will be exposed in SQL in 1.12 [1]).
If we are talking about schema migration for state objects, we persist
the schema with which the state was written, so that upon restore we
have both the old schema and, possibly, the new one.
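
For illustration, a minimal DataStream sketch using the Confluent
implementation of that deserialization schema (the topic name, reader
schema, bootstrap servers and registry URL below are placeholders, not
from this thread):

    import java.util.Properties;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class RegistryAvroExample {
        public static void main(String[] args) {
            // Reader schema the job expects; the writer schema of each
            // incoming record is looked up in the schema registry at
            // runtime, so records written with an older (compatible)
            // schema can still be resolved against this one.
            Schema readerSchema = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"MyRecord\",\"fields\":"
                    + "[{\"name\":\"userId\",\"type\":\"string\"}]}");

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id", "my-group");

            FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
                    "my-topic",
                    ConfluentRegistryAvroDeserializationSchema.forGeneric(
                            readerSchema, "http://localhost:8081"),
                    props);
        }
    }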

In the case of Avro files we could/should probably use the schema embedded in the file.
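
Plain Avro container files expose the writer schema directly from the
file header, e.g. (a minimal sketch; the file name is a placeholder):

    import java.io.File;
    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    public class AvroFileSchemaExample {
        public static void main(String[] args) throws IOException {
            // An Avro container file stores the writer schema in its
            // header, so it can be recovered without any external
            // source of truth.
            try (DataFileReader<GenericRecord> reader = new DataFileReader<>(
                    new File("data.avro"), new GenericDatumReader<>())) {
                Schema writerSchema = reader.getSchema();
                System.out.println(writerSchema.toString(true));
            }
        }
    }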

Best,

Dawid


[1] https://issues.apache.org/jira/browse/FLINK-16048

On 16/09/2020 21:20, Dan Hill wrote:
> Interesting.  How does schema evolution work with Avro and Flink? 
> E.g. adding new fields or enum values.
>
> On Wed, Sep 16, 2020 at 12:13 PM Dawid Wysakowicz
> <dwysakow...@apache.org <mailto:dwysakow...@apache.org>> wrote:
>
>     Hi Dan,
>
>     I'd say this is a result of a few assumptions.
>
>      1. We try to separate the concept of a format from the connector.
>         Therefore we do not make too many assumptions about which
>         connector a format works with.
>      2. Avro needs the original schema that the incoming record was
>         serialized with. It will not work with just an expected
>         schema, even if that schema is "compatible" with the old one
>         (see the sketch after this list).
>      3. The most common use case for Avro that we see is stream
>         processing (e.g. Avro-encoded messages in Kafka). In that
>         scenario we do not have the schema encoded alongside the
>         data, therefore we assume the DDL is the only source of
>         truth for the schema of the data (the same holds for other
>         formats such as JSON or CSV). I agree that in the case of
>         Avro files we have the original schema encoded in the files,
>         but using it would contradict the assumption that the DDL is
>         the source of truth.
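>
>     A minimal sketch of point 2 with plain Avro (the schemas and class
>     name below are made up for illustration): even a "compatible"
>     reader schema only resolves because the writer schema is supplied
>     as well.
>
>         import java.io.ByteArrayOutputStream;
>         import java.io.IOException;
>
>         import org.apache.avro.Schema;
>         import org.apache.avro.generic.GenericDatumReader;
>         import org.apache.avro.generic.GenericDatumWriter;
>         import org.apache.avro.generic.GenericRecord;
>         import org.apache.avro.generic.GenericRecordBuilder;
>         import org.apache.avro.io.BinaryDecoder;
>         import org.apache.avro.io.BinaryEncoder;
>         import org.apache.avro.io.DecoderFactory;
>         import org.apache.avro.io.EncoderFactory;
>
>         public class SchemaResolutionExample {
>             public static void main(String[] args) throws IOException {
>                 // Writer schema: what the record was serialized with.
>                 Schema writerSchema = new Schema.Parser().parse(
>                         "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":"
>                         + "[{\"name\":\"userId\",\"type\":\"string\"}]}");
>                 // "Compatible" reader schema: adds a field with a default.
>                 Schema readerSchema = new Schema.Parser().parse(
>                         "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":"
>                         + "[{\"name\":\"userId\",\"type\":\"string\"},"
>                         + "{\"name\":\"region\",\"type\":\"string\","
>                         + "\"default\":\"n/a\"}]}");
>
>                 // Serialize one record with the writer schema.
>                 GenericRecord rec = new GenericRecordBuilder(writerSchema)
>                         .set("userId", "u1").build();
>                 ByteArrayOutputStream out = new ByteArrayOutputStream();
>                 BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
>                 new GenericDatumWriter<GenericRecord>(writerSchema).write(rec, encoder);
>                 encoder.flush();
>
>                 // Deserialize: schema resolution needs BOTH the writer and
>                 // the reader schema; the reader schema alone is not enough.
>                 BinaryDecoder decoder =
>                         DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
>                 GenericRecord resolved =
>                         new GenericDatumReader<GenericRecord>(writerSchema, readerSchema)
>                                 .read(null, decoder);
>                 System.out.println(resolved); // "region" filled from the default
>             }
>         }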
>
>     However, I think it is a valid scenario to support such a case for
>     the combination of Avro + filesystem. Honestly, we do that already
>     for the avro-schema-registry format (where we look up the writer
>     schema in the schema registry and convert it to the schema of the
>     DDL). Moreover, it should be relatively easy to do. @Jingsong What
>     do you think?
>
>     Best,
>
>     Dawid
>
>
>     On 16/09/2020 09:49, Dan Hill wrote:
>>     I might be misunderstanding Flink's Avro support.  I assumed that
>>     not including a field in "CREATE TABLE" would work fine.  If I
>>     leave out any field that comes before a nested row, it fails; if I
>>     include all of the fields, it succeeds.  I assumed fields would be
>>     optional.
>>
>>     I'm using Flink v1.11.1 with the Table SQL API.
>>
>>     *Problem*
>>     If I do not include one of the fields, I get the following
>>     exception.  If I add back the missing field, "contentId", this works.
>>     "CREATE TABLE `default.mydb.mytable` (\n" +
>>             "`userId` STRING, \n" +
>>             "`timeEpochMillis` BIGINT, \n" +
>>             //"`contentId` BIGINT, \n" +        "`contentDetails` ROW<\n" +
>>                 "`contentId` BIGINT >\n" +") WITH (...)\n"
>>
>>     Caused by: java.lang.ClassCastException: java.lang.Long cannot be
>>     cast to org.apache.avro.generic.IndexedRecord
>>         at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:203)
>>         at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:221)
>>         at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:206)
>>         at org.apache.flink.formats.avro.AvroFileSystemFormatFactory$RowDataAvroInputFormat.nextRecord(AvroFileSystemFormatFactory.java:204)
>>         at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
>>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>
