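Hi Shamit,

I think the main problem is the upsert-kafka DDL: it uses 'avro' as the key and value format rather than 'avro-confluent'. Records written by the Confluent serializer carry a schema-registry header (a magic byte plus a schema ID) in front of the Avro payload, which the plain 'avro' format does not expect, hence the deserialization failure.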
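A minimal sketch of what the upsert-kafka definition might look like with the Confluent format. The 'key.' / 'value.' prefixed option names are an assumption derived from the 'avro-confluent.schema-registry.url' option in the working "kafka" definition quoted below; option names differ between Flink releases, so please check the documentation for your version. It also assumes the key was written in Confluent Avro; if the key is a plain serialized long, as the LongDeserializer setting suggests, a different 'key.format' is probably needed.

```sql
-- Sketch only, not a verified configuration.
-- Assumption: format options take a 'key.' / 'value.' prefix in upsert-kafka.
-- Assumption: both key and value were written with the Confluent Avro serializer.
CREATE TABLE proposalLine (
  PROPOSAL_LINE_ID BIGINT,
  LAST_MODIFIED_BY STRING,
  PRIMARY KEY (PROPOSAL_LINE_ID) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'lndcdcadsprpslproposalline',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'dd',
  'key.format' = 'avro-confluent',
  'key.avro-confluent.schema-registry.url' = 'http://localhost:8081',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.schema-registry.url' = 'http://localhost:8081'
);
```

The 'properties.key.deserializer' and 'properties.value.deserializer' entries are left out here on the assumption that the table format, not a Kafka client deserializer, decodes the records.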
The dev mailing list is for discussing implementation details. Please send questions like this to the user mailing list [1].

[1] https://flink.apache.org/gettinghelp.html#user-mailing-list

Shamit <jainsha...@gmail.com> wrote on Sunday, February 7, 2021 at 1:21 PM:

> Hello Team,
>
> I am facing issue with "upsert-kafka" connector which should read the Avro
> message serialized using
> "io.confluent.kafka.serializers.KafkaAvroDeserializer". Same is working with
> "kafka" connector.
>
> Looks like we are not able to pass the schema registry url and subject name
> like the way we are passing while using "kafka" connector.
>
> Please help.
>
> Table definition with upsert-kafka is below (not working),
>
> CREATE TABLE proposalLine (PROPOSAL_LINE_ID bigint,
>   LAST_MODIFIED_BY STRING,
>   PRIMARY KEY (PROPOSAL_LINE_ID) NOT ENFORCED
> ) "WITH ('connector' = 'upsert-kafka',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.auto.offset.reset' = 'earliest',
>   'topic' = 'lndcdcadsprpslproposalline',
>   'key.format' = 'avro',
>   'value.format' = 'avro',
>   'properties.group.id'='dd',
>   'properties.schema.registry.url'='http://localhost:8081',
>   'properties.key.deserializer'='org.apache.kafka.common.serialization.LongDeserializer',
>   'properties.value.deserializer'='io.confluent.kafka.serializers.KafkaAvroDeserializer')
>
> ERROR:
> Caused by: java.io.IOException: Failed to deserialize Avro record.
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:101)
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44)
>     at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>     at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:130)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>     at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
>     at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>     at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>     at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:139)
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:98)
>     ... 9 more
>
> Table definition with kafka connector is below (working),
>
> CREATE TABLE proposalLine (PROPOSAL_LINE_ID bigint,
>   LAST_MODIFIED_BY String
> )
> WITH ('connector' = 'kafka',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.auto.offset.reset' = 'earliest',
>   'topic' = 'lndcdcadsprpslproposalline',
>   'format'='avro-confluent',
>   'avro-confluent.schema-registry.url' = 'http://localhost:8081',
>   'avro-confluent.schema-registry.subject' = 'lndcdcadsprpslproposalline-value')
>
> Regards,
> Shamit
>
> --
> Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/