Hi Elakiya,

Did you encounter a ParserException when executing the DDL? AFAIK, Flink
SQL does not support declaring a nested column (a compound identifier such
as employee.id) as a primary key at the syntax level.

A possible workaround is to change the schema so that it does not contain
a record type; then you can change the DDL to the following:

CREATE TABLE Employee (
  id STRING PRIMARY KEY NOT ENFORCED,
  name STRING
) WITH (
  ...
)
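
With 'key.format' = 'raw', the single primary-key column id is then read
directly from the Kafka message key. For completeness, a fuller sketch that
reuses the connector options from your original statement (this assumes the
Avro value schema is also flattened so that id and name are top-level
fields):

CREATE TABLE Employee (
  id STRING PRIMARY KEY NOT ENFORCED,
  name STRING
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'employee',
  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',
  'key.format' = 'raw',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'
)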

Best regards,
Jane

On Mon, Jul 10, 2023 at 7:32 PM elakiya udhayanan <laks....@gmail.com>
wrote:

> Hi Hang,
> Once again, thanks for your response, but I think you have misunderstood
> my question. At present we are only using the DDL form of the Table API,
> and the only issue we face is that we are unable to set the primary key
> field for the Flink table, since the value we want to use as the primary
> key is nested inside the object, as mentioned in my question earlier.
>
> Re-posting my question again here:
>
> I have a Kafka topic named employee which uses a Confluent Avro schema
> and emits payloads like the one below:
>
> {"employee": {"id": "123456", "name": "sampleName"}}
>
> I am using the upsert-kafka connector to consume the events from the
> above Kafka topic with the Flink SQL DDL statement below. I want to use
> the id field as the primary key, but I am unable to, since it is nested
> inside the employee object.
>
> DDL Statement:
>
> String statement = "CREATE TABLE Employee (\r\n" +
> "  employee  ROW(id STRING, name STRING\r\n" +
> "  ),\r\n" +
> "  PRIMARY KEY (employee.id) NOT ENFORCED\r\n" +
> ") WITH (\r\n" +
> "  'connector' = 'upsert-kafka',\r\n" +
> "  'topic' = 'employee',\r\n" +
> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
> "  'key.format' = 'raw',\r\n" +
> "  'value.format' = 'avro-confluent',\r\n" +
> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\r\n" +
> ")";
>
> On Mon, Jul 10, 2023 at 2:27 PM Hang Ruan <ruanhang1...@gmail.com> wrote:
>
>> Hi, Elakiya.
>>
>> If everything is right with the KafkaTable, I think there must be a
>> `user_id` field in the Kafka message key.
>> You can see this in the method `createKeyValueProjections` of
>> `UpsertKafkaDynamicTableFactory`:
>>
>> ```
>> private Tuple2<int[], int[]> createKeyValueProjections(
>>         ResolvedCatalogTable catalogTable) {
>>     ResolvedSchema schema = catalogTable.getResolvedSchema();
>>     // primary key should be validated earlier
>>     List<String> keyFields = schema.getPrimaryKey().get().getColumns();
>>     DataType physicalDataType = schema.toPhysicalRowDataType();
>>
>>     Configuration tableOptions = Configuration.fromMap(catalogTable.getOptions());
>>     // upsert-kafka will set key.fields to primary key fields by default
>>     tableOptions.set(KEY_FIELDS, keyFields);
>>
>>     int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
>>     int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
>>
>>     return Tuple2.of(keyProjection, valueProjection);
>> }
>> ```
>> The primary key columns are put into the KEY_FIELDS option to create the
>> key-format projection, which is used to read those fields from the Kafka
>> message key.
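>>
>> In other words, the primary-key columns are resolved against top-level
>> physical columns only, which is why a nested field like `employee.id`
>> cannot be used. With the plain `kafka` connector the equivalent
>> projection is configured explicitly through the `key.fields` option;
>> upsert-kafka just derives it from the PRIMARY KEY. A sketch for
>> illustration (the option values are placeholders, not from your setup):
>>
>> ```
>> CREATE TABLE KafkaTable (
>>   `user_id` BIGINT,
>>   `behavior` STRING
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'employee',
>>   'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',
>>   'key.format' = 'json',
>>   'key.fields' = 'user_id',
>>   'value.format' = 'json',
>>   'value.fields-include' = 'EXCEPT_KEY'
>> )
>> ```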
>>
>> Best,
>> Hang
>>
>> On Mon, Jul 10, 2023 at 4:41 PM elakiya udhayanan <laks....@gmail.com> wrote:
>>
>>> Hi Hang,
>>>
>>> The select query works absolutely fine; we have also implemented join
>>> queries, which likewise work without any issues.
>>>
>>> Thanks,
>>> Elakiya
>>>
>>> On Mon, Jul 10, 2023 at 2:03 PM Hang Ruan <ruanhang1...@gmail.com>
>>> wrote:
>>>
>>>> Hi, Elakiya.
>>>>
>>>> Maybe this DDL can be executed successfully, but please run the query
>>>> `select * from KafkaTable`; I think you will then either get an error
>>>> or find that `user_id` is not read correctly.
>>>>
>>>> Best,
>>>> Hang
>>>>
>>>> On Mon, Jul 10, 2023 at 4:25 PM elakiya udhayanan <laks....@gmail.com> wrote:
>>>>
>>>>> Hi Hang Ruan,
>>>>>
>>>>> Thanks for your response. But the documentation has an example of
>>>>> defining the primary key in the DDL statement (code below), so we
>>>>> should be able to define a primary key in the DDL, right? We have
>>>>> defined primary keys in earlier use cases where the field wasn't
>>>>> nested. Please correct me if I have misunderstood anything.
>>>>>
>>>>> CREATE TABLE KafkaTable (
>>>>>   `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
>>>>>   `user_id` BIGINT,
>>>>>   `item_id` BIGINT,
>>>>>   `behavior` STRING,
>>>>>   PRIMARY KEY (`user_id`) NOT ENFORCED
>>>>> ) WITH (
>>>>>   'connector' = 'upsert-kafka',
>>>>>   ...
>>>>>   'key.format' = 'json',
>>>>>   'key.json.ignore-parse-errors' = 'true',
>>>>>   'value.format' = 'json',
>>>>>   'value.json.fail-on-missing-field' = 'false',
>>>>>   'value.fields-include' = 'EXCEPT_KEY'
>>>>> )
>>>>>
>>>>> Thanks,
>>>>> Elakiya
>>>>>
>>>>> On Mon, Jul 10, 2023 at 1:09 PM Hang Ruan <ruanhang1...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi, elakiya.
>>>>>>
>>>>>> The upsert-kafka connector reads the primary key columns from the
>>>>>> Kafka message key. Fields that exist only in the Kafka message value
>>>>>> cannot be defined as the primary key.
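>>>>>>
>>>>>> For example, with 'key.format' = 'raw' and a PRIMARY KEY on a
>>>>>> top-level id column, the connector would expect records shaped
>>>>>> roughly like this (an illustrative sketch, not actual output from
>>>>>> your topic):
>>>>>>
>>>>>> key (raw string):      123456
>>>>>> value (Avro, as JSON): {"employee": {"id": "123456", "name": "sampleName"}}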
>>>>>>
>>>>>> Best,
>>>>>> Hang
>>>>>>
>>>>>> On Mon, Jul 10, 2023 at 1:49 PM elakiya udhayanan <laks....@gmail.com> wrote:
>>>>>>
>>>>>>> Hi team,
>>>>>>>
>>>>>>> I have a Kafka topic named employee which uses a Confluent Avro
>>>>>>> schema and emits payloads like the one below:
>>>>>>>
>>>>>>> {
>>>>>>>   "employee": {
>>>>>>>     "id": "123456",
>>>>>>>     "name": "sampleName"
>>>>>>>   }
>>>>>>> }
>>>>>>> I am using the upsert-kafka connector to consume the events from the
>>>>>>> above Kafka topic with the Flink SQL DDL statement below. I want to
>>>>>>> use the id field as the primary key, but I am unable to, since it is
>>>>>>> nested inside the employee object.
>>>>>>>
>>>>>>> DDL Statement:
>>>>>>> String statement = "CREATE TABLE Employee (\r\n" +
>>>>>>> "  employee  ROW(id STRING, name STRING\r\n" +
>>>>>>> "  ),\r\n" +
>>>>>>> "  PRIMARY KEY (employee.id) NOT ENFORCED\r\n" +
>>>>>>> ") WITH (\r\n" +
>>>>>>> "  'connector' = 'upsert-kafka',\r\n" +
>>>>>>> "  'topic' = 'employee',\r\n" +
>>>>>>> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>>>>>>> "  'key.format' = 'raw',\r\n" +
>>>>>>> "  'value.format' = 'avro-confluent',\r\n" +
>>>>>>> "  'value.avro-confluent.url' = '
>>>>>>> http://kafka-cp-schema-registry:8081',\r\n" +
>>>>>>> ")";
>>>>>>> Any help is appreciated. TIA.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Elakiya
>>>>>>>
>>>>>>
