Thanks Dawid!

You answered most of my questions:
1) Kafka to Flink - Is the most common practice to use the Confluent Schema
Registry and then use ConfluentRegistryAvroDeserializationSchema?
2) Flink to State - great.
3) Flink to Avro file output - great.
4) Avro file output to Flink (batch) - Yes, I assumed this converts from the
schema in the file to the SQL schema.
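
To make 4) concrete, here is a toy sketch in plain Java of the difference
between matching fields by position and matching them by name. It uses no
Avro or Flink classes; SchemaProjectionSketch and its method names are made
up for illustration. It is only my guess at why the DDL without "contentId"
hit a ClassCastException, not a description of Flink's actual code.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: plain Java collections stand in for Avro records and Flink rows.
class SchemaProjectionSketch {

    // Field names in the order the file (writer) schema declares them.
    static final List<String> WRITER_FIELDS =
            Arrays.asList("userId", "timeEpochMillis", "contentId", "contentDetails");

    // Field names declared in the DDL, with "contentId" left out.
    static final List<String> DDL_FIELDS =
            Arrays.asList("userId", "timeEpochMillis", "contentDetails");

    // One decoded record, values in writer-schema order.
    static List<Object> sampleRecord() {
        Map<String, Object> details = new LinkedHashMap<>();
        details.put("contentId", 42L);
        return Arrays.asList("user-1", 1600000000000L, 42L, details);
    }

    // Positional conversion: the i-th DDL field takes the i-th decoded value.
    static Map<String, Object> projectByPosition(List<Object> record) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < DDL_FIELDS.size(); i++) {
            Object value = record.get(i);
            if (DDL_FIELDS.get(i).equals("contentDetails")) {
                // The DDL says ROW here, so the value is cast to a nested record.
                // With "contentId" omitted, position 2 holds a Long and the cast fails.
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) value;
                value = nested;
            }
            row.put(DDL_FIELDS.get(i), value);
        }
        return row;
    }

    // Name-based conversion: look each DDL field up in the writer schema.
    static Map<String, Object> projectByName(List<Object> record) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (String name : DDL_FIELDS) {
            int writerIndex = WRITER_FIELDS.indexOf(name);
            // A field that the file does not contain becomes null.
            row.put(name, writerIndex < 0 ? null : record.get(writerIndex));
        }
        return row;
    }

    public static void main(String[] args) {
        List<Object> record = sampleRecord();
        try {
            projectByPosition(record);
        } catch (ClassCastException e) {
            System.out.println("positional: " + e.getClass().getSimpleName());
        }
        System.out.println("by name: " + projectByName(record));
    }
}
```

In this toy version the positional variant fails on the third field, while
the name-based variant simply skips the extra "contentId" in the file.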

I'm sorry if this was a basic question.  The previous systems I've designed
use Protobufs.  Evolution was a lot easier (if tags are backwards
compatible).



On Thu, Sep 17, 2020 at 3:33 AM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Hi Dan,
>
> It depends which part of the system you have in mind. Generally, though,
> Avro itself does need the original schema that the record was written
> with. There are a couple of alternatives. You have
> RegistryAvroDeserializationSchema for DataStream, which looks up the old
> schema in the schema registry (it will be exposed in SQL in 1.12 [1]). If we
> are talking about schema migration for state objects, we persist the schema
> with which the state was written, so that upon restore we have the old
> schema and possibly the new schema.
>
> In the case of Avro files we could/should probably use the schema from the
> file.
>
> Best,
>
> Dawid
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-16048
> On 16/09/2020 21:20, Dan Hill wrote:
>
> Interesting.  How does schema evolution work with Avro and Flink?  E.g.
> adding new fields or enum values.
>
> On Wed, Sep 16, 2020 at 12:13 PM Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> Hi Dan,
>>
>> I'd say this is a result of a few assumptions.
>>
>>    1. We try to separate the concept of format from the connector.
>>    Therefore we did not make too many assumptions about which connector a
>>    format works with.
>>    2. Avro needs the original schema that the incoming record was
>>    serialized with. It will not work with just an expected schema, even if
>>    it is "compatible" with the old one.
>>    3. The most common use case for Avro we see is that it is used for
>>    stream processing (e.g. Avro encoded messages in Kafka). In that scenario
>>    we do not have the schema encoded alongside the data. Therefore we assume
>>    the DDL is the only source of truth for the schema of the data (that is
>>    also true for other formats, e.g. JSON or CSV). I agree that in the case
>>    of Avro files we have the original schema encoded in the files. That
>>    would contradict the assumption that the DDL is the original schema.
>>
>> However I think it is a valid scenario to support such a case for the
>> combination of avro + filesystem. Honestly we do that already for
>> avro-schema-registry format (where we look up the writer schema in SR and
>> convert to the schema of DDL). Moreover it should be relatively easy to do
>> that. @Jingsong What do you think?
>>
>> Best,
>>
>> Dawid
>>
>>
>> On 16/09/2020 09:49, Dan Hill wrote:
>>
>> I might be misunderstanding Flink Avro support.  I assumed not including
>> a field in "CREATE TABLE" would work fine.  If I leave out any field before
>> a nested row, "CREATE TABLE" fails.  If I include all of the fields, this
>> succeeds.  I assumed fields would be optional.
>>
>> I'm using Flink v1.11.1 with the Table SQL API.
>>
>> *Problem*
>> If I do not include one of the fields, I get the following exception.  If
>> I add back the missing field, "contentId", this works.
>>
>> "CREATE TABLE `default.mydb.mytable` (\n" +
>>         "`userId` STRING, \n" +
>>         "`timeEpochMillis` BIGINT, \n" +
>>         //"`contentId` BIGINT, \n" +
>>         "`contentDetails` ROW<\n" +
>>             "`contentId` BIGINT >\n" +
>>         ") WITH (...)\n"
>>
>>
>> Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.avro.generic.IndexedRecord
>>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:203)
>>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:221)
>>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:206)
>>     at org.apache.flink.formats.avro.AvroFileSystemFormatFactory$RowDataAvroInputFormat.nextRecord(AvroFileSystemFormatFactory.java:204)
>>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>>
>>
