Hi Elakiya,

Did you encounter a ParserException when executing the DDL? AFAIK, Flink SQL does not support declaring a nested column (a compound identifier such as `employee.id`) as a primary key at the syntax level.

A possible workaround is to change the schema so that it does not contain a record type. Then you can change the DDL to the following:

CREATE TABLE Employee (
  id STRING PRIMARY KEY NOT ENFORCED,
  name STRING
) WITH ( ... )
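For reference, a complete version of the flattened DDL might look like the sketch below. This is only a sketch, not a tested statement: the connector options (topic, bootstrap servers, schema registry URL) are copied over from your original CREATE TABLE, and it assumes you also flatten the registered Avro value schema so that id and name become top-level fields.

```
-- Sketch only: assumes the Avro value schema in the schema registry
-- has been flattened so that id and name are top-level fields.
CREATE TABLE Employee (
  id STRING,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'employee',
  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',
  -- upsert-kafka derives key.fields from the primary key columns,
  -- so 'raw' reads the Kafka message key as the plain id string
  'key.format' = 'raw',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'
);
```

Note that, separately, the trailing comma after the last option in the WITH clause of your original statement would itself cause a parse error, so it is dropped here.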
Best regards,
Jane

On Mon, Jul 10, 2023 at 7:32 PM elakiya udhayanan <laks....@gmail.com> wrote:

> Hi Hang,
>
> Once again, thanks for your response, but I think you have misunderstood
> my question. At present we are only using the DDL format of the Table API,
> and the only issue we face is that we are unable to set the primary key
> field for the Flink table, since the value we want to use as the primary
> key is nested inside the object, as mentioned in my question earlier.
>
> Re-posting my question again here:
>
> I have a Kafka topic named employee which uses a Confluent Avro schema and
> will emit the payload as below:
>
> {"employee": {"id": "123456", "name": "sampleName"}}
>
> I am using the upsert-kafka connector to consume the events from the above
> Kafka topic as below using the Flink SQL DDL statement; here I want to use
> the id field as the primary key, but I am unable to use the id field since
> it is inside the object.
>
> DDL statement:
>
> String statement = "CREATE TABLE Employee (\r\n" +
>     "  employee ROW(id STRING, name STRING\r\n" +
>     "  ),\r\n" +
>     "  PRIMARY KEY (employee.id) NOT ENFORCED\r\n" +
>     ") WITH (\r\n" +
>     "  'connector' = 'upsert-kafka',\r\n" +
>     "  'topic' = 'employee',\r\n" +
>     "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>     "  'key.format' = 'raw',\r\n" +
>     "  'value.format' = 'avro-confluent',\r\n" +
>     "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" +
>     ")";
>
> On Mon, Jul 10, 2023 at 2:27 PM Hang Ruan <ruanhang1...@gmail.com> wrote:
>
>> Hi, Elakiya.
>>
>> If everything is right for the KafkaTable, I think there must be a
>> `user_id` field in the Kafka message key.
>> We can see this in the method `createKeyValueProjections` of
>> `UpsertKafkaDynamicTableFactory`, as follows.
>>
>> ```
>> private Tuple2<int[], int[]> createKeyValueProjections(ResolvedCatalogTable catalogTable) {
>>     ResolvedSchema schema = catalogTable.getResolvedSchema();
>>     // primary key should be validated earlier
>>     List<String> keyFields = schema.getPrimaryKey().get().getColumns();
>>     DataType physicalDataType = schema.toPhysicalRowDataType();
>>
>>     Configuration tableOptions = Configuration.fromMap(catalogTable.getOptions());
>>     // upsert-kafka will set key.fields to the primary key fields by default
>>     tableOptions.set(KEY_FIELDS, keyFields);
>>
>>     int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
>>     int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
>>
>>     return Tuple2.of(keyProjection, valueProjection);
>> }
>> ```
>>
>> The primary keys will be put in the KEY_FIELDS option to create the key
>> format projection, which is used to read fields from the Kafka message key.
>>
>> Best,
>> Hang
>>
>> On Mon, Jul 10, 2023 at 4:41 PM elakiya udhayanan <laks....@gmail.com> wrote:
>>
>>> Hi Hang,
>>>
>>> The select query works absolutely fine; we have also implemented join
>>> queries, which also work without any issues.
>>>
>>> Thanks,
>>> Elakiya
>>>
>>> On Mon, Jul 10, 2023 at 2:03 PM Hang Ruan <ruanhang1...@gmail.com> wrote:
>>>
>>>> Hi, Elakiya.
>>>>
>>>> Maybe this DDL could be executed. Please execute the select SQL
>>>> `select * from KafkaTable`. Then I think there will be some error, or
>>>> the `user_id` will not be read correctly.
>>>>
>>>> Best,
>>>> Hang
>>>>
>>>> On Mon, Jul 10, 2023 at 4:25 PM elakiya udhayanan <laks....@gmail.com> wrote:
>>>>
>>>>> Hi Hang Ruan,
>>>>>
>>>>> Thanks for your response. But the documentation has an example of
>>>>> defining a primary key in the DDL statement (code below). In that case,
>>>>> we should be able to define the primary key for the DDL, right? We have
>>>>> defined the primary key in our earlier use cases when it wasn't a nested
>>>>> field. Please correct me if I have misunderstood anything.
>>>>>
>>>>> CREATE TABLE KafkaTable (
>>>>>   `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
>>>>>   `user_id` BIGINT,
>>>>>   `item_id` BIGINT,
>>>>>   `behavior` STRING,
>>>>>   PRIMARY KEY (`user_id`) NOT ENFORCED
>>>>> ) WITH (
>>>>>   'connector' = 'upsert-kafka',
>>>>>   ...
>>>>>   'key.format' = 'json',
>>>>>   'key.json.ignore-parse-errors' = 'true',
>>>>>   'value.format' = 'json',
>>>>>   'value.json.fail-on-missing-field' = 'false',
>>>>>   'value.fields-include' = 'EXCEPT_KEY'
>>>>> )
>>>>>
>>>>> Thanks,
>>>>> Elakiya
>>>>>
>>>>> On Mon, Jul 10, 2023 at 1:09 PM Hang Ruan <ruanhang1...@gmail.com> wrote:
>>>>>
>>>>>> Hi, Elakiya.
>>>>>>
>>>>>> The upsert-kafka connector will read the primary keys from the Kafka
>>>>>> message keys. We cannot define fields in the Kafka message values as
>>>>>> the primary key.
>>>>>>
>>>>>> Best,
>>>>>> Hang
>>>>>>
>>>>>> On Mon, Jul 10, 2023 at 1:49 PM elakiya udhayanan <laks....@gmail.com> wrote:
>>>>>>
>>>>>>> Hi team,
>>>>>>>
>>>>>>> I have a Kafka topic named employee which uses a Confluent Avro schema
>>>>>>> and will emit the payload as below:
>>>>>>>
>>>>>>> {
>>>>>>>   "employee": {
>>>>>>>     "id": "123456",
>>>>>>>     "name": "sampleName"
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> I am using the upsert-kafka connector to consume the events from the
>>>>>>> above Kafka topic as below using the Flink SQL DDL statement; here I
>>>>>>> want to use the id field as the primary key, but I am unable to use
>>>>>>> the id field since it is inside the object.
>>>>>>>
>>>>>>> DDL statement:
>>>>>>>
>>>>>>> String statement = "CREATE TABLE Employee (\r\n" +
>>>>>>>     "  employee ROW(id STRING, name STRING\r\n" +
>>>>>>>     "  ),\r\n" +
>>>>>>>     "  PRIMARY KEY (employee.id) NOT ENFORCED\r\n" +
>>>>>>>     ") WITH (\r\n" +
>>>>>>>     "  'connector' = 'upsert-kafka',\r\n" +
>>>>>>>     "  'topic' = 'employee',\r\n" +
>>>>>>>     "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>>>>>>>     "  'key.format' = 'raw',\r\n" +
>>>>>>>     "  'value.format' = 'avro-confluent',\r\n" +
>>>>>>>     "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" +
>>>>>>>     ")";
>>>>>>>
>>>>>>> Any help is appreciated. TIA.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Elakiya