[ https://issues.apache.org/jira/browse/FLINK-31777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17712853#comment-17712853 ]
Alvin Ge commented on FLINK-31777: ---------------------------------- [~jark] Thank you! > Upsert Kafka use Avro Confluent, key is ok, but all values are null. > -------------------------------------------------------------------- > > Key: FLINK-31777 > URL: https://issues.apache.org/jira/browse/FLINK-31777 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka > Affects Versions: 1.16.0 > Environment: Flink: 1.16.0 > Confluent version: 7.3.3 > Debezium version: 2.1.0/2.0.0 > > Reporter: Alvin Ge > Priority: Major > > I use debezium send data to kafka with confluent avro format, when I use > 'upsert-kafka' connector, all values are null (primary key has value), but in > 'kafka' connector all values are well. > My upsert-kafka table like this: > {code:java} > // code placeholder > create table TEA02 > ( > SUB_SYSTEM_ENAME varchar(255), > REC_CREATOR varchar(255), > REC_CREATE_TIME varchar(255), > REC_REVISOR varchar(255), > REC_REVISE_TIME varchar(255), > ARCHIVE_FLAG varchar(255), > SUB_SYSTEM_CNAME varchar(255), > SUB_SYSTEM_FNAME varchar(255), > SUB_SYSTEM_LEVEL varchar(255), > primary key (SUB_SYSTEM_ENAME) not enforced > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'dev.oracle.JNMMM1.TEA02', > 'properties.bootstrap.servers' = > '10.0.170.213:9092,10.0.170.214:9092,10.0.170.215:9092', > 'properties.group.id' = 'TEA02', > 'key.format' = 'avro-confluent', > 'key.avro-confluent.url' = 'http://10.0.170.213:8081', > 'value.format' = 'avro-confluent', > 'value.avro-confluent.url' = 'http://10.0.170.213:8081', > 'value.fields-include' = 'EXCEPT_KEY' > ); {code} > query result: > ||SUB_SYSTEM_ENAME(this columns is > pk)||REC_CREATOR||REC_CREATE_TIME||.......|| > |CJ|null|null|null| > Specified subject still not working. > {code:java} > // code placeholder > 'key.avro-confluent.subject' = 'dev.oracle.JNMMM1.TEA02-key', > 'value.avro-confluent.subject' = 'dev.oracle.JNMMM1.TEA02-value' {code} > BTW: All debezium events are READ operation. > The confluent schemas are here: > {code:java} > // code placeholder > [{ > "subject": "dev.oracle-key", > "version": 1, > "id": 1, > "schema": > "{\"type\":\"record\",\"name\":\"SchemaChangeKey\",\"namespace\":\"io.debezium.connector.oracle\",\"fields\":[{\"name\":\"databaseName\",\"type\":\"string\"}],\"connect.version\":1,\"connect.name\":\"io.debezium.connector.oracle.SchemaChangeKey\"}" > }, { > "subject": "dev.oracle-value", > "version": 1, > "id": 2, > "schema": > "{\"type\":\"record\",\"name\":\"SchemaChangeValue\",\"namespace\":\"io.debezium.connector.oracle\",\"fields\":[{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"Source\",\"fields\":[{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"connector\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"snapshot\",\"type\":[{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"true,last,false,incremental\"},\"connect.default\":\"false\",\"connect.name\":\"io.debezium.data.Enum\"},\"null\"],\"default\":\"false\"},{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"sequence\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"schema\",\"type\":\"string\"},{\"name\":\"table\",\"type\":\"string\"},{\"name\":\"txId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"scn\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"commit_scn\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"lcr_position\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"rs_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ssn\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"redo_thread\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"user_name\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.oracle.Source\"}},{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"databaseName\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"schemaName\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ddl\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"tableChanges\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"Change\",\"namespace\":\"io.debezium.connector.schema\",\"fields\":[{\"name\":\"type\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"table\",\"type\":{\"type\":\"record\",\"name\":\"Table\",\"fields\":[{\"name\":\"defaultCharsetName\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"primaryKeyColumnNames\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"default\":null},{\"name\":\"columns\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"Column\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"jdbcType\",\"type\":\"int\"},{\"name\":\"nativeType\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"typeName\",\"type\":\"string\"},{\"name\":\"typeExpression\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"charsetName\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"length\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"scale\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"position\",\"type\":\"int\"},{\"name\":\"optional\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"autoIncremented\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"generated\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"comment\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"defaultValueExpression\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"enumValues\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"default\":null}],\"connect.version\":1,\"connect.name\":\"io.debezium.connector.schema.Column\"}}},{\"name\":\"comment\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.version\":1,\"connect.name\":\"io.debezium.connector.schema.Table\"}}],\"connect.version\":1,\"connect.name\":\"io.debezium.connector.schema.Change\"}}}],\"connect.version\":1,\"connect.name\":\"io.debezium.connector.oracle.SchemaChangeValue\"}" > }, { > "subject": "dev.oracle.JNMMM1.TEA02-key", > "version": 1, > "id": 3, > "schema": > "{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"dev.oracle.JNMMM1.TEA02\",\"fields\":[{\"name\":\"SUB_SYSTEM_ENAME\",\"type\":{\"type\":\"string\",\"connect.default\":\" > \"},\"default\":\" \"}],\"connect.name\":\"dev.oracle.JNMMM1.TEA02.Key\"}" > }, { > "subject": "dev.oracle.JNMMM1.TEA02-value", > "version": 1, > "id": 4, > "schema": > "{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"dev.oracle.JNMMM1.TEA02\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"REC_CREATOR\",\"type\":[{\"type\":\"string\",\"connect.default\":\" > \"},\"null\"],\"default\":\" > \"},{\"name\":\"REC_CREATE_TIME\",\"type\":[{\"type\":\"string\",\"connect.default\":\" > \"},\"null\"],\"default\":\" > \"},{\"name\":\"REC_REVISOR\",\"type\":[{\"type\":\"string\",\"connect.default\":\" > \"},\"null\"],\"default\":\" > \"},{\"name\":\"REC_REVISE_TIME\",\"type\":[{\"type\":\"string\",\"connect.default\":\" > \"},\"null\"],\"default\":\" > \"},{\"name\":\"ARCHIVE_FLAG\",\"type\":{\"type\":\"string\",\"connect.default\":\" > \"},\"default\":\" > \"},{\"name\":\"SUB_SYSTEM_ENAME\",\"type\":{\"type\":\"string\",\"connect.default\":\" > \"},\"default\":\" > \"},{\"name\":\"SUB_SYSTEM_CNAME\",\"type\":{\"type\":\"string\",\"connect.default\":\" > \"},\"default\":\" > \"},{\"name\":\"SUB_SYSTEM_FNAME\",\"type\":{\"type\":\"string\",\"connect.default\":\" > \"},\"default\":\" > \"},{\"name\":\"SUB_SYSTEM_LEVEL\",\"type\":{\"type\":\"string\",\"connect.default\":\" > \"},\"default\":\" > \"}],\"connect.name\":\"dev.oracle.JNMMM1.TEA02.Value\"}],\"default\":null},{\"name\":\"after\",\"type\":[\"null\",\"Value\"],\"default\":null},{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"Source\",\"namespace\":\"io.debezium.connector.oracle\",\"fields\":[{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"connector\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"snapshot\",\"type\":[{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"true,last,false,incremental\"},\"connect.default\":\"false\",\"connect.name\":\"io.debezium.data.Enum\"},\"null\"],\"default\":\"false\"},{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"sequence\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"schema\",\"type\":\"string\"},{\"name\":\"table\",\"type\":\"string\"},{\"name\":\"txId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"scn\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"commit_scn\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"lcr_position\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"rs_id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ssn\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"redo_thread\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"user_name\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.oracle.Source\"}},{\"name\":\"op\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"transaction\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"block\",\"namespace\":\"event\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"total_order\",\"type\":\"long\"},{\"name\":\"data_collection_order\",\"type\":\"long\"}],\"connect.version\":1,\"connect.name\":\"event.block\"}],\"default\":null}],\"connect.version\":1,\"connect.name\":\"dev.oracle.JNMMM1.TEA02.Envelope\"}" > }] {code} > > > --------------- > > I tried kafka connector, the values are still null, may be confluent version > 7.3.3 is not siutable flink? > my table look like: > > > {code:java} > // code placeholder > create table TEA02 > ( > SUB_SYSTEM_ENAME string, > REC_CREATOR string, > REC_CREATE_TIME string, > REC_REVISOR string, > REC_REVISE_TIME string, > ARCHIVE_FLAG string, > SUB_SYSTEM_CNAME string, > SUB_SYSTEM_FNAME string, > SUB_SYSTEM_LEVEL string > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'dev.oracle.JNMMM1.TEA02', > 'properties.bootstrap.servers' = > '10.0.170.213:9092,10.0.170.214:9092,10.0.170.215:9092', > 'properties.group.id' = 'TEA02', > 'key.format' = 'avro-confluent', > 'key.fields' = 'SUB_SYSTEM_ENAME', > 'key.avro-confluent.url' = 'http://10.0.170.213:8081', > 'value.format' = 'avro-confluent', > 'value.avro-confluent.url' = 'http://10.0.170.213:8081', > 'value.fields-include' = 'EXCEPT_KEY', > 'key.avro-confluent.subject' = 'dev.oracle.JNMMM1.TEA02-key', > 'value.avro-confluent.subject' = 'dev.oracle.JNMMM1.TEA02-value', > 'scan.startup.mode' = 'earliest-offset' > ); {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)