Interesting.  How does schema evolution work with Avro and Flink?  E.g.
adding new fields or enum values.

On Wed, Sep 16, 2020 at 12:13 PM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Hi Dan,
>
> I'd say this is a result of a few assumptions.
>
>    1. We try to separate the concept of format from the connector.
>    Therefore we did not make too many assumptions about which connector a
>    format works with.
>    2. Avro needs the original schema that the incoming record was
>    serialized with. It will not work with just an expected schema, even if
>    that schema is "compatible" with the old one.
>    3. The most common use case for Avro we see is that it is used for
>    stream processing (e.g. Avro encoded messages in Kafka). In that scenario
>    we do not have the schema encoded alongside the data. Therefore we assume
>    the DDL is the only source of truth for the schema of the data (that is
>    also true for other formats such as JSON or CSV). I agree that in the case
>    of Avro files we have the original schema encoded in the files. Using it
>    would contradict the assumption that the DDL is the original schema.
>
> However, I think it is a valid scenario to support such a case for the
> combination of avro + filesystem. Honestly, we already do that for the
> avro-schema-registry format (where we look up the writer schema in the
> schema registry (SR) and convert to the schema of the DDL). Moreover, it
> should be relatively easy to do. @Jingsong What do you think?
>
> Best,
>
> Dawid
>
>
> On 16/09/2020 09:49, Dan Hill wrote:
>
> I might be misunderstanding Flink Avro support.  I assumed not including a
> field in "CREATE TABLE" would work fine.  If I leave out any field before a
> nested row, "CREATE TABLE" fails.  If I include all of the fields, this
> succeeds.  I assumed fields would be optional.
>
> I'm using Flink v1.11.1 with the Table SQL API.
>
> *Problem*
> If I do not include one of the fields, I get the following exception.  If
> I add back the missing field, "contentId", this works.
>
> "CREATE TABLE `default.mydb.mytable` (\n" +
>         "`userId` STRING, \n" +
>         "`timeEpochMillis` BIGINT, \n" +
>         //"`contentId` BIGINT, \n" +
>         "`contentDetails` ROW<\n" +
>             "`contentId` BIGINT >\n" +
>         ") WITH (...)\n"
>
>
> Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.avro.generic.IndexedRecord
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:203)
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:221)
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:206)
>     at org.apache.flink.formats.avro.AvroFileSystemFormatFactory$RowDataAvroInputFormat.nextRecord(AvroFileSystemFormatFactory.java:204)
>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>
>
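
One possible reading of the stack trace above, offered as a sketch and not as a statement about Flink's internals: the converter seems to pair the i-th column of the DDL with the i-th field of the record stored in the file, so leaving out `contentId` puts a Long where the nested row is expected. The toy model below uses only the Java standard library. The names `PositionalVsNamedMapping`, `writerRecord`, `readByPosition`, `readByName`, and `convert` are hypothetical, the sample values are made up, and plain maps stand in for Avro records. The name-based variant loosely mirrors how Avro resolves a reader schema against a writer schema by field name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: plain maps stand in for Avro records, and none of these
// names come from Flink or Avro.
final class PositionalVsNamedMapping {

    // A record as stored in the file: all four fields, in writer order.
    static Map<String, Object> writerRecord() {
        Map<String, Object> details = new LinkedHashMap<>();
        details.put("contentId", 42L);

        Map<String, Object> record = new LinkedHashMap<>();
        record.put("userId", "u1");
        record.put("timeEpochMillis", 1600000000000L);
        record.put("contentId", 42L);
        record.put("contentDetails", details);
        return record;
    }

    // Pairs the i-th declared column with the i-th written field, ignoring names.
    static List<Object> readByPosition(Map<String, Object> record, List<String> columns) {
        List<Object> written = new ArrayList<>(record.values());
        List<Object> row = new ArrayList<>();
        for (int i = 0; i < columns.size(); i++) {
            row.add(convert(columns.get(i), written.get(i)));
        }
        return row;
    }

    // Looks each declared column up by name; extra written fields are skipped.
    static List<Object> readByName(Map<String, Object> record, List<String> columns) {
        List<Object> row = new ArrayList<>();
        for (String column : columns) {
            row.add(convert(column, record.get(column)));
        }
        return row;
    }

    // The column declared as ROW must hold a nested record; others pass through.
    static Object convert(String column, Object value) {
        if (column.equals("contentDetails")) {
            Map<?, ?> nested = (Map<?, ?>) value; // throws ClassCastException for a Long
            return nested;
        }
        return value;
    }

    public static void main(String[] args) {
        // Columns from the failing DDL: `contentId` is left out.
        List<String> ddl = Arrays.asList("userId", "timeEpochMillis", "contentDetails");
        try {
            readByPosition(writerRecord(), ddl);
        } catch (ClassCastException e) {
            System.out.println("positional read failed: " + e.getMessage());
        }
        System.out.println("name-based read: " + readByName(writerRecord(), ddl));
    }
}
```

In this model the positional read fails at the third column, because the third written field is the top-level `contentId`, while the name-based read skips that field and returns the nested record. If the real converter behaves the same way, that would explain why adding `contentId` back to the DDL makes the query work.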
