Hi community,

I have a Kafka topic whose value schema is defined by the "MyRecord" record
in the following Avro IDL and registered in the Confluent Schema Registry.

> @namespace("my.type.avro")
> protocol MyProtocol {
>   enum MyEnumType {
>     TypeVal1, TypeVal2
>   }
>   record MyEntry {
>     MyEnumType type;
>   }
>   record MyRecord {
>     array<MyEntry> entries;
>   }
> }
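For reference, my understanding is that the "MyRecord" part of the IDL above
compiles to roughly the following Avro schema JSON (this is what I expect to
be registered in the Schema Registry; note that the "type" field is a named
enum, not a union):

> {
>   "type": "record",
>   "name": "MyRecord",
>   "namespace": "my.type.avro",
>   "fields": [{
>     "name": "entries",
>     "type": {
>       "type": "array",
>       "items": {
>         "type": "record",
>         "name": "MyEntry",
>         "fields": [{
>           "name": "type",
>           "type": {
>             "type": "enum",
>             "name": "MyEnumType",
>             "symbols": ["TypeVal1", "TypeVal2"]
>           }
>         }]
>       }
>     }
>   }]
> }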


To read from the topic, I've defined the following DDL:

> CREATE TABLE my_table (
>     `entries` ARRAY<ROW<
>         *`type` ??? (This is the main question)*
>     >>
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'my-topic',
>     'properties.bootstrap.servers' = '...:9092',
>     'scan.startup.mode' = 'latest-offset',
>     'value.format' = 'avro-confluent',
>     'value.avro-confluent.schema-registry.url' = 'http://...:8081'
> )


And I run the following query:

> SELECT * FROM my_table


With Flink 1.13.1, I get the following error when I use *STRING* for the
`type` column:

> *Caused by: java.io.IOException: Failed to deserialize Avro record.*
>   at
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
>   at
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
>   at
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>   at
> org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
>   at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
>   at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>   at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>   at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>   at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>   at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> *Caused by: org.apache.avro.AvroTypeException: Found
> my.type.avro.MyEnumType, expecting union*
>   at
> org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
>   at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
>   at
> org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
>   at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>   at
> org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
>   at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
>   at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>   at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>   at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>   at
> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>   at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>   at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>   at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>   at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>   at
> org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
>   at
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
>   ... 9 more
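My reading of the stack trace is that Flink derives an Avro reader schema
from the DDL, where a nullable STRING column becomes a union, and that union
cannot be resolved against the writer's enum (this is my assumption, not
something I've verified). The writer's field, as registered, is

> "type": {"type": "enum", "name": "MyEnumType", "symbols": ["TypeVal1", "TypeVal2"]}

while the reader schema derived from a STRING column would be something like

> "type": ["null", "string"]

which would explain the "Found my.type.avro.MyEnumType, expecting union"
message.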

I chose the STRING type just for fast prototyping.

While reading through [1], I've been thinking about using *RAW('class',
'snapshot')* with 'class' set to my.type.avro.MyEnumType, but I'm not sure
whether that is a good idea and, if so, what value the snapshot should take.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/#raw

Thanks in advance,

Dongwon
