It reproduces on my machine, so I've opened a JIRA issue for it: FLINK-24544 <https://issues.apache.org/jira/browse/FLINK-24544>. Unfortunately, I don't have any ready-to-use workarounds for you.
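If you need something in the meantime, one untested direction might be to bypass the 'avro-confluent' format altogether: deserialize with the DataStream API against the generated MyRecord class and bridge the stream into the Table API. A rough sketch only (I have not verified it against this bug; the topic, bootstrap servers, registry URL, and group id are placeholders):

import java.util.Properties;

import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import my.type.avro.MyRecord;

public class DataStreamWorkaround {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "...:9092"); // placeholder
        props.setProperty("group.id", "my-group");          // hypothetical group id

        // Schema resolution happens against the generated MyRecord class,
        // not against a reader schema derived from a DDL.
        FlinkKafkaConsumer<MyRecord> consumer = new FlinkKafkaConsumer<>(
                "my-topic",
                ConfluentRegistryAvroDeserializationSchema.forSpecific(
                        MyRecord.class, "http://...:8081"),
                props);

        DataStream<MyRecord> stream = env.addSource(consumer);
        Table table = tEnv.fromDataStream(stream);
        table.execute().print();
    }
}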
On Wed, Oct 13, 2021 at 8:43 PM Dongwon Kim <eastcirc...@gmail.com> wrote:

>> Can you provide a minimal reproducer (without confluent schema registry)
>> with a valid input?
>
> Please download and unzip the attached file.
>
> - src/main/avro/MyProtocol.avdl
>   - MyRecord, MyEntry, and MyEnumType are defined
>   - "mvn generate-sources" auto-generates Java classes under
>     "target/generated-sources"
> - "org.example.fs" contains
>   - "org.example.fs.Writer", which writes a single record of MyRecord
>     type to "output.avro" (see the sketch below)
>   - "org.example.fs.Reader", which reads the record back from "output.avro"
>   - "org.example.fs.ExampleFromFileSystem", which executes the CREATE TABLE
>     statement defined in "my_table.ddl" and shows that it successfully
>     deserializes MyRecord from an Avro record written to a file, as you
>     mentioned
> - "org.example.kafka.ExampleFromKafkaAndSR" does almost the same as
>   "org.example.fs.ExampleFromFileSystem", except that it reads from Kafka
>   and looks up the schema in Schema Registry
>   - However, it produces the same exception, unlike ExampleFromFileSystem
> - What I produced to the Kafka topic is {"entries": [{"type": "TypeVal1"},
>   {"type": "TypeVal2"}, {"type": "TypeVal2"}]}, which is the Avro record
>   saved in output.avro
> - The size of "output.avro" is 321 bytes on disk, while the value of the
>   Kafka record is only 10 bytes (an Avro data file embeds the full writer
>   schema, while the Kafka value carries only the registry header and the
>   binary payload)
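>
> For reference, a minimal sketch of what the writer does (illustrative
> only; the attached org.example.fs.Writer may differ in details): it builds
> the exact record above with the generated classes and writes it to
> "output.avro":
>
> import java.io.File;
> import java.util.Arrays;
>
> import org.apache.avro.file.DataFileWriter;
> import org.apache.avro.io.DatumWriter;
> import org.apache.avro.specific.SpecificDatumWriter;
>
> import my.type.avro.MyEntry;
> import my.type.avro.MyEnumType;
> import my.type.avro.MyRecord;
>
> public class Writer {
>     public static void main(String[] args) throws Exception {
>         MyRecord record = MyRecord.newBuilder()
>                 .setEntries(Arrays.asList(
>                         MyEntry.newBuilder().setType(MyEnumType.TypeVal1).build(),
>                         MyEntry.newBuilder().setType(MyEnumType.TypeVal2).build(),
>                         MyEntry.newBuilder().setType(MyEnumType.TypeVal2).build()))
>                 .build();
>
>         DatumWriter<MyRecord> datumWriter = new SpecificDatumWriter<>(MyRecord.class);
>         // A data file embeds the writer schema, hence the 321 bytes on disk.
>         try (DataFileWriter<MyRecord> fileWriter = new DataFileWriter<>(datumWriter)) {
>             fileWriter.create(record.getSchema(), new File("output.avro"));
>             fileWriter.append(record);
>         }
>     }
> }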
>
> Hope this provides enough information.
>
> Best,
>
> Dongwon
>
> On Wed, Oct 13, 2021 at 4:50 PM Francesco Guardiani <france...@ververica.com> wrote:
>
>> First of all, are you sure the input data is correct? From the stack
>> trace, it seems to me the issue might be that the input data is invalid.
>>
>> Looking at the code of AvroToRowDataConverters, it sounds like STRING
>> should work with Avro enums. Can you provide a minimal reproducer (without
>> confluent schema registry) with a valid input?
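>>
>> One thing you could check in the meantime (just a guess at where a
>> mismatch might come from, not a confirmed diagnosis): the reader schema
>> used for decoding is derived from the DDL, so printing what your row type
>> converts to would show whether the enum field ends up as a plain string
>> there. A sketch, assuming the DDL from your original mail:
>>
>> import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
>> import org.apache.flink.table.api.DataTypes;
>> import org.apache.flink.table.types.DataType;
>>
>> public class PrintDerivedSchema {
>>     public static void main(String[] args) {
>>         // The row type of my_table with STRING in place of the enum.
>>         DataType rowType = DataTypes.ROW(
>>                 DataTypes.FIELD("entries", DataTypes.ARRAY(
>>                         DataTypes.ROW(
>>                                 DataTypes.FIELD("type", DataTypes.STRING())))));
>>
>>         // Prints the Avro schema Flink derives from the SQL type; the
>>         // "type" field comes out as a (nullable) string, not an enum,
>>         // since SQL has no enum type.
>>         System.out.println(
>>                 AvroSchemaConverter.convertToSchema(rowType.getLogicalType()));
>>     }
>> }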
>>
>> On Tue, Oct 12, 2021 at 6:19 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>>
>>> Hi community,
>>>
>>> Can I get advice on this question?
>>>
>>> Another user just sent me an email asking whether I found a solution or
>>> a workaround for this question, but I'm still stuck there.
>>>
>>> Any suggestions?
>>>
>>> Thanks in advance,
>>>
>>> Dongwon
>>>
>>> ---------- Forwarded message ---------
>>> From: Dongwon Kim <eastcirc...@gmail.com>
>>> Date: Mon, Aug 9, 2021 at 7:26 PM
>>> Subject: How to deserialize Avro enum type in Flink SQL?
>>> To: user <user@flink.apache.org>
>>>
>>> Hi community,
>>>
>>> I have a Kafka topic where the schema of its values is defined by the
>>> "MyRecord" record in the following Avro IDL and registered in the
>>> Confluent Schema Registry:
>>>
>>>> @namespace("my.type.avro")
>>>> protocol MyProtocol {
>>>>   enum MyEnumType {
>>>>     TypeVal1, TypeVal2
>>>>   }
>>>>   record MyEntry {
>>>>     MyEnumType type;
>>>>   }
>>>>   record MyRecord {
>>>>     array<MyEntry> entries;
>>>>   }
>>>> }
>>>
>>> To read from the topic, I've defined the following DDL:
>>>
>>>> CREATE TABLE my_table (
>>>>   `entries` ARRAY<ROW<
>>>>     `type` ???  -- (This is the main question)
>>>>   >>
>>>> ) WITH (
>>>>   'connector' = 'kafka',
>>>>   'topic' = 'my-topic',
>>>>   'properties.bootstrap.servers' = '...:9092',
>>>>   'scan.startup.mode' = 'latest-offset',
>>>>   'value.format' = 'avro-confluent',
>>>>   'value.avro-confluent.schema-registry.url' = 'http://...:8081'
>>>> )
>>>
>>> And I run the following query:
>>>
>>>> SELECT * FROM my_table
>>>
>>> Now I get the following error in Flink 1.13.1 when I use *STRING* for
>>> the type:
>>>
>>>> *Caused by: java.io.IOException: Failed to deserialize Avro record.*
>>>>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
>>>>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
>>>>   at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>>>>   at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
>>>>   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
>>>>   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>>>>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>>>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>>>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>>>>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
>>>> *Caused by: org.apache.avro.AvroTypeException: Found
>>>> my.type.avro.MyEnumType, expecting union*
>>>>   at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
>>>>   at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
>>>>   at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
>>>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>>>>   at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
>>>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
>>>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>>>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>>>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>>>>   at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>>>>   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>>>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>>>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>   at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
>>>>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
>>>>   ... 9 more
>>>
>>> The reason I use the STRING type is just for fast prototyping.
>>>
>>> While reading through [1], I've been thinking about using *RAW('class',
>>> 'snapshot')* where 'class' is my.type.avro.MyEnumType, but I'm not sure
>>> whether it is a good idea and, if so, what a value for the snapshot could
>>> be (see the P.S. at the end of this mail for a sketch).
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/#raw
>>>
>>> Thanks in advance,
>>>
>>> Dongwon
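>>>
>>> P.S. To make the RAW idea concrete, here is a sketch of how it could be
>>> expressed through the Table API instead of DDL, which sidesteps writing
>>> the snapshot string by hand (I have not verified that the avro-confluent
>>> format accepts a RAW type for this field at all):
>>>
>>> import org.apache.flink.api.common.ExecutionConfig;
>>> import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
>>> import org.apache.flink.table.api.DataTypes;
>>> import org.apache.flink.table.types.DataType;
>>>
>>> import my.type.avro.MyEnumType;
>>>
>>> public class RawTypeSketch {
>>>     public static void main(String[] args) {
>>>         // DataTypes.RAW(Class, TypeSerializer) builds a RAW type
>>>         // programmatically; the serialized snapshot of this serializer
>>>         // is what the 'snapshot' string in the DDL would have to carry.
>>>         DataType enumType = DataTypes.RAW(
>>>                 MyEnumType.class,
>>>                 new KryoSerializer<>(MyEnumType.class, new ExecutionConfig()));
>>>         System.out.println(enumType);
>>>     }
>>> }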