Just an idea for a workaround, since Flink apparently expects the enum field
to be nullable:

  record MyEntry {
    MyEnumType type; <- make that nullable
  }

Of course that is only an option if you are able to change the producer.
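
For illustration, one way to make the field nullable in Avro IDL is to declare
it as a union with null. This is only a sketch based on the schema from the
original question: the null default is my assumption, and I have not verified
that Flink then deserializes the records correctly.

```
record MyEntry {
  // A union with null makes the enum field optional.
  // null is listed first so that the default value can be null.
  union { null, MyEnumType } type = null;
}
```

Note that a union changes the binary encoding of the field (a branch index is
written before the value), so the producer and the registered schema would
have to change together.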

On Thu, Oct 14, 2021 at 11:17 AM Francesco Guardiani <
france...@ververica.com> wrote:

> It reproduces on my machine, so I've opened a JIRA issue about that:
> FLINK-24544 <https://issues.apache.org/jira/browse/FLINK-24544>.
> Unfortunately, I don't have any ready-to-use workarounds for you.
>
> On Wed, Oct 13, 2021 at 8:43 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>
>>> Can you provide a minimal reproducer (without confluent schema registry)
>>> with a valid input?
>>>
>>
>> Please download and unzip the attached file.
>>
>>    - src/main/avro/MyProtocol.avdl
>>       - MyRecord, MyEntry, and MyEnumType are defined
>>       - "mvn generate-sources" will auto-generate Java classes under
>>       "target/generated-sources"
>>    - "org.example.fs" contains
>>       - "org.example.fs.Writer" which writes a single record of MyRecord
>>       type to "output.avro"
>>       - "org.example.fs.Reader" which reads the record from
>>       "output.avro"
>>       - "org.example.fs.ExampleFromFileSystem" executes the CREATE TABLE
>>       defined in "my_table.ddl" and shows that it successfully deserializes
>>       MyRecord from an Avro record written to a file, as you mentioned.
>>    - "org.example.kafka.ExampleFromKafkaAndSR" does almost the same as
>>    "org.example.fs.ExampleFromFileSystem" except that it reads from Kafka and
>>    looks up the schema from Schema Registry
>>       - However, unlike ExampleFromFileSystem, it produces the same
>>       exception as before
>>       - What I produced to the Kafka topic is {"entries": [{"type":
>>       "TypeVal1"}, {"type": "TypeVal2"}, {"type": "TypeVal2"}]}, which is
>>       the Avro record saved in output.avro.
>>       - The size of "output.avro" is 321 bytes on the disk while the
>>       size of the value of a Kafka record is 10 bytes.
>>
>> Hope this provides enough information.
>>
>> Best,
>>
>> Dongwon
>>
>> On Wed, Oct 13, 2021 at 4:50 PM Francesco Guardiani <
>> france...@ververica.com> wrote:
>>
>>> First of all, are you sure the input data is correct? From the
>>> stacktrace it seems to me the issue might be that the input data is invalid.
>>>
>>> Looking at the code of AvroToRowDataConverters, it looks like STRING
>>> should work with Avro enums. Can you provide a minimal reproducer (without
>>> confluent schema registry) with a valid input?
>>>
>>> On Tue, Oct 12, 2021 at 6:19 PM Dongwon Kim <eastcirc...@gmail.com>
>>> wrote:
>>>
>>>> Hi community,
>>>>
>>>> Can I get advice on this question?
>>>>
>>>> Another user just sent me an email asking whether I found a solution or
>>>> a workaround for this question, but I'm still stuck there.
>>>>
>>>> Any suggestions?
>>>>
>>>> Thanks in advance,
>>>>
>>>> Dongwon
>>>>
>>>> ---------- Forwarded message ---------
>>>> From: Dongwon Kim <eastcirc...@gmail.com>
>>>> Date: Mon, Aug 9, 2021 at 7:26 PM
>>>> Subject: How to deserialize Avro enum type in Flink SQL?
>>>> To: user <user@flink.apache.org>
>>>>
>>>>
>>>> Hi community,
>>>>
>>>> I have a Kafka topic where the schema of its values is defined by the
>>>> "MyRecord" record in the following Avro IDL and registered to the Confluent
>>>> Schema Registry.
>>>>
>>>>> @namespace("my.type.avro")
>>>>> protocol MyProtocol {
>>>>>   enum MyEnumType {
>>>>>     TypeVal1, TypeVal2
>>>>>   }
>>>>>   record MyEntry {
>>>>>     MyEnumType type;
>>>>>   }
>>>>>   record MyRecord {
>>>>>     array<MyEntry> entries;
>>>>>   }
>>>>> }
>>>>
>>>>
>>>> To read from the topic, I've defined the following DDL:
>>>>
>>>>> CREATE TABLE my_table (
>>>>>     `entries` ARRAY<ROW<
>>>>>         *`type` ??? (This is the main question)*
>>>>>     >>
>>>>> ) WITH (
>>>>>     'connector' = 'kafka',
>>>>>     'topic' = 'my-topic',
>>>>>     'properties.bootstrap.servers' = '...:9092',
>>>>>     'scan.startup.mode' = 'latest-offset',
>>>>>     'value.format' = 'avro-confluent',
>>>>>     'value.avro-confluent.schema-registry.url' = 'http://...:8081'
>>>>> )
>>>>
>>>>
>>>> And I run the following query:
>>>>
>>>>> SELECT * FROM my_table
>>>>
>>>>
>>>> I get the following exception in Flink 1.13.1 when I use *STRING*
>>>> for the type:
>>>>
>>>>> *Caused by: java.io.IOException: Failed to deserialize Avro record.*
>>>>>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
>>>>>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
>>>>>   at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>>>>>   at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
>>>>>   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
>>>>>   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>>>>>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>>>>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>>>>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>>>>>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
>>>>> *Caused by: org.apache.avro.AvroTypeException: Found my.type.avro.MyEnumType, expecting union*
>>>>>   at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
>>>>>   at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
>>>>>   at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
>>>>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>>>>>   at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
>>>>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
>>>>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>>>>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>>>>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>>>>>   at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>>>>>   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>>>>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>>>>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>   at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
>>>>>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
>>>>>   ... 9 more
>>>>
>>>> I use the STRING type just for fast prototyping.
>>>>
>>>> While reading through [1], I've been thinking about using *RAW('class',
>>>> 'snapshot')* where 'class' is my.type.avro.MyEnumType, but I'm not
>>>> sure whether it is a good idea and if so, what can be a value for the
>>>> snapshot.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/#raw
>>>>
>>>> Thanks in advance,
>>>>
>>>> Dongwon
>>>>
>>>
