Hi, Elakiya. If everything is right for the KafkaTable, I think there must be a `user_id` field in the Kafka message key. We could see the code in the method `createKeyValueProjections` of `UpsertKafkaDynamicTableFactory` as follows.
``` private Tuple2<int[], int[]> createKeyValueProjections(ResolvedCatalogTable catalogTable) { ResolvedSchema schema = catalogTable.getResolvedSchema(); // primary key should validated earlier List<String> keyFields = schema.getPrimaryKey().get().getColumns(); DataType physicalDataType = schema.toPhysicalRowDataType(); Configuration tableOptions = Configuration.fromMap(catalogTable.getOptions()); // upsert-kafka will set key.fields to primary key fields by default tableOptions.set(KEY_FIELDS, keyFields); int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType); int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType); return Tuple2.of(keyProjection, valueProjection); } ``` The primary keys will be put in the KEY_FIELDS option to create the key format projection, which will be used to get fields from Kafka message key. Best, Hang elakiya udhayanan <laks....@gmail.com> 于2023年7月10日周一 16:41写道: > Hi Hang, > > The select query works fine absolutely, we have also implemented join > queries which also works without any issues. > > Thanks, > Elakiya > > On Mon, Jul 10, 2023 at 2:03 PM Hang Ruan <ruanhang1...@gmail.com> wrote: > >> Hi, Elakiya. >> >> Maybe this DDL could be executed. Please execute the select sql `select * >> from KafkaTable`. Then I think there will be some error or the `user_id` >> will not be read correctly. >> >> Best, >> Hang >> >> elakiya udhayanan <laks....@gmail.com> 于2023年7月10日周一 16:25写道: >> >>> Hi Hang Ruan, >>> >>> Thanks for your response. But in the documentation, they have an example >>> of defining the Primary Key for the DDL statement (code below). In that >>> case we should be able to define the primary key for the DDL rite. We have >>> defined the primary key in our earlier use cases when it wasn't a nested >>> field. Please correct me , If I have misunderstood anything. >>> >>> CREATE TABLE KafkaTable ( `ts` TIMESTAMP(3) METADATA FROM 'timestamp', >>> `user_id` BIGINT, `item_id` BIGINT, `behavior` STRING, PRIMARY KEY >>> (`user_id`) NOT ENFORCED) WITH ( 'connector' = 'upsert-kafka', ... >>> 'key.format' = 'json', 'key.json.ignore-parse-errors' = 'true', >>> 'value.format' = 'json', 'value.json.fail-on-missing-field' = 'false', >>> 'value.fields-include' = 'EXCEPT_KEY') >>> >>> Thanks, >>> Elakiya >>> >>> On Mon, Jul 10, 2023 at 1:09 PM Hang Ruan <ruanhang1...@gmail.com> >>> wrote: >>> >>>> Hi, elakiya. >>>> >>>> The upsert-kafka connector will read the primary keys from the Kafka >>>> message keys. We cannot define the fields in the Kafka message values as >>>> the primary key. >>>> >>>> Best, >>>> Hang >>>> >>>> elakiya udhayanan <laks....@gmail.com> 于2023年7月10日周一 13:49写道: >>>> >>>>> Hi team, >>>>> >>>>> I have a Kafka topic named employee which uses confluent avro schema >>>>> and will emit the payload as below: >>>>> >>>>> { >>>>> "employee": { >>>>> "id": "123456", >>>>> "name": "sampleName" >>>>> } >>>>> } >>>>> I am using the upsert-kafka connector to consume the events from the >>>>> above Kafka topic as below using the Flink SQL DDL statement, also here I >>>>> want to use the id field as the Primary key. But I am unable to use the id >>>>> field since it is inside the object. >>>>> >>>>> DDL Statement: >>>>> String statement = "CREATE TABLE Employee (\r\n" + >>>>> " employee ROW(id STRING, name STRING\r\n" + >>>>> " ),\r\n" + >>>>> " PRIMARY KEY (employee.id) NOT ENFORCED\r\n" + >>>>> ") WITH (\r\n" + >>>>> " 'connector' = 'upsert-kafka',\r\n" + >>>>> " 'topic' = 'employee',\r\n" + >>>>> " 'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" + >>>>> " 'key.format' = 'raw',\r\n" + >>>>> " 'value.format' = 'avro-confluent',\r\n" + >>>>> " 'value.avro-confluent.url' = >>>>> 'http://kafka-cp-schema-registry:8081',\r\n" >>>>> + >>>>> ")"; >>>>> Any help is appreciated TIA >>>>> >>>>> Thanks, >>>>> Elakiya >>>>> >>>>