Hello Team,

I am facing an issue with the "upsert-kafka" connector, which should read Avro 
messages in the Confluent Schema Registry format (the format read by 
"io.confluent.kafka.serializers.KafkaAvroDeserializer"). The same topic can be 
read with the "kafka" connector.

It looks like we cannot pass the schema registry URL and subject name to 
"upsert-kafka" the way we pass them to the "kafka" connector.

Please help.


The table definition with upsert-kafka is below (not working):

CREATE TABLE proposalLine (
  PROPOSAL_LINE_ID BIGINT,
  LAST_MODIFIED_BY STRING,
  PRIMARY KEY (PROPOSAL_LINE_ID) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.auto.offset.reset' = 'earliest',
  'topic' = 'lndcdcadsprpslproposalline',
  'key.format' = 'avro',
  'value.format' = 'avro',
  'properties.group.id' = 'dd',
  'properties.schema.registry.url' = 'http://localhost:8081',
  'properties.key.deserializer' = 'org.apache.kafka.common.serialization.LongDeserializer',
  'properties.value.deserializer' = 'io.confluent.kafka.serializers.KafkaAvroDeserializer'
)

ERROR:
Caused by: java.io.IOException: Failed to deserialize Avro record.
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:101)
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44)
        at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
        at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:130)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
        at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
        at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:139)
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:98)
        ... 9 more


The table definition with the kafka connector is below (working):

CREATE TABLE proposalLine (
  PROPOSAL_LINE_ID BIGINT,
  LAST_MODIFIED_BY STRING
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.auto.offset.reset' = 'earliest',
  'topic' = 'lndcdcadsprpslproposalline',
  'format' = 'avro-confluent',
  'avro-confluent.schema-registry.url' = 'http://localhost:8081',
  'avro-confluent.schema-registry.subject' = 'lndcdcadsprpslproposalline-value'
)
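
For comparison, here is an untested sketch of what I would expect the 
equivalent upsert-kafka definition to look like. It rests on assumptions I 
have not confirmed: that upsert-kafka accepts 'avro-confluent' as the value 
format with the same options prefixed by 'value.', that the 'raw' format can 
read a key written as a plain long, and that the 'properties.*.deserializer' 
settings are not needed because the connector decodes the bytes itself.

```sql
-- Untested sketch; the 'value.'-prefixed option names and the 'raw' key format
-- are assumptions carried over from the working 'kafka' definition.
CREATE TABLE proposalLine (
  PROPOSAL_LINE_ID BIGINT,
  LAST_MODIFIED_BY STRING,
  PRIMARY KEY (PROPOSAL_LINE_ID) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'dd',
  'topic' = 'lndcdcadsprpslproposalline',
  -- assumption: the key is a plain 8-byte long, not Avro
  'key.format' = 'raw',
  -- assumption: same schema registry options as above, with a 'value.' prefix
  'value.format' = 'avro-confluent',
  'value.avro-confluent.schema-registry.url' = 'http://localhost:8081',
  'value.avro-confluent.schema-registry.subject' = 'lndcdcadsprpslproposalline-value'
)
```

My guess is that the plain 'avro' format fails because Confluent-serialized 
messages carry a magic byte and schema ID ahead of the Avro payload, which 
would be consistent with the ArrayIndexOutOfBoundsException in the trace.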

Regards,
Shamit
