[ https://issues.apache.org/jira/browse/FLINK-24544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17431204#comment-17431204 ]
Peter Schrott edited comment on FLINK-24544 at 10/20/21, 12:59 PM:
-------------------------------------------------------------------

The underlying problem with deserialization of records with enums from Kafka & Schema Registry lies in the initialization of {{GenericDatumReader}}:

Case Kafka & SR:
In {{AvroDeserializationSchema.java}} the {{GenericDatumReader}} is initialized with {{writerSchema = null}} and {{readerSchema}} set to the schema derived from the table DDL.
-> When {{RegistryAvroDeserializationSchema.deserialize(.)}} calls {{datumReader.setSchema()}}, the attribute {{actual}} is set to the actual Avro writer schema, whereas {{expected}} is already set to {{readerSchema}}.
-> The inequality of {{actual}} and {{expected}} causes the exception on deserializing, as the types of {{actual}} and {{expected}} do not match.
--> The root of this is the initialization of the {{DeserializationSchema}} in {{RegistryAvroFormatFactory.java}}, which uses the {{rowType}} and {{ConfluentRegistryAvroDeserializationSchema.forGeneric(.)}} when creating the {{ConfluentRegistryAvroDeserializationSchema}}.

Case FS:
In {{AvroInputFormat.java}} the {{GenericDatumReader}} is initialized with {{writerSchema = null}} and {{readerSchema = null}}.
-> During initialization of {{DataFileStream}}, {{reader.setSchema(.)}} is called with the actual Avro schema taken from the file, so both attributes {{actual}} and {{expected}} of the {{GenericDatumReader}} are set to that schema.
-> The equality of {{actual}} and {{expected}} means the serialized data can be read from the file.

> Failure when using Kafka connector in Table API with Avro and Confluent schema registry
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-24544
>                 URL: https://issues.apache.org/jira/browse/FLINK-24544
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Ecosystem
>    Affects Versions: 1.13.1
>            Reporter: Francesco Guardiani
>            Priority: Major
>        Attachments: flink-deser-avro-enum.zip
>
> A user reported in the [mailing list|https://lists.apache.org/thread.html/re38a07f6121cc580737a20c11574719cfe554e58d99817f79db9bb4a%40%3Cuser.flink.apache.org%3E] that Avro deserialization fails when using Kafka, Avro and Confluent Schema Registry:
> {code:java}
> Caused by: java.io.IOException: Failed to deserialize Avro record.
> at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
> at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
> at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
> at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
> at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
> at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> Caused by: org.apache.avro.AvroTypeException: Found my.type.avro.MyEnumType, expecting union
> at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
> at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
> at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
> at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
> at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
> at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
> ... 9 more
> {code}
> Look in the attachments for a reproducer.
> Same data serialized to a file works fine (see the filesystem example in the reproducer).

-- This message was sent by Atlassian Jira (v8.3.4#803005)
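To illustrate the {{actual}}/{{expected}} mismatch described in the comment, here is a minimal sketch against the plain Avro API, not the Flink code path itself. The schemas are hypothetical stand-ins: the writer schema is a bare enum like the user's {{MyEnumType}}, and the reader schema mimics a DDL-derived nullable-string mapping of that column. Reading with a reader schema that cannot resolve the writer's enum fails; reading with the exact writer schema (the filesystem case, where {{actual == expected}}) succeeds.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class EnumResolutionSketch {

    // Hypothetical writer schema: a bare enum, as it would sit in the registry.
    static final Schema WRITER = new Schema.Parser().parse(
            "{\"type\":\"enum\",\"name\":\"MyEnumType\",\"symbols\":[\"A\",\"B\"]}");

    // Hypothetical reader schema standing in for the table-DDL-derived schema,
    // where the enum column ends up as a nullable string.
    static final Schema DDL_READER = new Schema.Parser().parse(
            "[\"null\",\"string\"]");

    static byte[] encodeEnum(String symbol) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<Object>(WRITER)
                .write(new GenericData.EnumSymbol(WRITER, symbol), enc);
        enc.flush();
        return out.toByteArray();
    }

    /** actual = WRITER, expected = DDL_READER: schema resolution fails. */
    static boolean ddlReaderFails() throws IOException {
        GenericDatumReader<Object> reader =
                new GenericDatumReader<>(WRITER, DDL_READER);
        try {
            reader.read(null, DecoderFactory.get().binaryDecoder(encodeEnum("A"), null));
            return false;
        } catch (AvroTypeException e) {
            // The string branch of the union cannot resolve the writer's enum.
            return true;
        }
    }

    /** actual == expected == WRITER: the filesystem case, reading succeeds. */
    static boolean exactWriterSchemaWorks() throws IOException {
        GenericDatumReader<Object> reader = new GenericDatumReader<>(WRITER);
        Object value = reader.read(null,
                DecoderFactory.get().binaryDecoder(encodeEnum("A"), null));
        return "A".equals(value.toString());
    }

    public static void main(String[] args) throws IOException {
        System.out.println("DDL-derived reader fails:  " + ddlReaderFails());
        System.out.println("Exact writer schema works: " + exactWriterSchemaWorks());
    }
}
```

This mirrors the analysis above: the fix direction is to give the {{GenericDatumReader}} the real writer schema (from the registry or the file header) alongside the reader schema, rather than letting the DDL-derived schema serve as both.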