It reproduces on my machine, so I've opened a JIRA issue about that:
FLINK-24544 <https://issues.apache.org/jira/browse/FLINK-24544>.
Unfortunately, I don't have any ready-to-use workarounds for you.
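
A side note on the two sizes reported in the quoted message below (321 bytes
for "output.avro" versus 10 bytes for the Kafka value): this difference is
expected and does not by itself indicate invalid input. An Avro container file
embeds the full writer schema and sync markers, whereas a Schema Registry
message carries only a 1-byte magic marker, a 4-byte schema ID, and the bare
Avro binary body. The following is a minimal Python sketch of that framing,
assuming the record was produced with Confluent's standard Avro serializer
(the thread does not say which producer was used). The schema ID 1 and all
helper names are hypothetical.

```python
import struct

# Enum symbols in declaration order, taken from MyEnumType in the Avro IDL.
SYMBOLS = ["TypeVal1", "TypeVal2"]


def zigzag_varint(n: int) -> bytes:
    """Encode an integer as Avro does for int/long: zigzag, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        low7 = z & 0x7F
        z >>= 7
        if z:
            out.append(low7 | 0x80)  # more bytes follow
        else:
            out.append(low7)
            return bytes(out)


def encode_my_record(entry_types):
    """Avro binary body of MyRecord: one array of MyEntry, each holding one enum."""
    body = bytearray()
    if entry_types:
        body += zigzag_varint(len(entry_types))      # array block item count
        for symbol in entry_types:
            # An enum is written as the zero-based index of its symbol.
            body += zigzag_varint(SYMBOLS.index(symbol))
    body += zigzag_varint(0)                         # zero count ends the array
    return bytes(body)


def confluent_frame(schema_id: int, avro_body: bytes) -> bytes:
    """Confluent wire format: magic byte 0, big-endian 4-byte schema ID, Avro body."""
    return b"\x00" + struct.pack(">I", schema_id) + avro_body


payload = encode_my_record(["TypeVal1", "TypeVal2", "TypeVal2"])
message = confluent_frame(1, payload)  # schema ID 1 is a made-up placeholder
print(len(payload), len(message))
```

Under these assumptions the Avro body is 5 bytes and the framed message is 10
bytes, which is consistent with the size reported for the Kafka record.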

On Wed, Oct 13, 2021 at 8:43 PM Dongwon Kim <eastcirc...@gmail.com> wrote:

>> Can you provide a minimal reproducer (without confluent schema registry)
>> with a valid input?
>>
>
> Please download and unzip the attached file.
>
>    - src/main/avro/MyProtocol.avdl
>       - MyRecord, MyEntry, and MyEnumType are defined
>       - "mvn generate-sources" will auto-generate Java classes under
>       "target/generated-sources"
>    - "org.example.fs" contains
>       - "org.example.fs.Writer" which writes a single record of MyRecord
>       type to "output.avro"
>       - "org.example.fs.Reader" which reads the record from "output.avro"
>       - "org.example.fs.ExampleFromFileSystem" executes the CREATE TABLE
>       defined in "my_table.ddl" and shows that it successfully deserializes
>       MyRecord from an Avro record written to a file, as you mentioned.
>    - "org.example.kafka.ExampleFromKafkaAndSR" does almost the same as
>    "org.example.fs.ExampleFromFileSystem" except that it reads from Kafka and
>    looks up the schema from Schema Registry
>       - However, unlike ExampleFromFileSystem, it produces the same
>       exception.
>       - What I produced to the Kafka topic is {"entries": [{"type":
>       "TypeVal1"}, {"type": "TypeVal2"}, {"type": "TypeVal2"}]}, which is
>       the Avro record saved in output.avro.
>       - The size of "output.avro" is 321 bytes on the disk while the size
>       of the value of a Kafka record is 10 bytes.
>
> Hope this provides enough information.
>
> Best,
>
> Dongwon
>
> On Wed, Oct 13, 2021 at 4:50 PM Francesco Guardiani <
> france...@ververica.com> wrote:
>
>> First of all, are you sure the input data is correct? From the stacktrace
>> it seems to me the issue might be that the input data is invalid.
>>
>> Looking at the code of AvroToRowDataConverters, it looks like STRING
>> should work with Avro enums. Can you provide a minimal reproducer (without
>> confluent schema registry) with a valid input?
>>
>> On Tue, Oct 12, 2021 at 6:19 PM Dongwon Kim <eastcirc...@gmail.com>
>> wrote:
>>
>>> Hi community,
>>>
>>> Can I get advice on this question?
>>>
>>> Another user just sent me an email asking whether I found a solution or
>>> a workaround for this question, but I'm still stuck there.
>>>
>>> Any suggestions?
>>>
>>> Thanks in advance,
>>>
>>> Dongwon
>>>
>>> ---------- Forwarded message ---------
>>> From: Dongwon Kim <eastcirc...@gmail.com>
>>> Date: Mon, Aug 9, 2021 at 7:26 PM
>>> Subject: How to deserialize Avro enum type in Flink SQL?
>>> To: user <user@flink.apache.org>
>>>
>>>
>>> Hi community,
>>>
>>> I have a Kafka topic where the schema of its values is defined by the
>>> "MyRecord" record in the following Avro IDL and registered to the Confluent
>>> Schema Registry.
>>>
>>>> @namespace("my.type.avro")
>>>> protocol MyProtocol {
>>>>   enum MyEnumType {
>>>>     TypeVal1, TypeVal2
>>>>   }
>>>>   record MyEntry {
>>>>     MyEnumType type;
>>>>   }
>>>>   record MyRecord {
>>>>     array<MyEntry> entries;
>>>>   }
>>>> }
>>>
>>>
>>> To read from the topic, I've defined the following DDL:
>>>
>>>> CREATE TABLE my_table (
>>>>     `entries` ARRAY<ROW<
>>>>         *`type` ??? (This is the main question)*
>>>>     >>
>>>> ) WITH (
>>>>     'connector' = 'kafka',
>>>>     'topic' = 'my-topic',
>>>>     'properties.bootstrap.servers' = '...:9092',
>>>>     'scan.startup.mode' = 'latest-offset',
>>>>     'value.format' = 'avro-confluent',
>>>>     'value.avro-confluent.schema-registry.url' = 'http://...:8081'
>>>> )
>>>
>>>
>>> And I run the following query:
>>>
>>>> SELECT * FROM my_table
>>>
>>>
>>> I get the following messages in Flink 1.13.1 when I use *STRING*
>>> for the type:
>>>
>>>> *Caused by: java.io.IOException: Failed to deserialize Avro record.*
>>>>   at
>>>> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
>>>>   at
>>>> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
>>>>   at
>>>> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>>>>   at
>>>> org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
>>>>   at
>>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
>>>>   at
>>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>>>>   at
>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>>>>   at
>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>>>>   at
>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>>>>   at
>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
>>>> *Caused by: org.apache.avro.AvroTypeException: Found
>>>> my.type.avro.MyEnumType, expecting union*
>>>>   at
>>>> org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
>>>>   at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
>>>>   at
>>>> org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
>>>>   at
>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>>>>   at
>>>> org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
>>>>   at
>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
>>>>   at
>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>>>>   at
>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>>>>   at
>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>>>>   at
>>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>>>>   at
>>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>>>>   at
>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>>   at
>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>>>>   at
>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>   at
>>>> org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
>>>>   at
>>>> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
>>>>   ... 9 more
>>>
>>> The only reason I use the STRING type is fast prototyping.
>>>
>>> While reading through [1], I've been thinking about using *RAW('class',
>>> 'snapshot')* where 'class' is my.type.avro.MyEnumType, but I'm not sure
>>> whether it is a good idea and, if so, what the value for the snapshot
>>> should be.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/#raw
>>>
>>> Thanks in advance,
>>>
>>> Dongwon
>>>
>>
