Hi Hang Ruan,

Thanks for your response. However, the documentation includes an example
that defines a primary key in the DDL statement (code below). In that case,
we should be able to define the primary key in the DDL, right? We defined
the primary key in our earlier use cases, where it wasn't a nested field.
Please correct me if I have misunderstood anything.

CREATE TABLE KafkaTable (
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  ...
  'key.format' = 'json',
  'key.json.ignore-parse-errors' = 'true',
  'value.format' = 'json',
  'value.json.fail-on-missing-field' = 'false',
  'value.fields-include' = 'EXCEPT_KEY'
)
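
If I understand your point correctly, the nested case would need a separate
top-level column that is read from the Kafka message key, rather than
employee.id from the value. Below is a rough sketch of what I think that
would look like. It is untested and rests on two assumptions: that our
producer writes the employee id as the Kafka message key (as a plain UTF-8
string), and that a column I have called employee_id (a hypothetical name,
not part of our schema) can stand in for it.

```sql
CREATE TABLE Employee (
  -- Hypothetical column: assumed to be populated from the Kafka message key.
  `employee_id` STRING,
  -- Nested record read from the Avro message value.
  `employee` ROW<`id` STRING, `name` STRING>,
  PRIMARY KEY (`employee_id`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'employee',
  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',
  -- 'raw' reads the key bytes as a single STRING column.
  'key.format' = 'raw',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',
  -- Keep the key column out of the value schema.
  'value.fields-include' = 'EXCEPT_KEY'
);
```

If the message key is not actually the employee id, this would not work as
written, and the topic would have to be re-keyed first.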

Thanks,
Elakiya

On Mon, Jul 10, 2023 at 1:09 PM Hang Ruan <ruanhang1...@gmail.com> wrote:

> Hi, elakiya.
>
> The upsert-kafka connector will read the primary keys from the Kafka
> message keys. We cannot define the fields in the Kafka message values as
> the primary key.
>
> Best,
> Hang
>
> elakiya udhayanan <laks....@gmail.com> wrote on Mon, Jul 10, 2023 at 13:49:
>
>> Hi team,
>>
>> I have a Kafka topic named employee, which uses a Confluent Avro schema
>> and emits the payload below:
>>
>> {
>> "employee": {
>> "id": "123456",
>> "name": "sampleName"
>> }
>> }
>> I am using the upsert-kafka connector to consume the events from the
>> above Kafka topic, using the Flink SQL DDL statement below. I want to use
>> the id field as the primary key, but I am unable to because it is nested
>> inside the employee object.
>>
>> DDL Statement:
>> String statement = "CREATE TABLE Employee (\r\n" +
>> "  employee  ROW(id STRING, name STRING\r\n" +
>> "  ),\r\n" +
>> "  PRIMARY KEY (employee.id) NOT ENFORCED\r\n" +
>> ") WITH (\r\n" +
>> "  'connector' = 'upsert-kafka',\r\n" +
>> "  'topic' = 'employee',\r\n" +
>> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>> "  'key.format' = 'raw',\r\n" +
>> "  'value.format' = 'avro-confluent',\r\n" +
>> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\r\n" +
>> ")";
>> Any help is appreciated. TIA.
>>
>> Thanks,
>> Elakiya
>>
>
