> Can you provide a minimal reproducer (without confluent schema registry)
> with a valid input?
Please download and unzip the attached file.

- src/main/avro/MyProtocol.avdl: this is where MyRecord, MyEntry, and
  MyEnumType are defined.
- "mvn generate-sources" auto-generates the Java classes under
  "target/generated-sources".
- "org.example.fs" contains:
  - "org.example.fs.Writer", which writes a single record of the MyRecord
    type to "output.avro".
  - "org.example.fs.Reader", which reads the record back from "output.avro".
  - "org.example.fs.ExampleFromFileSystem", which executes the CREATE TABLE
    statement defined in "my_table.ddl" and shows that it successfully
    deserializes MyRecord from an Avro record written to a file, as you
    mentioned.
- "org.example.kafka.ExampleFromKafkaAndSR" does almost the same as
  "org.example.fs.ExampleFromFileSystem", except that it reads from Kafka
  and looks the schema up in Schema Registry.
  - However, unlike ExampleFromFileSystem, it fails with the same exception
    as before.
- What I produced to the Kafka topic is
  {"entries": [{"type": "TypeVal1"}, {"type": "TypeVal2"}, {"type": "TypeVal2"}]},
  which is the Avro record saved in output.avro.
- "output.avro" is 321 bytes on disk, while the value of the Kafka record
  is only 10 bytes (see the sketch below).
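For what it's worth, the size difference itself looks expected: output.avro
is an Avro container file that embeds the full writer schema plus sync
markers, while an avro-confluent Kafka value is only a 5-byte header
followed by the binary-encoded record. A sketch of how such a 10-byte value
breaks down (the class name, the schema id 1, and the byte values are made
up for illustration):

    import java.nio.ByteBuffer;

    // Illustrative only: splits a Confluent-framed Kafka value into its parts.
    public final class WireFormatProbe {

        // The Confluent wire format is: 1 magic byte (0x0) + 4-byte schema id
        // + the plain Avro binary encoding of the record (no embedded schema).
        static void describe(byte[] kafkaValue) {
            ByteBuffer buf = ByteBuffer.wrap(kafkaValue);
            byte magic = buf.get();       // always 0 in the Confluent wire format
            int schemaId = buf.getInt();  // id to look up in Schema Registry
            System.out.printf("magic=%d, schemaId=%d, avro payload=%d bytes%n",
                    magic, schemaId, buf.remaining());
        }

        public static void main(String[] args) {
            // A hypothetical 10-byte value: 5-byte header, then 5 payload bytes
            // (array block count 3, three 1-byte enum indices, end-of-array 0).
            describe(new byte[] {0, 0, 0, 0, 1, 6, 0, 2, 2, 0});
        }
    }

The 5 payload bytes would be the array block count, one byte per enum index
for the three entries, and the end-of-array marker, so a 10-byte value is
consistent with the record above.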
Hope this provides enough information.

Best,

Dongwon

On Wed, Oct 13, 2021 at 4:50 PM Francesco Guardiani
<france...@ververica.com> wrote:

> First of all, are you sure the input data is correct? From the stacktrace
> it seems to me the issue might be that the input data is invalid.
>
> Looking at the code of AvroToRowDataConverters, it sounds like STRING
> should work with avro enums. Can you provide a minimal reproducer (without
> confluent schema registry) with a valid input?
>
> On Tue, Oct 12, 2021 at 6:19 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>
>> Hi community,
>>
>> Can I get advice on this question?
>>
>> Another user just sent me an email asking whether I found a solution or
>> a workaround for this question, but I'm still stuck there.
>>
>> Any suggestions?
>>
>> Thanks in advance,
>>
>> Dongwon
>>
>> ---------- Forwarded message ---------
>> From: Dongwon Kim <eastcirc...@gmail.com>
>> Date: Mon, Aug 9, 2021 at 7:26 PM
>> Subject: How to deserialize Avro enum type in Flink SQL?
>> To: user <user@flink.apache.org>
>>
>> Hi community,
>>
>> I have a Kafka topic where the schema of its values is defined by the
>> "MyRecord" record in the following Avro IDL and registered to the
>> Confluent Schema Registry:
>>
>>> @namespace("my.type.avro")
>>> protocol MyProtocol {
>>>   enum MyEnumType {
>>>     TypeVal1, TypeVal2
>>>   }
>>>   record MyEntry {
>>>     MyEnumType type;
>>>   }
>>>   record MyRecord {
>>>     array<MyEntry> entries;
>>>   }
>>> }
>>
>> To read from the topic, I've defined the following DDL:
>>
>>> CREATE TABLE my_table (
>>>   `entries` ARRAY<ROW<
>>>     `type` ???   -- this is the main question
>>>   >>
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'topic' = 'my-topic',
>>>   'properties.bootstrap.servers' = '...:9092',
>>>   'scan.startup.mode' = 'latest-offset',
>>>   'value.format' = 'avro-confluent',
>>>   'value.avro-confluent.schema-registry.url' = 'http://...:8081'
>>> )
>>
>> And I run the following query:
>>
>>> SELECT * FROM my_table
>>
>> Now I get the following exception in Flink 1.13.1 when I use *STRING*
>> for the type:
>>
>>> Caused by: java.io.IOException: Failed to deserialize Avro record.
>>>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
>>>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
>>>   at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>>>   at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
>>>   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
>>>   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>>>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>>>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
>>> Caused by: org.apache.avro.AvroTypeException: Found my.type.avro.MyEnumType, expecting union
>>>   at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
>>>   at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
>>>   at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
>>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>>>   at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
>>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
>>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>>>   at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>>>   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>   at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
>>>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
>>>   ... 9 more
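A note on the trace above: "expecting union" makes me suspect that the
reader schema Flink derives from the DDL maps a nullable `type` STRING
column to the union ["null", "string"], which Avro schema resolution cannot
match against the writer's enum. A standalone sketch (plain Avro, no Flink
or Kafka; the derived reader schema and the class name are my assumptions)
that reproduces the same exception under that assumption:

    import java.io.ByteArrayOutputStream;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.Decoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;

    public final class EnumResolutionDemo {
        public static void main(String[] args) throws Exception {
            // Writer schema: the field is an enum, as registered by the producer.
            Schema writer = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"MyEntry\",\"fields\":["
                    + "{\"name\":\"type\",\"type\":{\"type\":\"enum\","
                    + "\"name\":\"MyEnumType\",\"namespace\":\"my.type.avro\","
                    + "\"symbols\":[\"TypeVal1\",\"TypeVal2\"]}}]}");
            // Reader schema: what a nullable STRING column presumably maps to.
            Schema reader = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"MyEntry\",\"fields\":["
                    + "{\"name\":\"type\",\"type\":[\"null\",\"string\"]}]}");

            // Encode one record with the writer schema.
            GenericData.Record record = new GenericData.Record(writer);
            record.put("type", new GenericData.EnumSymbol(
                    writer.getField("type").schema(), "TypeVal1"));
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            new GenericDatumWriter<GenericData.Record>(writer).write(record, encoder);
            encoder.flush();

            // Decode it against the mismatched reader schema: this throws an
            // AvroTypeException like "Found my.type.avro.MyEnumType, expecting union".
            Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
            new GenericDatumReader<GenericData.Record>(writer, reader).read(null, decoder);
        }
    }

If this is what happens inside AvroRowDataDeserializationSchema, the failure
would be in Avro's schema resolution rather than in the input data itself.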
>> The reason I use the STRING type is just for fast prototyping.
>>
>> While reading through [1], I've been thinking about using *RAW('class',
>> 'snapshot')*, where 'class' would be my.type.avro.MyEnumType, but I'm
>> not sure whether it is a good idea and, if so, what a valid value for
>> the snapshot would be.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/#raw
>>
>> Thanks in advance,
>>
>> Dongwon
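P.S. Regarding the question about input validity: the record in output.avro
can be dumped with plain Avro, independently of Flink, roughly as follows
(a sketch; the class name is mine and it assumes output.avro sits in the
working directory):

    import java.io.File;

    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    public final class DumpAvroFile {
        public static void main(String[] args) throws Exception {
            // A container file carries its writer schema, so no reader schema
            // (and no registry lookup) is needed here.
            GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
            try (DataFileReader<GenericRecord> fileReader =
                    new DataFileReader<>(new File("output.avro"), datumReader)) {
                System.out.println("writer schema: " + fileReader.getSchema());
                while (fileReader.hasNext()) {
                    System.out.println(fileReader.next());
                }
            }
        }
    }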
Attachment: flink-deser-avro-enum.tar.gz (GNU Zip compressed data)