Just as an idea for a workaround, since Flink apparently expects the enum field to be nullable:

record MyEntry { MyEnumType type; }  <- make that field nullable

Of course that is only an option if you are able to change the producer.
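A minimal sketch of that change in Avro IDL (based on the MyProtocol.avdl quoted further down the thread; in Avro, a union with null is what makes a field nullable, and the null default lets a reader with this schema still resolve data written with the old one):

    record MyEntry {
        union { null, MyEnumType } type = null;  // nullable enum field
    }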
On Thu, Oct 14, 2021 at 11:17 AM Francesco Guardiani <france...@ververica.com> wrote:

> It reproduces on my machine, so I've opened a JIRA issue about that:
> FLINK-24544 <https://issues.apache.org/jira/browse/FLINK-24544>.
> Unfortunately, I don't have any ready-to-use workarounds for you.
>
> On Wed, Oct 13, 2021 at 8:43 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>
>>> Can you provide a minimal reproducer (without confluent schema registry)
>>> with a valid input?
>>
>> Please download and unzip the attached file.
>>
>> - src/main/avro/MyProtocol.avdl
>>   - MyRecord, MyEntry, and MyEnumType are defined
>>   - "mvn generate-sources" will auto-generate Java classes under
>>     "target/generated-sources"
>> - "org.example.fs" contains
>>   - "org.example.fs.Writer", which writes a single record of the MyRecord
>>     type to "output.avro" (see the sketch after this message)
>>   - "org.example.fs.Reader", which reads the record back from "output.avro"
>>   - "org.example.fs.ExampleFromFileSystem", which executes the CREATE TABLE
>>     statement defined in "my_table.ddl" and shows that it successfully
>>     deserializes MyRecord from an Avro record written to a file, as you
>>     mentioned
>> - "org.example.kafka.ExampleFromKafkaAndSR" does almost the same as
>>   "org.example.fs.ExampleFromFileSystem", except that it reads from Kafka
>>   and looks up the schema in Schema Registry
>>   - However, unlike ExampleFromFileSystem, it produces the same exception
>>     as before
>>   - What I produced to the Kafka topic is {"entries": [{"type": "TypeVal1"},
>>     {"type": "TypeVal2"}, {"type": "TypeVal2"}]}, which is the Avro record
>>     saved in output.avro
>>   - The size of "output.avro" is 321 bytes on disk, while the size of the
>>     value of a Kafka record is 10 bytes
>>
>> Hope this provides enough information.
>>
>> Best,
>>
>> Dongwon
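A minimal sketch of what the Writer described above could look like, using Avro's standard container-file API; the class and builder names are assumed from the description of the attachment (code generated from MyProtocol.avdl), and the field values come from the record quoted above:

    import java.io.File;
    import java.util.List;

    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.specific.SpecificDatumWriter;

    import my.type.avro.MyEntry;
    import my.type.avro.MyEnumType;
    import my.type.avro.MyRecord;

    public class Writer {
        public static void main(String[] args) throws Exception {
            // {"entries": [{"type": "TypeVal1"}, {"type": "TypeVal2"}, {"type": "TypeVal2"}]}
            MyRecord record = MyRecord.newBuilder()
                    .setEntries(List.of(
                            MyEntry.newBuilder().setType(MyEnumType.TypeVal1).build(),
                            MyEntry.newBuilder().setType(MyEnumType.TypeVal2).build(),
                            MyEntry.newBuilder().setType(MyEnumType.TypeVal2).build()))
                    .build();

            // A container file embeds the full writer schema, which explains the
            // size gap noted above: output.avro is 321 bytes, while the Confluent
            // wire format on the topic is 1 magic byte + a 4-byte schema id + the
            // binary-encoded record itself, i.e. 10 bytes in total.
            try (DataFileWriter<MyRecord> fileWriter =
                         new DataFileWriter<>(new SpecificDatumWriter<>(MyRecord.class))) {
                fileWriter.create(record.getSchema(), new File("output.avro"));
                fileWriter.append(record);
            }
        }
    }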
>> On Wed, Oct 13, 2021 at 4:50 PM Francesco Guardiani <france...@ververica.com> wrote:
>>
>>> First of all, are you sure the input data is correct? From the
>>> stacktrace it seems to me the issue might be that the input data is
>>> invalid.
>>>
>>> Looking at the code of AvroToRowDataConverters, it sounds like STRING
>>> should work with Avro enums. Can you provide a minimal reproducer
>>> (without confluent schema registry) with a valid input?
>>>
>>> On Tue, Oct 12, 2021 at 6:19 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>>>
>>>> Hi community,
>>>>
>>>> Can I get advice on this question?
>>>>
>>>> Another user just sent me an email asking whether I'd found a solution
>>>> or a workaround for this question, but I'm still stuck there.
>>>>
>>>> Any suggestions?
>>>>
>>>> Thanks in advance,
>>>>
>>>> Dongwon
>>>>
>>>> ---------- Forwarded message ---------
>>>> From: Dongwon Kim <eastcirc...@gmail.com>
>>>> Date: Mon, Aug 9, 2021 at 7:26 PM
>>>> Subject: How to deserialize Avro enum type in Flink SQL?
>>>> To: user <user@flink.apache.org>
>>>>
>>>> Hi community,
>>>>
>>>> I have a Kafka topic where the schema of its values is defined by the
>>>> "MyRecord" record in the following Avro IDL and registered to the
>>>> Confluent Schema Registry:
>>>>
>>>>> @namespace("my.type.avro")
>>>>> protocol MyProtocol {
>>>>>   enum MyEnumType {
>>>>>     TypeVal1, TypeVal2
>>>>>   }
>>>>>   record MyEntry {
>>>>>     MyEnumType type;
>>>>>   }
>>>>>   record MyRecord {
>>>>>     array<MyEntry> entries;
>>>>>   }
>>>>> }
>>>>
>>>> To read from the topic, I've defined the following DDL:
>>>>
>>>>> CREATE TABLE my_table (
>>>>>   `entries` ARRAY<ROW<
>>>>>     *`type` ??? (This is the main question)*
>>>>>   >>
>>>>> ) WITH (
>>>>>   'connector' = 'kafka',
>>>>>   'topic' = 'my-topic',
>>>>>   'properties.bootstrap.servers' = '...:9092',
>>>>>   'scan.startup.mode' = 'latest-offset',
>>>>>   'value.format' = 'avro-confluent',
>>>>>   'value.avro-confluent.schema-registry.url' = 'http://...:8081'
>>>>> )
>>>>
>>>> And I run the following query:
>>>>
>>>>> SELECT * FROM my_table
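For reference, the statement under test spelled out in full, with the placeholder filled in with STRING as tried in the message just below:

    CREATE TABLE my_table (
      `entries` ARRAY<ROW<
        `type` STRING
      >>
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'my-topic',
      'properties.bootstrap.servers' = '...:9092',
      'scan.startup.mode' = 'latest-offset',
      'value.format' = 'avro-confluent',
      'value.avro-confluent.schema-registry.url' = 'http://...:8081'
    )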
>>>> Now I get the following error in Flink 1.13.1 when I use *STRING* for
>>>> the type:
>>>>
>>>>> *Caused by: java.io.IOException: Failed to deserialize Avro record.*
>>>>> at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
>>>>> at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
>>>>> at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>>>>> at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
>>>>> at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
>>>>> at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>>>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>>>>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
>>>>> *Caused by: org.apache.avro.AvroTypeException: Found my.type.avro.MyEnumType, expecting union*
>>>>> at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
>>>>> at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
>>>>> at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
>>>>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>>>>> at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
>>>>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
>>>>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>>>>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>>>>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>>>>> at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>>>>> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>>>>> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>>>>> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>> at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
>>>>> at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
>>>>> ... 9 more
>>>>
>>>> The reason I use the STRING type is just for fast prototyping.
>>>>
>>>> While reading through [1], I've been thinking about using *RAW('class',
>>>> 'snapshot')* where 'class' is my.type.avro.MyEnumType, but I'm not sure
>>>> whether it is a good idea and, if so, what a value for the snapshot
>>>> could be.
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/#raw
>>>>
>>>> Thanks in advance,
>>>>
>>>> Dongwon
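On the RAW('class', 'snapshot') question at the end: the snapshot argument is a serialized serializer snapshot, which is impractical to write by hand, but a RAW type can be built programmatically from a serializer instance and then rendered as a DDL string. A sketch of that idea (unverified against the avro-confluent format, whose enum handling is exactly what fails above; KryoSerializer is just one possible serializer choice, and the enum class is the one generated from MyProtocol.avdl):

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.DataType;

    import my.type.avro.MyEnumType;

    public class RawTypeSketch {
        public static void main(String[] args) {
            // DataTypes.RAW pairs a class with the serializer Flink should use
            // for it; the snapshot is derived from the serializer instance, so
            // nothing has to be written by hand.
            DataType enumType = DataTypes.RAW(
                    MyEnumType.class,
                    new KryoSerializer<>(MyEnumType.class, new ExecutionConfig()));

            // asSerializableString() renders the type the way DDL expects it,
            // i.e. RAW('my.type.avro.MyEnumType', '<base64 snapshot>'), giving
            // a concrete value for the snapshot argument.
            System.out.println(enumType.getLogicalType().asSerializableString());
        }
    }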