Hello Team,

We have two Kafka connectors available in Flink SQL, "upsert-kafka" and "kafka". I am facing an issue with "upsert-kafka" while reading Avro messages that were serialized using "io.confluent.kafka.serializers.KafkaAvroSerializer". Please note that the "kafka" connector reads the same Confluent-serialized Avro messages without any problem.
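For context, the producer writes to this topic roughly as follows (a minimal sketch: the class name, sample values, and the two-field schema are illustrative; the real producer is more involved):

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import java.util.Properties;

public class ProposalLineProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Long keys; Confluent Avro values (magic byte + schema id + Avro payload)
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        props.put("schema.registry.url", "http://localhost:8081");

        // Trimmed to the two fields used in the table definitions below
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"proposalLine\",\"fields\":["
                        + "{\"name\":\"PROPOSAL_LINE_ID\",\"type\":\"long\"},"
                        + "{\"name\":\"LAST_MODIFIED_BY\",\"type\":\"string\"}]}");

        try (KafkaProducer<Long, GenericRecord> producer = new KafkaProducer<>(props)) {
            GenericRecord record = new GenericData.Record(schema);
            record.put("PROPOSAL_LINE_ID", 1L);
            record.put("LAST_MODIFIED_BY", "shamit");
            producer.send(new ProducerRecord<>("lndcdcadsprpslproposalline", 1L, record));
            producer.flush();
        }
    }
}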
Below are the table definitions with both Kafka connectors.

*Table definition with the "kafka" connector, which works fine:*

CREATE TABLE proposalLine (
  PROPOSAL_LINE_ID BIGINT,
  LAST_MODIFIED_BY STRING
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.auto.offset.reset' = 'earliest',
  'topic' = 'lndcdcadsprpslproposalline',
  'format' = 'avro-confluent',
  'avro-confluent.schema-registry.url' = 'http://localhost:8081',
  'avro-confluent.schema-registry.subject' = 'lndcdcadsprpslproposalline-value'
)

*Table definition with the "upsert-kafka" connector, which is not working:*

CREATE TABLE proposalLine (
  PROPOSAL_LINE_ID BIGINT,
  LAST_MODIFIED_BY STRING,
  PRIMARY KEY (PROPOSAL_LINE_ID) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.auto.offset.reset' = 'earliest',
  'topic' = 'lndcdcadsprpslproposalline',
  'key.format' = 'avro',
  'value.format' = 'avro',
  'properties.group.id' = 'dd',
  'properties.schema.registry.url' = 'http://localhost:8081',
  'properties.key.deserializer' = 'org.apache.kafka.common.serialization.LongDeserializer',
  'properties.value.deserializer' = 'io.confluent.kafka.serializers.KafkaAvroDeserializer'
)
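Any read on the second table fails immediately. This is roughly how I reproduce it (a sketch via the Java Table API; the class name is illustrative and the DDL string is abbreviated to the essential options from the full definition above):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertKafkaRepro {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Same DDL as above, abbreviated
        tEnv.executeSql(
                "CREATE TABLE proposalLine ("
                + "  PROPOSAL_LINE_ID BIGINT,"
                + "  LAST_MODIFIED_BY STRING,"
                + "  PRIMARY KEY (PROPOSAL_LINE_ID) NOT ENFORCED"
                + ") WITH ("
                + "  'connector' = 'upsert-kafka',"
                + "  'properties.bootstrap.servers' = 'localhost:9092',"
                + "  'topic' = 'lndcdcadsprpslproposalline',"
                + "  'key.format' = 'avro',"
                + "  'value.format' = 'avro'"
                + ")");

        // Reading the table is what triggers the exception below
        tEnv.executeSql("SELECT * FROM proposalLine").print();
    }
}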
ERROR:

Caused by: java.io.IOException: Failed to deserialize Avro record.
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:101)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44)
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:130)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:139)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:98)
    ... 9 more

Please help.

Regards,
Shamit

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/