Hi Dan,

I'd say this is a result of a few assumptions.

 1. We try to separate the concept of a format from that of a
    connector. Therefore we did not make many assumptions about which
    connectors a given format works with.
 2. Avro needs the original schema that the incoming record was
    serialized with. It will not work with just an expected schema,
    even if that schema is "compatible" with the old one (see the
    sketch right after this list).
 3. The most common use case for Avro that we see is stream processing
    (e.g. Avro-encoded messages in Kafka). In that scenario the schema
    is not encoded alongside the data, so we assume the DDL is the
    only source of truth for the schema of the data (the same holds
    for other formats such as JSON or CSV). I agree that in the case
    of Avro files the original schema is encoded in the files, but
    using it would contradict the assumption that the DDL is the
    source of truth.
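
To make point 2 concrete, here is a minimal sketch using the plain
Avro API (no Flink involved; the class and method names are just
illustrative). The two-argument GenericDatumReader constructor is what
performs schema resolution, and it needs the exact writer schema in
addition to the reader schema we derive from the DDL:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DecoderFactory;

    import java.io.IOException;

    public class AvroResolutionSketch {
        // Decode 'payload' with the schema it was written with, then
        // let Avro resolve it into the shape of the DDL-derived reader
        // schema. Decoding with the reader schema alone mis-parses the
        // binary data, because Avro's encoding carries no field markers.
        static GenericRecord decode(byte[] payload,
                                    Schema writerSchema,
                                    Schema readerSchema) throws IOException {
            GenericDatumReader<GenericRecord> datumReader =
                new GenericDatumReader<>(writerSchema, readerSchema);
            return datumReader.read(
                null, DecoderFactory.get().binaryDecoder(payload, null));
        }
    }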

However, I think it is a valid scenario to support such a case for the
combination of Avro + filesystem. Honestly, we do that already for the
avro-schema-registry format (where we look up the writer schema in the
schema registry and convert the record to the schema of the DDL).
Moreover, it should be relatively easy to do, since Avro files carry
the writer schema in their header.
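
For the Avro + filesystem case specifically, here is a minimal sketch
with the plain Avro API of the behaviour I mean (the class and method
names are mine, not actual Flink internals): passing null as the
writer schema lets DataFileReader pick it up from the file header and
resolve every record into the DDL-derived reader schema.

    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DatumReader;

    import java.io.File;
    import java.io.IOException;

    public class AvroFileResolutionSketch {
        // The writer schema is read from the Avro file header by
        // DataFileReader itself; each record is then resolved into
        // 'ddlSchema', so the DDL may legally omit or reorder fields
        // as long as the two schemas are compatible.
        static void readAll(File avroFile, Schema ddlSchema)
                throws IOException {
            DatumReader<GenericRecord> datumReader =
                new GenericDatumReader<>(null, ddlSchema);
            try (DataFileReader<GenericRecord> fileReader =
                     new DataFileReader<>(avroFile, datumReader)) {
                for (GenericRecord record : fileReader) {
                    System.out.println(record);
                }
            }
        }
    }

@Jingsong What do you think?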

Best,

Dawid


On 16/09/2020 09:49, Dan Hill wrote:
> I might be misunderstanding Flink Avro support.  I assumed not
> including a field in "CREATE TABLE" would work fine.  If I leave out
> any field before a nested row, "CREATE TABLE" fails.  If I include all
> of the fields, this succeeds.  I assumed fields would be optional.
>
> I'm using Flink v1.11.1 with the Table SQL API.
>
> *Problem*
> If I do not include one of the fields, I get the following exception. 
> If I add back the missing field, "contentId", this works.
> "CREATE TABLE `default.mydb.mytable` (\n" +
>         "`userId` STRING, \n" +
>         "`timeEpochMillis` BIGINT, \n" +
>         //"`contentId` BIGINT, \n" +        "`contentDetails` ROW<\n" +
>             "`contentId` BIGINT >\n" +") WITH (...)\n"
>
> Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast
> to org.apache.avro.generic.IndexedRecord
>     at
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:203)
>     at
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:221)
>     at
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:206)
>     at
> org.apache.flink.formats.avro.AvroFileSystemFormatFactory$RowDataAvroInputFormat.nextRecord(AvroFileSystemFormatFactory.java:204)
>     at
> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
>     at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
