Hi Hang,

The select query works absolutely fine. We have also implemented join
queries, which also work without any issues.


On Mon, Jul 10, 2023 at 2:03 PM Hang Ruan <ruanhang1...@gmail.com> wrote:

> Hi, Elakiya.
> This DDL may well execute. Please run the query `select *
> from KafkaTable`. I think it will then either fail with an error or the
> `user_id` will not be read correctly.
> Best,
> Hang
> elakiya udhayanan <laks....@gmail.com> wrote on Mon, Jul 10, 2023 at 16:25:
>> Hi Hang Ruan,
>> Thanks for your response. But the documentation has an example
>> of defining the primary key in the DDL statement (code below). In that
>> case, we should be able to define the primary key in the DDL, right? We have
>> defined the primary key in our earlier use cases when it wasn't a nested
>> field. Please correct me if I have misunderstood anything.
>> CREATE TABLE KafkaTable (
>>   `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
>>   `user_id` BIGINT,
>>   `item_id` BIGINT,
>>   `behavior` STRING,
>>   PRIMARY KEY (`user_id`) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'upsert-kafka',
>>   ...
>>   'key.format' = 'json',
>>   'key.json.ignore-parse-errors' = 'true',
>>   'value.format' = 'json',
>>   'value.json.fail-on-missing-field' = 'false',
>>   'value.fields-include' = 'EXCEPT_KEY'
>> )
>> Thanks,
>> Elakiya
>> On Mon, Jul 10, 2023 at 1:09 PM Hang Ruan <ruanhang1...@gmail.com> wrote:
>>> Hi, elakiya.
>>> The upsert-kafka connector will read the primary keys from the Kafka
>>> message keys. We cannot define the fields in the Kafka message values as
>>> the primary key.
>>> Best,
>>> Hang
>>> elakiya udhayanan <laks....@gmail.com> wrote on Mon, Jul 10, 2023 at 13:49:
>>>> Hi team,
>>>> I have a Kafka topic named employee which uses a Confluent Avro schema
>>>> and emits the payload below:
>>>> {
>>>>   "employee": {
>>>>     "id": "123456",
>>>>     "name": "sampleName"
>>>>   }
>>>> }
>>>> I am using the upsert-kafka connector to consume the events from the
>>>> above Kafka topic, using the Flink SQL DDL statement below. I want to
>>>> use the id field as the primary key, but I am unable to do so because
>>>> it is nested inside the employee object.
>>>> DDL Statement:
>>>> String statement = "CREATE TABLE Employee (\r\n" +
>>>> "  employee  ROW(id STRING, name STRING\r\n" +
>>>> "  ),\r\n" +
>>>> "  PRIMARY KEY (employee.id) NOT ENFORCED\r\n" +
>>>> ") WITH (\r\n" +
>>>> "  'connector' = 'upsert-kafka',\r\n" +
>>>> "  'topic' = 'employee',\r\n" +
>>>> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>>>> "  'key.format' = 'raw',\r\n" +
>>>> "  'value.format' = 'avro-confluent',\r\n" +
>>>> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" +
>>>> ")";
>>>> Any help is appreciated. TIA.
>>>> Thanks,
>>>> Elakiya
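
For readers of this thread: Hang's point that upsert-kafka reads primary keys from the Kafka message key suggests one possible shape for the table. The sketch below is only an illustration under assumptions that are not confirmed anywhere in the thread: that the producer writes the employee id as the Kafka message key in plain string form, and that a top-level column (here called `id`, a name chosen for this example) is acceptable alongside the nested `employee` row. The class and method names are hypothetical, and the code only builds the DDL string; it does not connect to Flink or Kafka.

```java
// Sketch only: builds a Flink SQL DDL string, nothing is executed here.
final class EmployeeDdl {

    // Assumption: the Kafka message key carries the employee id as a plain
    // string, so it can be declared as a top-level column and used as the key.
    static String buildDdl() {
        return String.join("\n",
            "CREATE TABLE Employee (",
            // Top-level key column, read from the Kafka message key.
            "  id STRING,",
            // Nested payload, read from the Avro-encoded message value.
            "  employee ROW<id STRING, name STRING>,",
            // The primary key references the top-level column, not employee.id.
            "  PRIMARY KEY (id) NOT ENFORCED",
            ") WITH (",
            "  'connector' = 'upsert-kafka',",
            "  'topic' = 'employee',",
            "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',",
            "  'key.format' = 'raw',",
            "  'value.format' = 'avro-confluent',",
            "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',",
            // Keep the key column out of the value schema, as in the docs example.
            "  'value.fields-include' = 'EXCEPT_KEY'",
            ")");
    }
}
```

The resulting string could then be passed to `tableEnv.executeSql(...)` in the same way as the original statement. If the topic's message key is empty or holds something other than the id, this layout would not apply.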
