Thanks Dawid! You answered most of my questions:

1) Kafka to Flink - Is the most common practice to use the Confluent Schema Registry together with ConfluentRegistryAvroDeserializationSchema? (Sketch of what I mean below.)
2) Flink to State - great.
3) Flink to Avro file output - great.
4) Avro file output to Flink (batch) - Yes, I assumed this converts from the schema in the file to the SQL schema.
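For (1), this is roughly the setup I have in mind - just a sketch, not working config: the topic name, registry URL, bootstrap servers, and reader schema below are all made up. Is this the usual pattern?

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaAvroRegistryExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Reader schema: the shape we want records projected into (made up for illustration).
        Schema readerSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
            + "{\"name\":\"userId\",\"type\":\"string\"},"
            + "{\"name\":\"timeEpochMillis\",\"type\":\"long\"}]}");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        props.setProperty("group.id", "avro-registry-example");

        // The writer schema for each record is looked up in the registry by id,
        // so evolved records can still be resolved against the reader schema.
        FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
            "user-events",
            ConfluentRegistryAvroDeserializationSchema.forGeneric(
                readerSchema, "http://schema-registry:8081"),
            props);

        env.addSource(consumer).print();
        env.execute("kafka-avro-registry-example");
    }
}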
I'm sorry if this was a basic question. The previous systems I've designed used Protobufs, where evolution was a lot easier (as long as the tags stay backwards compatible).

On Thu, Sep 17, 2020 at 3:33 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Hi Dan,
>
> It depends which part of the system you have in mind. Generally, though,
> Avro itself does need the original schema that the record was written
> with. There are a couple of alternatives. You have
> RegistryAvroDeserializationSchema for DataStream, which looks up the old
> schema in the schema registry (it will be exposed in SQL in 1.12 [1]). If
> we are talking about schema migration for state objects, we persist the
> schema with which the state was written, so that upon restore we have the
> old schema and possibly the new schema.
>
> In the case of Avro files we could/should probably use the schema from
> the file.
>
> Best,
>
> Dawid
>
> [1] https://issues.apache.org/jira/browse/FLINK-16048
>
> On 16/09/2020 21:20, Dan Hill wrote:
>
> Interesting. How does schema evolution work with Avro and Flink? E.g.
> adding new fields or enum values.
>
> On Wed, Sep 16, 2020 at 12:13 PM Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> Hi Dan,
>>
>> I'd say this is a result of a few assumptions.
>>
>> 1. We try to separate the concept of a format from the connector.
>> Therefore we do not make too many assumptions about which connector a
>> format works with.
>> 2. Avro needs the original schema that the incoming record was
>> serialized with. It will not work with just an expected schema, even if
>> it is "compatible" with the old one.
>> 3. The most common use case for Avro we see is stream processing
>> (e.g. Avro-encoded messages in Kafka). In that scenario we do not have
>> the schema encoded alongside the data. Therefore we assume the DDL is the
>> only source of truth for the schema of the data (that is also true for
>> other formats such as JSON or CSV). I agree that in the case of Avro
>> files we have the original schema encoded in the files, but using it
>> would contradict the assumption that the DDL is the original schema.
>>
>> However, I think it is a valid scenario to support such a case for the
>> combination of Avro + filesystem. Honestly, we do that already for the
>> avro-schema-registry format (where we look up the writer schema in the
>> schema registry and convert to the schema of the DDL). Moreover, it
>> should be relatively easy to do. @Jingsong What do you think?
>>
>> Best,
>>
>> Dawid
>>
>> On 16/09/2020 09:49, Dan Hill wrote:
>>
>> I might be misunderstanding Flink's Avro support. I assumed not
>> including a field in "CREATE TABLE" would work fine. If I leave out any
>> field before a nested row, "CREATE TABLE" fails. If I include all of the
>> fields, this succeeds. I assumed fields would be optional.
>>
>> I'm using Flink v1.11.1 with the Table SQL API.
>>
>> *Problem*
>> If I do not include one of the fields, I get the following exception. If
>> I add back the missing field, "contentId", this works.
>> >> "CREATE TABLE `default.mydb.mytable` (\n" + >> "`userId` STRING, \n" + >> "`timeEpochMillis` BIGINT, \n" + >> //"`contentId` BIGINT, \n" + "`contentDetails` ROW<\n" + >> "`contentId` BIGINT >\n" + ") WITH (...)\n" >> >> >> Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast >> to org.apache.avro.generic.IndexedRecord >> at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema >> .lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema >> .java:203) >> at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema >> .lambda$createNullableConverter$c3bac5d8$1( >> AvroRowDataDeserializationSchema.java:221) >> at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema >> .lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema >> .java:206) >> at org.apache.flink.formats.avro. >> AvroFileSystemFormatFactory$RowDataAvroInputFormat.nextRecord( >> AvroFileSystemFormatFactory.java:204) >> at org.apache.flink.streaming.api.functions.source. >> InputFormatSourceFunction.run(InputFormatSourceFunction.java:91) >> at org.apache.flink.streaming.api.operators.StreamSource.run( >> StreamSource.java:100) >> at org.apache.flink.streaming.api.operators.StreamSource.run( >> StreamSource.java:63) >> at org.apache.flink.streaming.runtime.tasks. >> SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201 >> ) >> >>