Hi community,
Can I get advice on this question?
Another user just emailed me asking whether I ever found a solution or a
workaround for this question, but I'm still stuck.
Any suggestions?
Thanks in advance,
Dongwon
---------- Forwarded message ---------
From: Dongwon Kim <eastcirc...@gmail.com>
Date: Mon, Aug 9, 2021 at 7:26 PM
Subject: How to deserialize Avro enum type in Flink SQL?
To: user <user@flink.apache.org>
Hi community,
I have a Kafka topic where the schema of its values is defined by the
"MyRecord" record in the following Avro IDL and registered to the Confluent
Schema Registry.
@namespace("my.type.avro")
protocol MyProtocol {
  enum MyEnumType {
    TypeVal1, TypeVal2
  }
  record MyEntry {
    MyEnumType type;
  }
  record MyRecord {
    array<MyEntry> entries;
  }
}
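For reference, the IDL above should correspond to roughly the following Avro schema (.avsc) for MyRecord -- this is my sketch of what `idl2schemata` would emit, the exact output may differ slightly:

```
{
  "type": "record",
  "name": "MyRecord",
  "namespace": "my.type.avro",
  "fields": [
    {
      "name": "entries",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "MyEntry",
          "fields": [
            {
              "name": "type",
              "type": {
                "type": "enum",
                "name": "MyEnumType",
                "symbols": ["TypeVal1", "TypeVal2"]
              }
            }
          ]
        }
      }
    }
  ]
}
```

The point to note is that the `type` field is a named enum type, not a string and not a union.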
To read from the topic, I've defined the following DDL:
CREATE TABLE my_table (
  `entries` ARRAY<ROW<
    `type` ???   -- this is the main question
  >>
) WITH (
  'connector' = 'kafka',
  'topic' = 'my-topic',
  'properties.bootstrap.servers' = '...:9092',
  'scan.startup.mode' = 'latest-offset',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.schema-registry.url' = 'http://...:8081'
)
And I run the following query:
SELECT * FROM my_table
When I use STRING for the type, Flink 1.13.1 fails with the following
exception:
Caused by: java.io.IOException: Failed to deserialize Avro record.
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: org.apache.avro.AvroTypeException: Found my.type.avro.MyEnumType, expecting union
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
    at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
    ... 9 more
The reason I use the STRING type is just for fast prototyping.
While reading through [1], I've been thinking about using RAW('class',
'snapshot') where 'class' is my.type.avro.MyEnumType, but I'm not sure
whether that is a good idea and, if so, what a valid value for the
snapshot would be.
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/#raw
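My reading of the exception (this is my assumption, not something I've confirmed in the Flink source) is that the avro-confluent format derives a *reader* schema from the DDL column types, so a STRING column becomes a nullable Avro string, i.e. the union ["null", "string"], while the *writer* schema in the registry has the named enum at that position. A rough stdlib-only sketch of the relevant slice of Avro's schema-resolution rules, with a hypothetical helper `resolvable`, shows why that pair cannot match:

```python
# Sketch (assumption): Flink derives the reader schema from the DDL.
# A Flink STRING column is assumed to map to a nullable Avro string,
# i.e. the union ["null", "string"].
reader_field_type = ["null", "string"]

# The writer schema registered in the Schema Registry uses a named enum
# for the same field (from the Avro IDL in the original message).
writer_field_type = {
    "type": "enum",
    "name": "MyEnumType",
    "namespace": "my.type.avro",
    "symbols": ["TypeVal1", "TypeVal2"],
}

def resolvable(writer, reader):
    """Hypothetical, very rough sketch of Avro schema resolution:
    a writer type resolves against a reader union only if it resolves
    against at least one branch of that union."""
    if isinstance(reader, list):  # reader expects a union
        return any(resolvable(writer, branch) for branch in reader)
    writer_type = writer["type"] if isinstance(writer, dict) else writer
    reader_type = reader["type"] if isinstance(reader, dict) else reader
    return writer_type == reader_type

# The writer's enum matches neither "null" nor "string", which is
# consistent with the AvroTypeException
# "Found my.type.avro.MyEnumType, expecting union".
print(resolvable(writer_field_type, reader_field_type))  # prints False
```

A plain string writer field would have resolved fine against the same reader union, which is why STRING works for other text-like fields but not for the enum.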
Thanks in advance,
Dongwon