[ 
https://issues.apache.org/jira/browse/FLINK-24544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17429243#comment-17429243
 ] 

Peter Schrott edited comment on FLINK-24544 at 10/15/21, 12:04 PM:
-------------------------------------------------------------------

I was wondering about the message of the exception: 
{noformat}
Found org.example.MyEnumType, expecting union{noformat}
It seems that the enum is not the cause here. So I stripped the example Avro 
schema down to:
{noformat}
@namespace("org.example")
protocol MyProtocl {
  enum MyEnumType {
    TypeVal1, TypeVal2
  }
  record MyRecord {
    MyEnumType type;
  }
}{noformat}
and the source table SQL accordingly:
{noformat}
CREATE TABLE my_table_from_kafka
(
  `type` STRING
) WITH ( ... ){noformat}
then I get the same exception:
{noformat}
Caused by: org.apache.avro.AvroTypeException: Found org.example.MyEnumType, expecting union{noformat}
This union exception is somewhat understandable: in the Avro schema the field 
"type" is defined as not nullable, but in Flink Table SQL it is nullable. So if 
you add "NOT NULL" such that Avro and SQL match: 
{noformat}
CREATE TABLE my_table_from_kafka
(
  `type` STRING NOT NULL
) WITH ( ... ){noformat}
Avro actually complains that it cannot deserialise the enum to a 
string:
{noformat}
Caused by: org.apache.avro.AvroTypeException: Found org.example.MyEnumType, expecting string{noformat}
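
The two messages would be consistent with the reader schema being derived from the table DDL instead of being taken from the registry. The following is a rough sketch in plain Python, not Flink code: the helper name `sql_column_to_avro` and the small type table are made up for illustration, and the mapping itself is an assumption (a nullable column becomes a union with null, a NOT NULL column becomes the plain type).

```python
import json

# Hypothetical, partial SQL-to-Avro primitive mapping (illustration only).
AVRO_PRIMITIVES = {
    "STRING": "string",
    "INT": "int",
    "BIGINT": "long",
    "BOOLEAN": "boolean",
    "DOUBLE": "double",
}


def sql_column_to_avro(sql_type: str, nullable: bool):
    """Return the assumed Avro reader type for one SQL column."""
    avro_type = AVRO_PRIMITIVES[sql_type]
    # Assumption: nullability is expressed as a union with "null".
    return ["null", avro_type] if nullable else avro_type


# `type` STRING          -> a union, hence "expecting union"
print(json.dumps(sql_column_to_avro("STRING", nullable=True)))
# `type` STRING NOT NULL -> a plain string, hence "expecting string"
print(json.dumps(sql_column_to_avro("STRING", nullable=False)))
```

Under that assumption, neither reader type contains an enum, so the enum written by the producer has nothing to resolve against in either case.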

But if I change the Avro schema such that the attribute "type" is actually 
nullable (union type of null and MyEnumType):
{noformat}
record MyRecord {
  union { null, MyEnumType } type;
}{noformat}
and leave the SQL without the "NOT NULL" (so that Avro and SQL again match in 
the type of `type`, both nullable), I still get the union exception, 
unexpectedly. I have no logical explanation for that.
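
One possible reading of the three cases, offered as a sketch and not as a confirmed diagnosis: if the reader schema for a nullable STRING column is the union of null and string (an assumption), then the writer's enum branch has no matching reader branch even when the writer field is itself a union. The toy resolver below is plain Python and only imitates the shape of Avro's schema resolution. The function names are hypothetical, and promotions, aliases and all non-enum named types are left out.

```python
def type_name(schema):
    """Name used in the message: 'union' for a list, full name for a named type."""
    if isinstance(schema, list):
        return "union"
    if isinstance(schema, dict):
        return schema["name"]
    return schema


def resolve(writer, reader):
    """Return the error messages produced when reading `writer` data as `reader`."""
    if isinstance(writer, list):
        # Writer union: each writer branch is resolved against the whole reader schema.
        errors = []
        for branch in writer:
            errors.extend(resolve(branch, reader))
        return errors
    if isinstance(reader, list):
        # Reader union: the writer type must match at least one reader branch.
        if any(not resolve(writer, branch) for branch in reader):
            return []
        return [f"Found {type_name(writer)}, expecting union"]
    # Simplification: two types match only if kind (named or primitive) and name agree.
    same_kind = isinstance(writer, dict) == isinstance(reader, dict)
    if same_kind and type_name(writer) == type_name(reader):
        return []
    return [f"Found {type_name(writer)}, expecting {type_name(reader)}"]


MY_ENUM = {"type": "enum", "name": "org.example.MyEnumType",
           "symbols": ["TypeVal1", "TypeVal2"]}

# Writer: MyEnumType type;                 Reader: `type` STRING
print(resolve(MY_ENUM, ["null", "string"]))
# Writer: MyEnumType type;                 Reader: `type` STRING NOT NULL
print(resolve(MY_ENUM, "string"))
# Writer: union { null, MyEnumType } type; Reader: `type` STRING
print(resolve(["null", MY_ENUM], ["null", "string"]))
```

In this model the third case still reports "Found org.example.MyEnumType, expecting union", because only the null branch of the writer union finds a partner on the reader side. Whether this is what happens inside the Flink format would need to be checked against the reproducer.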



> Failure when using Kafka connector in Table API with Avro and Confluent schema registry
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-24544
>                 URL: https://issues.apache.org/jira/browse/FLINK-24544
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Ecosystem
>    Affects Versions: 1.13.1
>            Reporter: Francesco Guardiani
>            Priority: Major
>         Attachments: flink-deser-avro-enum.zip
>
>
> A user reported in the [mailing list|https://lists.apache.org/thread.html/re38a07f6121cc580737a20c11574719cfe554e58d99817f79db9bb4a%40%3Cuser.flink.apache.org%3E]
> that Avro deserialization fails when using Kafka, Avro and Confluent Schema 
> Registry:  
> {code:java}
> Caused by: java.io.IOException: Failed to deserialize Avro record.
>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
>   at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>   at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
>   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
>   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> Caused by: org.apache.avro.AvroTypeException: Found my.type.avro.MyEnumType, expecting union
>   at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
>   at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
>   at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>   at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>   at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>   at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
>   ... 9 more
> {code}
> See the attachments for a reproducer.
> The same data serialized to a file works fine (see the filesystem example in 
> the reproducer).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
