Hi, Elakiya.
If the KafkaTable is defined correctly, there must be a
`user_id` field in the Kafka message key.
You can see this in the method `createKeyValueProjections` of
`UpsertKafkaDynamicTableFactory`:
```
private Tuple2<int[], int[]> createKeyValueProjections(ResolvedCatalogTable catalogTable) {
    ResolvedSchema schema = catalogTable.getResolvedSchema();
    // primary key should have been validated earlier
    List<String> keyFields = schema.getPrimaryKey().get().getColumns();
    DataType physicalDataType = schema.toPhysicalRowDataType();
    Configuration tableOptions = Configuration.fromMap(catalogTable.getOptions());
    // upsert-kafka will set key.fields to primary key fields by default
    tableOptions.set(KEY_FIELDS, keyFields);
    int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
    int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
    return Tuple2.of(keyProjection, valueProjection);
}
```
The primary key columns are put into the KEY_FIELDS option to create the key
format projection, which is then used to read those fields from the Kafka
message key.
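To illustrate the idea, here is a minimal sketch (not the actual Flink implementation; the class and method names below are invented for illustration) of how a key format projection matches each configured key field by name against the top-level physical columns. This is also why a nested path such as `employee.id` cannot serve as a message-key column: it is not a top-level column name.

```java
import java.util.Arrays;
import java.util.List;

public class KeyProjectionSketch {

    // Map each key field name to the index of the matching top-level
    // physical column; fail if the name is not a top-level column.
    static int[] createKeyProjection(List<String> physicalColumns, List<String> keyFields) {
        return keyFields.stream()
                .mapToInt(field -> {
                    int idx = physicalColumns.indexOf(field);
                    if (idx < 0) {
                        throw new IllegalArgumentException(
                                "Key field '" + field + "' is not a top-level physical column");
                    }
                    return idx;
                })
                .toArray();
    }

    public static void main(String[] args) {
        // `user_id` is a top-level column, so the projection succeeds.
        List<String> columns = Arrays.asList("user_id", "item_id", "behavior");
        int[] projection = createKeyProjection(columns, Arrays.asList("user_id"));
        System.out.println(Arrays.toString(projection));

        // A nested path like `employee.id` is not a top-level column name,
        // so the same lookup fails.
        try {
            createKeyProjection(Arrays.asList("employee"), Arrays.asList("employee.id"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The projection here is just an array of column indices, mirroring the `int[] keyProjection` returned by the real factory method above.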
Best,
Hang
elakiya udhayanan <[email protected]> 于2023年7月10日周一 16:41写道:
> Hi Hang,
>
> The select query works fine absolutely, we have also implemented join
> queries which also works without any issues.
>
> Thanks,
> Elakiya
>
> On Mon, Jul 10, 2023 at 2:03 PM Hang Ruan <[email protected]> wrote:
>
>> Hi, Elakiya.
>>
>> Maybe this DDL can be executed. Please run the query `select *
>> from KafkaTable`; I think you will either see an error or find that
>> `user_id` is not read correctly.
>>
>> Best,
>> Hang
>>
>> elakiya udhayanan <[email protected]> 于2023年7月10日周一 16:25写道:
>>
>>> Hi Hang Ruan,
>>>
>>> Thanks for your response. But the documentation has an example of
>>> defining a primary key in the DDL statement (code below), so in that
>>> case we should be able to define the primary key in the DDL, right? We
>>> have defined primary keys in our earlier use cases when the key wasn't
>>> a nested field. Please correct me if I have misunderstood anything.
>>>
>>> CREATE TABLE KafkaTable (
>>>   `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
>>>   `user_id` BIGINT,
>>>   `item_id` BIGINT,
>>>   `behavior` STRING,
>>>   PRIMARY KEY (`user_id`) NOT ENFORCED
>>> ) WITH (
>>>   'connector' = 'upsert-kafka',
>>>   ...
>>>   'key.format' = 'json',
>>>   'key.json.ignore-parse-errors' = 'true',
>>>   'value.format' = 'json',
>>>   'value.json.fail-on-missing-field' = 'false',
>>>   'value.fields-include' = 'EXCEPT_KEY'
>>> )
>>>
>>> Thanks,
>>> Elakiya
>>>
>>> On Mon, Jul 10, 2023 at 1:09 PM Hang Ruan <[email protected]>
>>> wrote:
>>>
>>>> Hi, Elakiya.
>>>>
>>>> The upsert-kafka connector will read the primary keys from the Kafka
>>>> message keys. We cannot define the fields in the Kafka message values as
>>>> the primary key.
>>>>
>>>> Best,
>>>> Hang
>>>>
>>>> elakiya udhayanan <[email protected]> 于2023年7月10日周一 13:49写道:
>>>>
>>>>> Hi team,
>>>>>
>>>>> I have a Kafka topic named employee which uses a Confluent Avro
>>>>> schema and emits payloads like the one below:
>>>>>
>>>>> {
>>>>>   "employee": {
>>>>>     "id": "123456",
>>>>>     "name": "sampleName"
>>>>>   }
>>>>> }
>>>>> I am using the upsert-kafka connector to consume events from the
>>>>> above Kafka topic via the Flink SQL DDL statement below. I want to
>>>>> use the `id` field as the primary key, but I am unable to, since it
>>>>> is nested inside the `employee` object.
>>>>>
>>>>> DDL Statement:
>>>>> String statement = "CREATE TABLE Employee (\r\n" +
>>>>>     "  employee ROW(id STRING, name STRING),\r\n" +
>>>>>     "  PRIMARY KEY (employee.id) NOT ENFORCED\r\n" +
>>>>>     ") WITH (\r\n" +
>>>>>     "  'connector' = 'upsert-kafka',\r\n" +
>>>>>     "  'topic' = 'employee',\r\n" +
>>>>>     "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>>>>>     "  'key.format' = 'raw',\r\n" +
>>>>>     "  'value.format' = 'avro-confluent',\r\n" +
>>>>>     "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\r\n" +
>>>>>     ")";
>>>>> Any help is appreciated. TIA!
>>>>>
>>>>> Thanks,
>>>>> Elakiya
>>>>>
>>>>