[ https://issues.apache.org/jira/browse/FLINK-31777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alvin Ge updated FLINK-31777:
-----------------------------
    Description:
I use Debezium to send data to Kafka in Confluent Avro format. When I read the topic with the 'upsert-kafka' connector, all value columns are null, but with the 'kafka' connector all values are read correctly.

My upsert-kafka table looks like this:
{code:sql}
create table TEA02
(
    SUB_SYSTEM_ENAME varchar(255),
    REC_CREATOR      varchar(255),
    REC_CREATE_TIME  varchar(255),
    REC_REVISOR      varchar(255),
    REC_REVISE_TIME  varchar(255),
    ARCHIVE_FLAG     varchar(255),
    SUB_SYSTEM_CNAME varchar(255),
    SUB_SYSTEM_FNAME varchar(255),
    SUB_SYSTEM_LEVEL varchar(255),
    primary key (SUB_SYSTEM_ENAME) not enforced
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'dev.oracle.JNMMM1.TEA02',
    'properties.bootstrap.servers' = '10.0.170.213:9092,10.0.170.214:9092,10.0.170.215:9092',
    'properties.group.id' = 'TEA02',
    'key.format' = 'avro-confluent',
    'key.avro-confluent.url' = 'http://10.0.170.213:8081',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://10.0.170.213:8081',
    'value.fields-include' = 'EXCEPT_KEY'
);
{code}
Query result:
||SUB_SYSTEM_ENAME||REC_CREATOR||REC_CREATE_TIME||.......||
|CJ|null|null|null|

  was:
I use Debezium to send data to Kafka in Confluent Avro format. When I read the topic with the upsert-kafka connector, all value columns are null; with the 'kafka' connector all values are read correctly.
My upsert-kafka table looks like this:
{code:sql}
create table TEA02
(
    SUB_SYSTEM_ENAME varchar(255),
    REC_CREATOR      varchar(255),
    REC_CREATE_TIME  varchar(255),
    REC_REVISOR      varchar(255),
    REC_REVISE_TIME  varchar(255),
    ARCHIVE_FLAG     varchar(255),
    SUB_SYSTEM_CNAME varchar(255),
    SUB_SYSTEM_FNAME varchar(255),
    SUB_SYSTEM_LEVEL varchar(255),
    primary key (SUB_SYSTEM_ENAME) not enforced
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'dev.oracle.JNMMM1.TEA02',
    'properties.bootstrap.servers' = '10.0.170.213:9092,10.0.170.214:9092,10.0.170.215:9092',
    'properties.group.id' = 'TEA02',
    'key.format' = 'avro-confluent',
    'key.avro-confluent.url' = 'http://10.0.170.213:8081',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://10.0.170.213:8081',
    'value.fields-include' = 'EXCEPT_KEY'
);
{code}
Query result:
||SUB_SYSTEM_ENAME||REC_CREATOR||REC_CREATE_TIME||.......||
|CJ|null|null|null|


> Upsert Kafka use Avro Confluent, key is ok, but all values are null.
> --------------------------------------------------------------------
>
>                 Key: FLINK-31777
>                 URL: https://issues.apache.org/jira/browse/FLINK-31777
>             Project: Flink
>          Issue Type: Improvement
>          Components: kafka
>    Affects Versions: 1.16.0
>         Environment: Flink: 1.16.0
> Confluent version: 7.3.3
> Debezium version: 2.1.0/2.0.0
>
>            Reporter: Alvin Ge
>            Priority: Major
>
> I use Debezium to send data to Kafka in Confluent Avro format. When I read
> the topic with the 'upsert-kafka' connector, all value columns are null, but
> with the 'kafka' connector all values are read correctly.
> My upsert-kafka table looks like this:
>
> {code:sql}
> create table TEA02
> (
>     SUB_SYSTEM_ENAME varchar(255),
>     REC_CREATOR      varchar(255),
>     REC_CREATE_TIME  varchar(255),
>     REC_REVISOR      varchar(255),
>     REC_REVISE_TIME  varchar(255),
>     ARCHIVE_FLAG     varchar(255),
>     SUB_SYSTEM_CNAME varchar(255),
>     SUB_SYSTEM_FNAME varchar(255),
>     SUB_SYSTEM_LEVEL varchar(255),
>     primary key (SUB_SYSTEM_ENAME) not enforced
> ) WITH (
>     'connector' = 'upsert-kafka',
>     'topic' = 'dev.oracle.JNMMM1.TEA02',
>     'properties.bootstrap.servers' = '10.0.170.213:9092,10.0.170.214:9092,10.0.170.215:9092',
>     'properties.group.id' = 'TEA02',
>     'key.format' = 'avro-confluent',
>     'key.avro-confluent.url' = 'http://10.0.170.213:8081',
>     'value.format' = 'avro-confluent',
>     'value.avro-confluent.url' = 'http://10.0.170.213:8081',
>     'value.fields-include' = 'EXCEPT_KEY'
> );
> {code}
>
> Query result:
>
> ||SUB_SYSTEM_ENAME||REC_CREATOR||REC_CREATE_TIME||.......||
> |CJ|null|null|null|
>
>

--
This message was sent by Atlassian Jira (v8.20.10#820010)