A current workaround is to use the DataStream API to read the data and provide your custom Avro schema to configure the format, then switch to the Table API.

StreamTableEnvironment.fromDataStream(...) accepts all data types. Enum classes will be represented as RAW types, but you can forward them as black boxes or convert them in a UDF.
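For example, a rough sketch of that workaround (assuming the Avro-generated MyRecord class from the IDL quoted below is on the classpath; topic, hosts, and class names are taken from the thread, the rest is illustrative):

import java.util.Properties;

import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import my.type.avro.MyRecord; // code-generated from the Avro IDL

public class AvroEnumWorkaround {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "...:9092");
        props.setProperty("group.id", "my-group"); // assumption: any consumer group id

        // Deserialize with the real writer schema (including the enum),
        // resolved against the Confluent Schema Registry.
        FlinkKafkaConsumer<MyRecord> consumer = new FlinkKafkaConsumer<>(
                "my-topic",
                ConfluentRegistryAvroDeserializationSchema.forSpecific(
                        MyRecord.class, "http://...:8081"),
                props);

        DataStream<MyRecord> stream = env.addSource(consumer);

        // Enum fields surface as RAW types here; forward them as-is or
        // convert them to strings in a map function / UDF.
        Table table = tEnv.fromDataStream(stream);
        tEnv.createTemporaryView("my_table", table);
    }
}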

We will further improve the support of external types in the Table API type system in the near future.

Regards,
Timo

On 20.10.21 15:51, Peter Schrott wrote:
Hi people!

I was digging deeper these days and found the "root cause" of the issue and the 
difference between reading Avro from files and reading Avro from Kafka & SR.

Please see:
https://lists.apache.org/x/thread.html/r8ad7bd574f7dc4904139295c7de612a35438571c5b9caac673521d22@%3Cuser.flink.apache.org%3E

The main problem with Kafka & SR is that the "org.apache.avro.generic.GenericDatumReader" is initialized 
with an "expected" schema which is derived from Flink's SQL table definition. When it comes to deserializing 
an attribute of type "enum", it does not match the expected schema, where the same attribute is typed 
as "string". Hence the Avro deserializer breaks here.

Not sure how to tackle that issue. The behavior of the 
"GenericDatumReader" cannot really be changed. A solution could be to create 
an analogous reader that reads data based on the SQL DDL.
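To illustrate the mismatch outside of Flink, here is a minimal sketch in plain Avro: it writes a value with an enum writer schema and reads it back with a string reader schema, which is roughly what happens when the "expected" schema is derived from the DDL. (In Flink the expected type is actually the nullable union ["null", "string"], hence the "expecting union" in the stack trace below.)

import java.io.ByteArrayOutputStream;

import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class EnumResolutionRepro {

    public static void main(String[] args) throws Exception {
        Schema writer = new Schema.Parser().parse(
                "{\"type\":\"enum\",\"name\":\"MyEnumType\","
                        + "\"symbols\":[\"TypeVal1\",\"TypeVal2\"]}");
        Schema reader = Schema.create(Schema.Type.STRING);

        // Encode an enum symbol with the writer schema.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<Object>(writer)
                .write(new GenericData.EnumSymbol(writer, "TypeVal1"), enc);
        enc.flush();

        // Decode with a reader schema that declares STRING instead of the enum.
        // Avro's resolution rules have no enum-to-string promotion, so this fails.
        BinaryDecoder dec = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        try {
            new GenericDatumReader<Object>(writer, reader).read(null, dec);
        } catch (AvroTypeException e) {
            System.out.println("Schema resolution failed: " + e.getMessage());
        }
    }
}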

Cheers, Peter

On 2021/10/12 16:18:30 Dongwon Kim wrote:
Hi community,

Can I get advice on this question?

Another user just sent me an email asking whether I found a solution or a
workaround for this question, but I'm still stuck there.

Any suggestions?

Thanks in advance,

Dongwon

---------- Forwarded message ---------
From: Dongwon Kim <eastcirc...@gmail.com>
Date: Mon, Aug 9, 2021 at 7:26 PM
Subject: How to deserialize Avro enum type in Flink SQL?
To: user <user@flink.apache.org>


Hi community,

I have a Kafka topic whose value schema is defined by the "MyRecord" record
in the following Avro IDL and registered in the Confluent Schema Registry.

@namespace("my.type.avro")
protocol MyProtocol {
   enum MyEnumType {
     TypeVal1, TypeVal2
   }
   record MyEntry {
     MyEnumType type;
   }
   record MyRecord {
     array<MyEntry> entries;
   }
}


To read from the topic, I've defined the following DDL:

CREATE TABLE my_table (
     `entries` ARRAY<ROW<
         `type` ??? (This is the main question)
     >>
) WITH (
     'connector' = 'kafka',
     'topic' = 'my-topic',
     'properties.bootstrap.servers' = '...:9092',
     'scan.startup.mode' = 'latest-offset',
     'value.format' = 'avro-confluent',
     'value.avro-confluent.schema-registry.url' = 'http://...:8081'
)


And I run the following query:

SELECT * FROM my_table


Now I got the following messages in Flink 1.13.1 when I use STRING for
the type:

Caused by: java.io.IOException: Failed to deserialize Avro record.
   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
   at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
   at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: org.apache.avro.AvroTypeException: Found my.type.avro.MyEnumType, expecting union
   at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
   at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
   at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
   at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
   at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
   at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
   ... 9 more

The reason I use the STRING type is just for fast prototyping.

While reading through [1], I've been thinking about using RAW('class',
'snapshot') where 'class' is my.type.avro.MyEnumType, but I'm not sure
whether it is a good idea and, if so, what a valid value for the snapshot
would be (one way to render it is sketched below).

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/#raw
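One way to obtain a concrete value for the snapshot might be to let Flink render the whole RAW('class', 'snapshot') string itself, by building a RawType from the enum's TypeInformation and printing its serializable string. A sketch (I haven't verified that the resulting DDL actually works with the avro-confluent format):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.types.logical.RawType;

import my.type.avro.MyEnumType; // code-generated from the Avro IDL

public class PrintRawTypeString {

    public static void main(String[] args) {
        TypeSerializer<MyEnumType> serializer = TypeInformation
                .of(MyEnumType.class)
                .createSerializer(new ExecutionConfig());

        // asSerializableString() renders RAW('class', 'snapshot') with the
        // base64-encoded serializer snapshot filled in, ready to paste into DDL.
        RawType<MyEnumType> rawType = new RawType<>(MyEnumType.class, serializer);
        System.out.println(rawType.asSerializableString());
    }
}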

Thanks in advance,

Dongwon


