Hi Hang Ruan, Thanks for your response. But in the documentation, they have an example of defining the Primary Key for the DDL statement (code below). In that case we should be able to define the primary key for the DDL rite. We have defined the primary key in our earlier use cases when it wasn't a nested field. Please correct me , If I have misunderstood anything.
CREATE TABLE KafkaTable ( `ts` TIMESTAMP(3) METADATA FROM 'timestamp', `user_id` BIGINT, `item_id` BIGINT, `behavior` STRING, PRIMARY KEY (`user_id`) NOT ENFORCED) WITH ( 'connector' = 'upsert-kafka', ... 'key.format' = 'json', 'key.json.ignore-parse-errors' = 'true', 'value.format' = 'json', 'value.json.fail-on-missing-field' = 'false', 'value.fields-include' = 'EXCEPT_KEY') Thanks, Elakiya On Mon, Jul 10, 2023 at 1:09 PM Hang Ruan <ruanhang1...@gmail.com> wrote: > Hi, elakiya. > > The upsert-kafka connector will read the primary keys from the Kafka > message keys. We cannot define the fields in the Kafka message values as > the primary key. > > Best, > Hang > > elakiya udhayanan <laks....@gmail.com> 于2023年7月10日周一 13:49写道: > >> Hi team, >> >> I have a Kafka topic named employee which uses confluent avro schema and >> will emit the payload as below: >> >> { >> "employee": { >> "id": "123456", >> "name": "sampleName" >> } >> } >> I am using the upsert-kafka connector to consume the events from the >> above Kafka topic as below using the Flink SQL DDL statement, also here I >> want to use the id field as the Primary key. But I am unable to use the id >> field since it is inside the object. >> >> DDL Statement: >> String statement = "CREATE TABLE Employee (\r\n" + >> " employee ROW(id STRING, name STRING\r\n" + >> " ),\r\n" + >> " PRIMARY KEY (employee.id) NOT ENFORCED\r\n" + >> ") WITH (\r\n" + >> " 'connector' = 'upsert-kafka',\r\n" + >> " 'topic' = 'employee',\r\n" + >> " 'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" + >> " 'key.format' = 'raw',\r\n" + >> " 'value.format' = 'avro-confluent',\r\n" + >> " 'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" >> + >> ")"; >> Any help is appreciated TIA >> >> Thanks, >> Elakiya >> >