Hi people,

I found a workaround for that issue, which works at least for my use case.

The main idea was customizing 
"org.apache.flink.formats.avro.registry.confluent.RegistryAvroFormatFactory" 
such that the expected Avro schema is not derived from the CREATE TABLE SQL 
statement, but rather passed in as a parameter. This results in matching schemas 
(actual and expected) when the Avro data is deserialized under the hood by the Avro library.
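
Roughly, the relevant piece inside the customized factory looks like the sketch
below. This is only a minimal illustration against the Flink 1.13 classes, not the
exact code from my repo; the helper name and the way the schema string and registry
URL reach it (e.g. via format options) are placeholders:

  import org.apache.avro.Schema;
  import org.apache.flink.api.common.typeinfo.TypeInformation;
  import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
  import org.apache.flink.formats.avro.AvroToRowDataConverters;
  import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
  import org.apache.flink.table.data.RowData;
  import org.apache.flink.table.types.logical.RowType;

  static AvroRowDataDeserializationSchema buildDeserializer(
          String readerSchemaJson,                 // actual record schema, passed in explicitly
          String schemaRegistryUrl,
          RowType rowType,                         // row type from the DDL, only used for the converter
          TypeInformation<RowData> producedTypeInfo) {
      // Instead of AvroSchemaConverter.convertToSchema(rowType), use the schema
      // that was handed in, so reader and writer schema match exactly.
      Schema readerSchema = new Schema.Parser().parse(readerSchemaJson);
      return new AvroRowDataDeserializationSchema(
              ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, schemaRegistryUrl),
              AvroToRowDataConverters.createRowConverter(rowType),
              producedTypeInfo);
  }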

This gives me the freedom to create a source table that exactly matches the records 
in Kafka. In my case the CREATE TABLE SQL statement is generated from the Avro 
schema, i.e. it is actually done the other way round compared to the current 
implementation.
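
For generating the DDL I lean on Flink's own converter; again only a minimal sketch
(assuming Flink 1.13, the method and variable names are just placeholders):

  import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
  import org.apache.flink.table.types.DataType;

  // Derive the Flink row type from the Avro schema (the other way round):
  // an Avro enum field simply comes out as STRING in the resulting type.
  static DataType toFlinkType(String avroSchemaJson) {
      return AvroSchemaConverter.convertToDataType(avroSchemaJson);
  }
  // The returned DataType can then be used to assemble the column list
  // of the CREATE TABLE statement.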

For the full implementation, please see: https://github.com/peterschrott/flinkDeserAvroEnum

Maybe this helps someone.

Best, Peter


On 2021/10/12 16:18:30 Dongwon Kim wrote:
> Hi community,
> 
> Can I get advice on this question?
> 
> Another user just sent me an email asking whether I found a solution or a
> workaround for this question, but I'm still stuck there.
> 
> Any suggestions?
> 
> Thanks in advance,
> 
> Dongwon
> 
> ---------- Forwarded message ---------
> From: Dongwon Kim <eastcirc...@gmail.com>
> Date: Mon, Aug 9, 2021 at 7:26 PM
> Subject: How to deserialize Avro enum type in Flink SQL?
> To: user <user@flink.apache.org>
> 
> 
> Hi community,
> 
> I have a Kafka topic where the schema of its values is defined by the
> "MyRecord" record in the following Avro IDL and registered to the Confluent
> Schema Registry.
> 
> > @namespace("my.type.avro")
> > protocol MyProtocol {
> >   enum MyEnumType {
> >     TypeVal1, TypeVal2
> >   }
> >   record MyEntry {
> >     MyEnumType type;
> >   }
> >   record MyRecord {
> >     array<MyEntry> entries;
> >   }
> > }
> 
> 
> To read from the topic, I've defined the following DDL:
> 
> > CREATE TABLE my_table (
> >     `entries` ARRAY<ROW<
> >         *`type` ??? (This is the main question)*
> >     >>
> > ) WITH (
> >     'connector' = 'kafka',
> >     'topic' = 'my-topic',
> >     'properties.bootstrap.servers' = '...:9092',
> >     'scan.startup.mode' = 'latest-offset',
> >     'value.format' = 'avro-confluent',
> >     'value.avro-confluent.schema-registry.url' = 'http://...:8081'
> > )
> 
> 
> And I run the following query:
> 
> > SELECT * FROM my_table
> 
> 
> Now I get the following messages in Flink 1.13.1 when I use *STRING* for
> the type:
> 
> > *Caused by: java.io.IOException: Failed to deserialize Avro record.*
> >   at
> > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
> >   at
> > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
> >   at
> > org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
> >   at
> > org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
> >   at
> > org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
> >   at
> > org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
> >   at
> > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
> >   at
> > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
> >   at
> > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
> >   at
> > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> > *Caused by: org.apache.avro.AvroTypeException: Found
> > my.type.avro.MyEnumType, expecting union*
> >   at
> > org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
> >   at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
> >   at
> > org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
> >   at
> > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
> >   at
> > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
> >   at
> > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> >   at
> > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> >   at
> > org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
> >   at
> > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
> >   ... 9 more
> 
> The reason I use the STRING type is just for fast prototyping.
> 
> While reading through [1], I've been thinking about using *RAW('class',
> 'snapshot')* where 'class' is my.type.avro.MyEnumType, but I'm not sure
> whether it is a good idea and, if so, what the value for the snapshot should be.
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/#raw
> 
> Thanks in advance,
> 
> Dongwon
> 
