Hi Hang,

The select query works without any problems. We have also implemented join queries, which work without any issues as well.

Thanks,
Elakiya

On Mon, Jul 10, 2023 at 2:03 PM Hang Ruan <ruanhang1...@gmail.com> wrote:

> Hi, Elakiya.
>
> Maybe this DDL can be executed. Please run the query `select * from
> KafkaTable`. I think there will then be an error, or the `user_id` will
> not be read correctly.
>
> Best,
> Hang
>
> elakiya udhayanan <laks....@gmail.com> wrote on Mon, Jul 10, 2023 at 16:25:
>
>> Hi Hang Ruan,
>>
>> Thanks for your response. But the documentation has an example of
>> defining the primary key in the DDL statement (code below). In that case
>> we should be able to define the primary key in the DDL, right? We have
>> defined the primary key in our earlier use cases, when it wasn't a nested
>> field. Please correct me if I have misunderstood anything.
>>
>> CREATE TABLE KafkaTable (
>>   `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
>>   `user_id` BIGINT,
>>   `item_id` BIGINT,
>>   `behavior` STRING,
>>   PRIMARY KEY (`user_id`) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'upsert-kafka',
>>   ...
>>   'key.format' = 'json',
>>   'key.json.ignore-parse-errors' = 'true',
>>   'value.format' = 'json',
>>   'value.json.fail-on-missing-field' = 'false',
>>   'value.fields-include' = 'EXCEPT_KEY'
>> )
>>
>> Thanks,
>> Elakiya
>>
>> On Mon, Jul 10, 2023 at 1:09 PM Hang Ruan <ruanhang1...@gmail.com> wrote:
>>
>>> Hi, elakiya.
>>>
>>> The upsert-kafka connector reads the primary keys from the Kafka
>>> message keys. We cannot define fields in the Kafka message values as
>>> the primary key.
>>>
>>> Best,
>>> Hang
>>>
>>> elakiya udhayanan <laks....@gmail.com> wrote on Mon, Jul 10, 2023 at 13:49:
>>>
>>>> Hi team,
>>>>
>>>> I have a Kafka topic named employee which uses a Confluent Avro schema
>>>> and emits the payload below:
>>>>
>>>> {
>>>>   "employee": {
>>>>     "id": "123456",
>>>>     "name": "sampleName"
>>>>   }
>>>> }
>>>>
>>>> I am using the upsert-kafka connector to consume the events from the
>>>> above Kafka topic with the Flink SQL DDL statement below. I want to use
>>>> the id field as the primary key, but I am unable to do so because it is
>>>> inside the employee object.
>>>>
>>>> DDL Statement:
>>>> String statement = "CREATE TABLE Employee (\r\n" +
>>>>     "  employee ROW(id STRING, name STRING\r\n" +
>>>>     "  ),\r\n" +
>>>>     "  PRIMARY KEY (employee.id) NOT ENFORCED\r\n" +
>>>>     ") WITH (\r\n" +
>>>>     "  'connector' = 'upsert-kafka',\r\n" +
>>>>     "  'topic' = 'employee',\r\n" +
>>>>     "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>>>>     "  'key.format' = 'raw',\r\n" +
>>>>     "  'value.format' = 'avro-confluent',\r\n" +
>>>>     "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" +
>>>>     ")";
>>>>
>>>> Any help is appreciated. TIA
>>>>
>>>> Thanks,
>>>> Elakiya
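
For reference, here is a minimal sketch of how Hang's point (the primary key comes from the Kafka message key, not from the value) might be applied to the Employee table. It is not a confirmed fix from the thread. It assumes that the producer writes the employee id as a plain string into the Kafka message key, that a top-level column (given the hypothetical name `employee_id` here) can be mapped to that key through the 'raw' key format, and that 'value.fields-include' = 'EXCEPT_KEY' is appropriate because the Avro value contains only the `employee` record. The class and method names are illustrative only.

```java
// Sketch only: assumes the Kafka message key carries the employee id as a
// plain UTF-8 string. The column name `employee_id` is hypothetical.
final class EmployeeDdl {

    // Builds a DDL whose primary key is a top-level column read from the
    // Kafka message key, instead of the nested value field employee.id.
    static String createTableStatement() {
        return String.join("\n",
            "CREATE TABLE Employee (",
            "  employee_id STRING,",
            "  employee ROW<id STRING, name STRING>,",
            "  PRIMARY KEY (employee_id) NOT ENFORCED",
            ") WITH (",
            "  'connector' = 'upsert-kafka',",
            "  'topic' = 'employee',",
            "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',",
            "  'key.format' = 'raw',",
            "  'value.format' = 'avro-confluent',",
            "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',",
            // Assumption: the value schema holds only the employee record,
            // so the key column is excluded from the value format.
            "  'value.fields-include' = 'EXCEPT_KEY'",
            ")");
    }

    public static void main(String[] args) {
        // In a Flink job this string would be passed to TableEnvironment#executeSql.
        System.out.println(createTableStatement());
    }
}
```

If the message key on the topic is empty or holds something other than the id, this mapping would not behave as intended; in that case the data would probably need to be re-keyed upstream before upsert-kafka can use it.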