[ 
https://issues.apache.org/jira/browse/FLINK-24544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17434165#comment-17434165
 ] 

Peter Schrott commented on FLINK-24544:
---------------------------------------

I found a workaround for this issue that works at least for my use case. 

The main idea is to customize 
{{org.apache.flink.formats.avro.registry.confluent.RegistryAvroFormatFactory}} 
so that the expected Avro schema is passed in as a parameter instead of being 
derived from the {{CREATE TABLE}} SQL statement. As a result, the actual and 
expected schemas match when the Avro library deserializes the records under 
the hood. 

This gives me the freedom to create a source table that exactly matches the 
records in Kafka. In my case, the {{CREATE TABLE}} SQL statement is generated 
from the Avro schema to ensure that Avro and SQL match. This is the reverse of 
the current implementation, which derives the Avro schema from the SQL statement. 
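
To illustrate the "generate the SQL from the Avro schema" direction, here is a minimal sketch. It is not the code from the linked repository: the class {{AvroToDdl}}, the simplified {{Field}} type and the table name are hypothetical, and a real implementation would walk an {{org.apache.avro.Schema}} instead of a hand-built map. The type mapping (for example enum to STRING, non-union fields to NOT NULL columns) is an assumption based on the mapping described in the Flink Avro format documentation, and the connector options in the {{WITH}} clause are omitted.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper: builds a CREATE TABLE statement from a simplified Avro field list. */
final class AvroToDdl {

    /** Simplified stand-in for an Avro field: its type name and whether it is a ["null", T] union. */
    static final class Field {
        final String avroType;
        final boolean nullable;

        Field(String avroType, boolean nullable) {
            this.avroType = avroType;
            this.nullable = nullable;
        }
    }

    /** Maps a primitive or enum Avro type name to a Flink SQL type (assumed mapping). */
    static String toSqlType(String avroType) {
        switch (avroType) {
            case "string":
            case "enum":
                return "STRING";
            case "int":
                return "INT";
            case "long":
                return "BIGINT";
            case "float":
                return "FLOAT";
            case "double":
                return "DOUBLE";
            case "boolean":
                return "BOOLEAN";
            case "bytes":
                return "BYTES";
            default:
                throw new IllegalArgumentException("Unsupported Avro type: " + avroType);
        }
    }

    /** Renders the column list; fields that are not nullable unions become NOT NULL columns. */
    static String createTable(String table, Map<String, Field> fields) {
        StringJoiner cols = new StringJoiner(",\n  ", "(\n  ", "\n)");
        for (Map.Entry<String, Field> e : fields.entrySet()) {
            String col = "`" + e.getKey() + "` " + toSqlType(e.getValue().avroType);
            cols.add(e.getValue().nullable ? col : col + " NOT NULL");
        }
        return "CREATE TABLE `" + table + "` " + cols;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the field order of the schema.
        Map<String, Field> fields = new LinkedHashMap<>();
        fields.put("id", new Field("long", false));
        fields.put("status", new Field("enum", false));
        fields.put("note", new Field("string", true));
        System.out.println(createTable("events", fields));
    }
}
```

Declaring the columns with the same nullability as the Avro fields is the point of the sketch: a column left nullable for a non-nullable Avro field would lead to a different expected schema than the one the records were written with.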

For reference, see [https://github.com/peterschrott/flinkDeserAvroEnum]. 
Maybe this helps someone.

> Failure when using Kafka connector in Table API with Avro and Confluent 
> schema registry 
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-24544
>                 URL: https://issues.apache.org/jira/browse/FLINK-24544
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Formats (JSON, Avro, Parquet, ORC, 
> SequenceFile), Table SQL / Ecosystem
>    Affects Versions: 1.13.1
>            Reporter: Francesco Guardiani
>            Priority: Major
>         Attachments: flink-deser-avro-enum.zip
>
>
> A user reported in the [mailing 
> list|https://lists.apache.org/thread.html/re38a07f6121cc580737a20c11574719cfe554e58d99817f79db9bb4a%40%3Cuser.flink.apache.org%3E]
>  that Avro deserialization fails when using Kafka, Avro and Confluent Schema 
> Registry:  
> {code:java}
> Caused by: java.io.IOException: Failed to deserialize Avro record.
>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
>   at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>   at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
>   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
>   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> Caused by: org.apache.avro.AvroTypeException: Found my.type.avro.MyEnumType, expecting union
>   at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
>   at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
>   at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>   at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>   at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>   at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
>   ... 9 more
> {code}
> See the attachments for a reproducer.
> The same data serialized to a file works fine (see the filesystem example in 
> the reproducer). 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
