Interesting. How does schema evolution work with Avro and Flink? E.g. adding new fields or enum values.
On Wed, Sep 16, 2020 at 12:13 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Hi Dan,
>
> I'd say this is a result of a few assumptions.
>
> 1. We try to separate the concept of a format from the connector.
> Therefore we did not make too many assumptions about which connector a
> format works with.
> 2. Avro needs the original schema that the incoming record was
> serialized with. It will not work with just an expected schema, even if
> it is "compatible" with the old one.
> 3. The most common use case we see for Avro is stream processing
> (e.g. Avro-encoded messages in Kafka). In that scenario we do not have
> the schema encoded alongside the data. Therefore we assume the DDL is
> the only source of truth for the schema of the data (that is also true
> for other formats such as JSON or CSV). I agree that in the case of Avro
> files we have the original schema encoded in the files, which
> contradicts the assumption that the DDL is the original schema.
>
> However, I think it is a valid scenario to support such a case for the
> combination of Avro + filesystem. Honestly, we already do that for the
> avro-schema-registry format (where we look up the writer schema in SR
> and convert to the schema of the DDL). Moreover, it should be relatively
> easy to do. @Jingsong What do you think?
>
> Best,
>
> Dawid
>
>
> On 16/09/2020 09:49, Dan Hill wrote:
>
> I might be misunderstanding Flink Avro support. I assumed that not
> including a field in "CREATE TABLE" would work fine. If I leave out any
> field before a nested row, "CREATE TABLE" fails. If I include all of the
> fields, this succeeds. I assumed fields would be optional.
>
> I'm using Flink v1.11.1 with the Table SQL API.
>
> *Problem*
> If I do not include one of the fields, I get the following exception. If
> I add back the missing field, "contentId", this works.
>
> "CREATE TABLE `default.mydb.mytable` (\n" +
>     "`userId` STRING, \n" +
>     "`timeEpochMillis` BIGINT, \n" +
>     //"`contentId` BIGINT, \n" +
>     "`contentDetails` ROW<\n" +
>     "`contentId` BIGINT >\n" +
>     ") WITH (...)\n"
>
>
> Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.avro.generic.IndexedRecord
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:203)
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:221)
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:206)
>     at org.apache.flink.formats.avro.AvroFileSystemFormatFactory$RowDataAvroInputFormat.nextRecord(AvroFileSystemFormatFactory.java:204)
>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>
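For anyone skimming the thread, here is a minimal, standalone sketch of what "Avro needs the original (writer) schema" means at the plain Avro API level, independent of Flink's converters. The record schemas and field names are made up for illustration. Resolving a writer schema against a reader schema is what lets you drop fields or add fields with defaults; if the decoder only has the expected (reader) schema, the bytes are decoded positionally against the wrong field layout, which is presumably where ClassCastExceptions like the one above come from.

    // Sketch only: plain Avro (org.apache.avro), hypothetical "Event" schemas.
    import java.io.ByteArrayOutputStream;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;

    public class AvroResolutionSketch {
      public static void main(String[] args) throws Exception {
        // Writer schema: what the producer actually serialized with.
        Schema writerSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                + "{\"name\":\"userId\",\"type\":\"string\"},"
                + "{\"name\":\"contentId\",\"type\":\"long\"}]}");

        // Reader schema: what the consumer expects. It drops contentId and adds
        // a new field with a default -- both are legal Avro evolution steps.
        Schema readerSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                + "{\"name\":\"userId\",\"type\":\"string\"},"
                + "{\"name\":\"region\",\"type\":\"string\",\"default\":\"unknown\"}]}");

        // Serialize one record with the writer schema.
        GenericRecord record = new GenericData.Record(writerSchema);
        record.put("userId", "u-1");
        record.put("contentId", 42L);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(writerSchema).write(record, encoder);
        encoder.flush();

        // Deserialization works because BOTH schemas are given to the reader;
        // Avro resolves writer -> reader (skips contentId, fills region's default).
        GenericDatumReader<GenericRecord> reader =
            new GenericDatumReader<>(writerSchema, readerSchema);
        GenericRecord resolved = reader.read(
            null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
        System.out.println(resolved); // {"userId": "u-1", "region": "unknown"}
      }
    }

That resolution step is what the avro-schema-registry format gets for free (the writer schema comes from the registry), and what a DDL-only setup cannot do on its own, since the DDL only describes the reader side.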