Hi Dan,

I'd say this is a result of a few assumptions.
1. We try to keep the concept of a format separate from the connector. Therefore we did not make many assumptions about which connector a format works with.

2. Avro needs the original schema that the incoming record was serialized with. It will not work with just an expected schema, even if that schema is "compatible" with the original one.

3. The most common use case we see for Avro is stream processing (e.g. Avro-encoded messages in Kafka). In that scenario the schema is not encoded alongside the data, so we assume the DDL is the only source of truth for the schema of the data (the same holds for other formats such as JSON or CSV).

I agree that in the case of Avro files the original schema is encoded in the files, which contradicts the assumption that the DDL is the source of truth. However, I think it is a valid scenario to support such a case for the combination of avro + filesystem. Honestly, we already do something similar for the avro-schema-registry format (where we look up the writer schema in the Schema Registry and convert to the schema of the DDL). Moreover, it should be relatively easy to do.

@Jingsong What do you think?

Best,
Dawid

On 16/09/2020 09:49, Dan Hill wrote:
> I might be misunderstanding Flink Avro support. I assumed that not
> including a field in "CREATE TABLE" would work fine. If I leave out
> any field before a nested row, "CREATE TABLE" fails. If I include all
> of the fields, it succeeds. I assumed fields would be optional.
>
> I'm using Flink v1.11.1 with the Table SQL API.
>
> *Problem*
> If I do not include one of the fields, I get the following exception.
> If I add back the missing field, "contentId", this works.
> "CREATE TABLE `default.mydb.mytable` (\n" +
> "`userId` STRING, \n" +
> "`timeEpochMillis` BIGINT, \n" +
> //"`contentId` BIGINT, \n" +
> "`contentDetails` ROW<\n" +
> "`contentId` BIGINT >\n" +
> ") WITH (...)\n"
>
> Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast
> to org.apache.avro.generic.IndexedRecord
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:203)
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:221)
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:206)
>     at org.apache.flink.formats.avro.AvroFileSystemFormatFactory$RowDataAvroInputFormat.nextRecord(AvroFileSystemFormatFactory.java:204)
>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)