Hi community,

I have a Kafka topic whose value schema is defined by the "MyRecord" record in the following Avro IDL and is registered in the Confluent Schema Registry:
> @namespace("my.type.avro")
> protocol MyProtocol {
>   enum MyEnumType {
>     TypeVal1, TypeVal2
>   }
>   record MyEntry {
>     MyEnumType type;
>   }
>   record MyRecord {
>     array<MyEntry> entries;
>   }
> }

To read from the topic, I've defined the following DDL:

> CREATE TABLE my_table (
>   `entries` ARRAY<ROW<
>     *`type` ??? (This is the main question)*
>   >>
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'my-topic',
>   'properties.bootstrap.servers' = '...:9092',
>   'scan.startup.mode' = 'latest-offset',
>   'value.format' = 'avro-confluent',
>   'value.avro-confluent.schema-registry.url' = 'http://...:8081'
> )

And I run the following query:

> SELECT * FROM my_table

Now I got the following messages in Flink-1.13.1 when I use *STRING* for the type:

> *Caused by: java.io.IOException: Failed to deserialize Avro record.*
>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
>   at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>   at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
>   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
>   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> *Caused by: org.apache.avro.AvroTypeException: Found my.type.avro.MyEnumType, expecting union*
>   at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
>   at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
>   at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>   at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>   at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>   at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
>   ... 9 more

The reason I use the STRING type is just for fast-prototyping.

While reading through [1], I've been thinking about using *RAW('class', 'snapshot')* where 'class' is my.type.avro.MyEnumType, but I'm not sure whether it is a good idea and if so, what can be a value for the snapshot.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/#raw

Thanks in advance,

Dongwon