Hi & thanks,

with your solution you are referring to the reported exception:
`Found my.type.avro.MyEnumType, expecting union`

I looked into the "union" part and added "NOT NULL" to the SQL statement,
so that the attribute is NOT nullable in both Avro and SQL. This "fixed"
the reported exception, but a different exception was thrown instead:
`AvroTypeException: Found org.example.MyEnumType, expecting string`
So there seems to be a problem with enums when reading from Kafka in
combination with the Confluent Schema Registry.
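
For illustration, a sketch of what the NOT NULL variant of the DDL could look
like, based on the table from the original message quoted below. Placing NOT
NULL on every nesting level is an assumption here; the idea is that Flink maps
a nullable SQL type to an Avro union with null in the reader schema it derives
from the DDL. The elided hosts are kept as in the original:

```sql
-- Sketch only: NOT NULL on every nesting level is an assumption,
-- not necessarily the exact statement that was run.
CREATE TABLE my_table (
    `entries` ARRAY<ROW<
        `type` STRING NOT NULL
    > NOT NULL> NOT NULL
) WITH (
    'connector' = 'kafka',
    'topic' = 'my-topic',
    'properties.bootstrap.servers' = '...:9092',
    'scan.startup.mode' = 'latest-offset',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.schema-registry.url' = 'http://...:8081'
);
```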

I also tried your suggestion and made the attribute nullable in Avro and SQL
by defining it as a union in Avro (which is what the exception expects).
But the exception did not change. Please compare:
https://issues.apache.org/jira/browse/FLINK-24544?focusedCommentId=17429243&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17429243
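
For illustration, the nullable variant could look roughly as follows. On the
Avro side the field would be declared as `union { null, MyEnumType } type;` in
the IDL. On the SQL side the column is simply left without NOT NULL, since SQL
types are nullable by default. This is a sketch based on the DDL from the
original message quoted below, not the exact statement that was run:

```sql
-- Sketch only: `type` stays nullable (the SQL default), matching an Avro
-- field declared as union { null, MyEnumType }.
CREATE TABLE my_table (
    `entries` ARRAY<ROW<
        `type` STRING
    >>
) WITH (
    'connector' = 'kafka',
    'topic' = 'my-topic',
    'properties.bootstrap.servers' = '...:9092',
    'scan.startup.mode' = 'latest-offset',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.schema-registry.url' = 'http://...:8081'
);
```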

Thanks & Best
Peter


On 2021/10/18 08:39:41 Arvid Heise wrote:
> Just as an idea for a workaround as Flink apparently expects the enum field
> to be nullable.
> 
>   record MyEntry {
>     MyEnumType type; <- make that nullable
>   }
> 
> Of course that is only an option if you are able to change the producer.
> 
> On Thu, Oct 14, 2021 at 11:17 AM Francesco Guardiani <
> france...@ververica.com> wrote:
> 
> > It reproduces on my machine, so I've opened a JIRA issue about that:
> > FLINK-24544 <https://issues.apache.org/jira/browse/FLINK-24544>.
> > Unfortunately, I don't have any ready to use workarounds for you.
> >
> > On Wed, Oct 13, 2021 at 8:43 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
> >
> >>> Can you provide a minimal reproducer (without confluent schema registry)
> >>> with a valid input?
> >>>
> >>
> >> Please download and unzip the attached file.
> >>
> >>    - src/main/avro/MyProtocol.avdl
> >>       - MyRecord, MyEntry, and MyEnumType are defined
> >>       - "mvn generate-sources" will auto-generate Java classes under
> >>       "target/generated-sources"
> >>    - "org.example.fs" contains
> >>       - "org.example.fs.Writer" which writes a single record of MyRecord
> >>       type to "output.avro"
> >>       - "org.example.fs.Reader" which reads the record from
> >>       "output.avro"
> >>       - "org.example.fs.ExampleFromFileSystem" executes CREATE TABLE
> >>       defined in "my_table.ddl" and shows that it successfully deserializes
> >>       MyRecord from an Avro record written in a file as you mentioned.
> >>    - "org.example.kafka.ExampleFromKafkaAndSR" does almost the same as
> >>    "org.example.fs.ExampleFromFileSystem" except that it reads from Kafka
> >>    and looks up the schema from Schema Registry
> >>       - However, it produces the same exception unlike
> >>       ExampleFromFileSystem
> >>       - What I produced to a Kafka topic is {"entries": [{"type":
> >>       "TypeVal1"}, {"type": "TypeVal2"}, {"type": "TypeVal2"}]} which is an
> >>       Avro record saved on output.avro.
> >>       - The size of "output.avro" is 321 bytes on the disk while the
> >>       size of the value of a Kafka record is 10 bytes.
> >>
> >> Hope this provides enough information.
> >>
> >> Best,
> >>
> >> Dongwon
> >>
> >> On Wed, Oct 13, 2021 at 4:50 PM Francesco Guardiani <
> >> france...@ververica.com> wrote:
> >>
> >>> First of all, are you sure the input data is correct? From the
> >>> stacktrace it seems to me the issue might be that the input data is 
> >>> invalid.
> >>>
> >>> Looking at the code of AvroToRowDataConverters, it sounds like STRING
> >>> should work with avro enums. Can you provide a minimal reproducer (without
> >>> confluent schema registry) with a valid input?
> >>>
> >>> On Tue, Oct 12, 2021 at 6:19 PM Dongwon Kim <eastcirc...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi community,
> >>>>
> >>>> Can I get advice on this question?
> >>>>
> >>>> Another user just sent me an email asking whether I found a solution or
> >>>> a workaround for this question, but I'm still stuck there.
> >>>>
> >>>> Any suggestions?
> >>>>
> >>>> Thanks in advance,
> >>>>
> >>>> Dongwon
> >>>>
> >>>> ---------- Forwarded message ---------
> >>>> From: Dongwon Kim <eastcirc...@gmail.com>
> >>>> Date: Mon, Aug 9, 2021 at 7:26 PM
> >>>> Subject: How to deserialize Avro enum type in Flink SQL?
> >>>> To: user <user@flink.apache.org>
> >>>>
> >>>>
> >>>> Hi community,
> >>>>
> >>>> I have a Kafka topic where the schema of its values is defined by the
> >>>> "MyRecord" record in the following Avro IDL and registered to the
> >>>> Confluent Schema Registry.
> >>>>
> >>>>> @namespace("my.type.avro")
> >>>>> protocol MyProtocol {
> >>>>>   enum MyEnumType {
> >>>>>     TypeVal1, TypeVal2
> >>>>>   }
> >>>>>   record MyEntry {
> >>>>>     MyEnumType type;
> >>>>>   }
> >>>>>   record MyRecord {
> >>>>>     array<MyEntry> entries;
> >>>>>   }
> >>>>> }
> >>>>
> >>>>
> >>>> To read from the topic, I've defined the following DDL:
> >>>>
> >>>>> CREATE TABLE my_table (
> >>>>>     `entries` ARRAY<ROW<
> >>>>>         *`type` ??? (This is the main question)*
> >>>>>     >>
> >>>>> ) WITH (
> >>>>>     'connector' = 'kafka',
> >>>>>     'topic' = 'my-topic',
> >>>>>     'properties.bootstrap.servers' = '...:9092',
> >>>>>     'scan.startup.mode' = 'latest-offset',
> >>>>>     'value.format' = 'avro-confluent',
> >>>>>     'value.avro-confluent.schema-registry.url' = 'http://...:8081'
> >>>>> )
> >>>>
> >>>>
> >>>> And I run the following query:
> >>>>
> >>>>> SELECT * FROM my_table
> >>>>
> >>>>
> >>>> Now I got the following messages in Flink-1.13.1 when I use *STRING*
> >>>> for the type:
> >>>>
> >>>>> *Caused by: java.io.IOException: Failed to deserialize Avro record.*
> >>>>>   at
> >>>>> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
> >>>>>   at
> >>>>> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
> >>>>>   at
> >>>>> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
> >>>>>   at
> >>>>> org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
> >>>>>   at
> >>>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
> >>>>>   at
> >>>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
> >>>>>   at
> >>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
> >>>>>   at
> >>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
> >>>>>   at
> >>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
> >>>>>   at
> >>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> >>>>> *Caused by: org.apache.avro.AvroTypeException: Found
> >>>>> my.type.avro.MyEnumType, expecting union*
> >>>>>   at
> >>>>> org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
> >>>>>   at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
> >>>>>   at
> >>>>> org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
> >>>>>   at
> >>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
> >>>>>   at
> >>>>> org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
> >>>>>   at
> >>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
> >>>>>   at
> >>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> >>>>>   at
> >>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
> >>>>>   at
> >>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> >>>>>   at
> >>>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
> >>>>>   at
> >>>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
> >>>>>   at
> >>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
> >>>>>   at
> >>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> >>>>>   at
> >>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> >>>>>   at
> >>>>> org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
> >>>>>   at
> >>>>> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
> >>>>>   ... 9 more
> >>>>
> >>>> The reason I use the STRING type is just for fast-prototyping.
> >>>>
> >>>> While reading through [1], I've been thinking about using *RAW('class',
> >>>> 'snapshot')* where 'class' is my.type.avro.MyEnumType, but I'm not
> >>>> sure whether it is a good idea and if so, what can be a value for the
> >>>> snapshot.
> >>>>
> >>>> [1]
> >>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/#raw
> >>>>
> >>>> Thanks in advance,
> >>>>
> >>>> Dongwon
> >>>>
> >>>
> 
