Hi Shamit, thanks for reaching out to the community. I am pulling in Timo who might know more about this problem.
Cheers, Till On Sun, Feb 7, 2021 at 6:22 AM shamit jain <jainsha...@gmail.com> wrote: > Hello Team, > > I am facing issue with "upsert-kafka" connector which should read the Avro > message serialized using > "io.confluent.kafka.serializers.KafkaAvroDeserializer". Same is working > with "kafka" connector. > > Looks like we are not able to pass the schema registry url and subject > name like the way we are passing while using "kafka" connector. > > Please help. > > > Table definition with upsert-kafka is below (not working), > > CREATE TABLE proposalLine (PROPOSAL_LINE_ID > bigint,LAST_MODIFIED_BY STRING ,PRIMARY KEY (PROPOSAL_LINE_ID) NOT ENFORCED > ) "WITH ('connector' = 'upsert-kafka', 'properties.bootstrap.servers' = > 'localhost:9092', 'properties.auto.offset.reset' = 'earliest', 'topic' = > 'lndcdcadsprpslproposalline', 'key.format' = 'avro', 'value.format' = > 'avro', 'properties.group.id'='dd', 'properties.schema.registry.url'='<a > href="http://localhost:8081'">http://localhost:8081', > 'properties.key.deserializer'='org.apache.kafka.common.serialization.LongDeserializer', > 'properties.value.deserializer'='io.confluent.kafka.serializers.KafkaAvroDeserializer') > > ERROR: > Caused by: java.io.IOException: Failed to deserialize Avro record. > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:101) > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44) > at > org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) > at > org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:130) > at > org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179) > at > org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241) > Caused by: java.lang.ArrayIndexOutOfBoundsException: -1 > at > org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460) > at org.apache.avro.io > .ResolvingDecoder.readIndex(ResolvingDecoder.java:283) > at > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) > at > org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259) > at > org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247) > at > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) > at > org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:139) > at > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:98) > ... 9 more > > > Table definition with kafka connector is below (working), > CREATE TABLE proposalLine (PROPOSAL_LINE_ID bigint,LAST_MODIFIED_BY String > ) WITH ('connector' = 'kafka', 'properties.bootstrap.servers' = > 'localhost:9092', 'properties.auto.offset.reset' = 'earliest', 'topic' = > 'lndcdcadsprpslproposalline', > 'format'='avro-confluent','avro-confluent.schema-registry.url' = '<a href=" > http://localhost:8081'">http://localhost:8081', > 'avro-confluent.schema-registry.subject' = > 'lndcdcadsprpslproposalline-value') > > Regards, > Shamit