>
> Can you provide a minimal reproducer (without confluent schema registry)
> with a valid input?
>

Please download and unzip the attached file.

   - src/main/avro/MyProtocol.avdl
      - MyRecord, MyEntry, and MyEnumType are defined here
      - "mvn generate-sources" will auto-generate Java classes under
      "target/generated-sources"
   - "org.example.fs" contains
      - "org.example.fs.Writer", which writes a single record of type
      MyRecord to "output.avro"
      - "org.example.fs.Reader", which reads the record from "output.avro"
      - "org.example.fs.ExampleFromFileSystem", which executes the CREATE
      TABLE statement defined in "my_table.ddl" and shows that it
      successfully deserializes MyRecord from an Avro record written to a
      file, as you mentioned.
   - "org.example.kafka.ExampleFromKafkaAndSR" does almost the same as
   "org.example.fs.ExampleFromFileSystem" except that it reads from Kafka and
   looks up the schema from Schema Registry
      - However, unlike ExampleFromFileSystem, it produces the same
      exception as before
      - What I produced to the Kafka topic is {"entries": [{"type":
      "TypeVal1"}, {"type": "TypeVal2"}, {"type": "TypeVal2"}]}, which is the
      Avro record saved in output.avro.
      - The size of "output.avro" is 321 bytes on disk, while the size
      of the value of the Kafka record is 10 bytes.
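
The 10-byte figure is consistent with the record having been written in the Confluent wire format (one magic byte, a 4-byte schema id, then the raw Avro binary body), whereas "output.avro" is an Avro container file that also embeds the schema and sync markers. Below is a minimal sketch that re-derives the 10 bytes by hand, without any Avro dependency. The class and method names are hypothetical, the schema id 1 is a placeholder, and it assumes the producer used Confluent's Avro serializer:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

public class ConfluentAvroSize {

    // Avro encodes int/long values as zigzag varints.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63); // zigzag encoding
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Avro binary body of MyRecord: the "entries" array is one block
    // (item count, then the items) followed by a 0 terminator. Each MyEntry
    // holds a single enum field, encoded as the index of its symbol.
    static byte[] encodeMyRecord(int[] enumIndexes) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (enumIndexes.length > 0) {
            writeLong(out, enumIndexes.length);
            for (int idx : enumIndexes) {
                writeLong(out, idx);
            }
        }
        writeLong(out, 0); // end of array
        return out.toByteArray();
    }

    // Confluent framing: magic byte 0, big-endian 4-byte schema id, body.
    static byte[] frame(int schemaId, byte[] body) {
        return ByteBuffer.allocate(5 + body.length)
                .put((byte) 0)
                .putInt(schemaId)
                .put(body)
                .array();
    }

    public static void main(String[] args) {
        // TypeVal1 -> index 0, TypeVal2 -> index 1
        byte[] body = encodeMyRecord(new int[] {0, 1, 1});
        byte[] value = frame(1, body); // schema id 1 is a placeholder
        System.out.println("body=" + body.length + " bytes, framed=" + value.length + " bytes");
        // prints "body=5 bytes, framed=10 bytes"
    }
}
```

So the Kafka value carries no schema at all; the reader has to fetch the writer schema from Schema Registry, which is where the two examples differ.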

Hope this provides enough information.

Best,

Dongwon

On Wed, Oct 13, 2021 at 4:50 PM Francesco Guardiani <france...@ververica.com>
wrote:

> First of all, are you sure the input data is correct? From the stacktrace
> it seems to me the issue might be that the input data is invalid.
>
> Looking at the code of AvroToRowDataConverters, it seems that STRING
> should work with Avro enums. Can you provide a minimal reproducer (without
> Confluent Schema Registry) with a valid input?
>
> On Tue, Oct 12, 2021 at 6:19 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>
>> Hi community,
>>
>> Can I get advice on this question?
>>
>> Another user just sent me an email asking whether I found a solution or a
>> workaround for this question, but I'm still stuck there.
>>
>> Any suggestions?
>>
>> Thanks in advance,
>>
>> Dongwon
>>
>> ---------- Forwarded message ---------
>> From: Dongwon Kim <eastcirc...@gmail.com>
>> Date: Mon, Aug 9, 2021 at 7:26 PM
>> Subject: How to deserialize Avro enum type in Flink SQL?
>> To: user <user@flink.apache.org>
>>
>>
>> Hi community,
>>
>> I have a Kafka topic whose value schema is defined by the "MyRecord"
>> record in the following Avro IDL and registered in the Confluent
>> Schema Registry.
>>
>>> @namespace("my.type.avro")
>>> protocol MyProtocol {
>>>   enum MyEnumType {
>>>     TypeVal1, TypeVal2
>>>   }
>>>   record MyEntry {
>>>     MyEnumType type;
>>>   }
>>>   record MyRecord {
>>>     array<MyEntry> entries;
>>>   }
>>> }
>>
>>
>> To read from the topic, I've defined the following DDL:
>>
>>> CREATE TABLE my_table
>>> (
>>>     `entries` ARRAY<ROW<
>>>         *`type` ??? (This is the main question)*
>>>     >>
>>> ) WITH (
>>>     'connector' = 'kafka',
>>>     'topic' = 'my-topic',
>>>     'properties.bootstrap.servers' = '...:9092',
>>>     'scan.startup.mode' = 'latest-offset',
>>>     'value.format' = 'avro-confluent',
>>>     'value.avro-confluent.schema-registry.url' = 'http://...:8081'
>>> )
>>
>>
>> Then I run the following query:
>>
>>> SELECT * FROM my_table
>>
>>
>> With Flink 1.13.1, I get the following exception when I use *STRING* for
>> the type:
>>
>>> *Caused by: java.io.IOException: Failed to deserialize Avro record.*
>>>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
>>>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
>>>   at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>>>   at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
>>>   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
>>>   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>>>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>>>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
>>> *Caused by: org.apache.avro.AvroTypeException: Found my.type.avro.MyEnumType, expecting union*
>>>   at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
>>>   at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
>>>   at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
>>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>>>   at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
>>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
>>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>>>   at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>>>   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>   at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
>>>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
>>>   ... 9 more
>>
>> I use the STRING type only for quick prototyping.
>>
>> While reading through [1], I've been thinking about using *RAW('class',
>> 'snapshot')* where 'class' is my.type.avro.MyEnumType, but I'm not sure
>> whether that is a good idea and, if so, what the value of the snapshot
>> should be.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/#raw
>>
>> Thanks in advance,
>>
>> Dongwon
>>
>

Attachment: flink-deser-avro-enum.tar.gz
Description: GNU Zip compressed data
