Hi Xuyang,

Thank you for your response. I do not currently have access to create tickets in the ASF Jira, so I have requested it. Once access is granted, I will raise a ticket for this feature.
Also, you suggested using the DataStream API to extract the id and then switching to the Table API. Since I have not used the DataStream API before, could you share an example, if possible? Meanwhile, I will try to learn the DataStream API.

Thanks,
Elakiya

On Tue, Oct 31, 2023 at 7:34 AM Xuyang <xyzhong...@163.com> wrote:

> Hi, Flink SQL doesn't support an inline field in a struct type as a pk. You can
> try to raise an issue about this feature in the community [1].
>
> For a quick solution, you can try to transform it with the DataStream API first
> by extracting the 'id', and then convert it to the Table API to use SQL.
>
> [1]
> https://issues.apache.org/jira/projects/FLINK/issues/FLINK-33400?filter=allopenissues
>
> --
> Best!
> Xuyang
>
>
> At 2023-10-30 16:42:03, "elakiya udhayanan" <laks....@gmail.com> wrote:
>
> Hi team,
>
> I have a Kafka topic named employee which uses a Confluent Avro schema and
> emits the payload below:
>
> {
>   "employee": {
>     "id": "123456",
>     "name": "sampleName"
>   }
> }
>
> I am using the upsert-kafka connector to consume the events from the above
> Kafka topic with the Flink SQL DDL statement below. I want to use the id
> field as the primary key, but I am unable to do so because it is nested
> inside the employee object.
>
> DDL Statement:
> String statement = "CREATE TABLE Employee (\r\n" +
>     " employee ROW(id STRING, name STRING\r\n" +
>     " ),\r\n" +
>     " PRIMARY KEY (employee.id) NOT ENFORCED\r\n" +
>     ") WITH (\r\n" +
>     " 'connector' = 'upsert-kafka',\r\n" +
>     " 'topic' = 'employee',\r\n" +
>     " 'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>     " 'key.format' = 'raw',\r\n" +
>     " 'value.format' = 'avro-confluent',\r\n" +
>     " 'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\r\n" +
>     ")";
>
> Any help is appreciated. TIA
>
> Thanks,
> Elakiya
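
On the DataStream suggestion in the quoted reply: the core of that approach is a flattening step that lifts employee.id to a top-level field before the data reaches the Table API. A minimal sketch of just that step is below, in plain Java with no Flink dependency. The names FlatEmployee and EmployeeFlattener are hypothetical, and the payload is assumed to arrive as a nested Map, which is not necessarily how an Avro record would be represented in a real job.

```java
import java.util.Map;

// Hypothetical flattened form of the nested payload: the id becomes a
// top-level field so it can be declared as a primary key later.
final class FlatEmployee {
    final String id;
    final String name;

    FlatEmployee(String id, String name) {
        this.id = id;
        this.name = name;
    }
}

// Hypothetical helper; assumes the event was already decoded into a Map.
final class EmployeeFlattener {
    // Pulls employee.id and employee.name out of the nested record.
    // Returns null when there is no usable id, so the caller can drop
    // the event instead of emitting a row with a null key.
    static FlatEmployee flatten(Map<String, Object> payload) {
        if (payload == null) {
            return null;
        }
        Object nested = payload.get("employee");
        if (!(nested instanceof Map)) {
            return null;
        }
        Map<?, ?> employee = (Map<?, ?>) nested;
        Object id = employee.get("id");
        if (id == null) {
            return null;
        }
        Object name = employee.get("name");
        return new FlatEmployee(id.toString(), name == null ? null : name.toString());
    }
}
```

In a Flink job, logic like this would typically sit inside a map step on the DataStream, and the flattened stream would then be handed to the Table API (for example via StreamTableEnvironment.fromDataStream), where id is an ordinary top-level column. How the records are deserialized and how the primary key is declared on the resulting table are left out here and would need to be checked against the Flink version in use.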