Hi, Flink SQL doesn't support using a field nested inside a struct (ROW) type
as the primary key. You can raise an issue for this feature with the
community [1].

As a quick workaround, you can first use the DataStream API to extract the
'id' field into a top-level column, and then convert the stream to a Table so
that you can use SQL on it.
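
If you would rather stay in SQL, here is a rough sketch of the same flattening
idea. It is an alternative to the DataStream route, not a tested solution. The
names EmployeeRaw, EmployeeFlat, EmployeeLatest and proc_time, the group id,
and the startup mode are all made up for illustration. It reads the topic as
an append-only stream with the plain 'kafka' connector, lifts 'id' to a
top-level column, and keeps the latest row per id with the deduplication
pattern (ROW_NUMBER over a time attribute).

```sql
-- Sketch only: table/view names, group id and startup mode are assumptions.
-- 1) Read the topic as an append-only stream; no primary key is declared here.
CREATE TABLE EmployeeRaw (
  employee  ROW<id STRING, name STRING>,
  proc_time AS PROCTIME()  -- processing-time attribute, used for ordering below
) WITH (
  'connector' = 'kafka',
  'topic' = 'employee',
  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',
  'properties.group.id' = 'employee-reader',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'
);

-- 2) Lift the nested fields to top-level columns.
CREATE TEMPORARY VIEW EmployeeFlat AS
SELECT employee.id AS id, employee.name AS name, proc_time
FROM EmployeeRaw;

-- 3) Keep only the most recent row per id.
CREATE TEMPORARY VIEW EmployeeLatest AS
SELECT id, name
FROM (
  SELECT id, name,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY proc_time DESC) AS row_num
  FROM EmployeeFlat
)
WHERE row_num = 1;
```

Note that this only keeps the latest row per id. It does not turn Kafka
tombstone records into deletes, so it is not a full replacement for
upsert-kafka.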



[1] https://issues.apache.org/jira/projects/FLINK/issues/FLINK-33400?filter=allopenissues

--

    Best!
    Xuyang




At 2023-10-30 16:42:03, "elakiya udhayanan" <laks....@gmail.com> wrote:

Hi team,


I have a Kafka topic named employee that uses a Confluent Avro schema and
emits payloads like the one below:

{
  "employee": {
    "id": "123456",
    "name": "sampleName"
  }
}

I am consuming events from this topic with the upsert-kafka connector, using
the Flink SQL DDL statement below. I want to use the id field as the primary
key, but I cannot because it is nested inside the employee object.

DDL Statement:
String statement = "CREATE TABLE Employee (\r\n" +
"  employee  ROW(id STRING, name STRING\r\n" +
"  ),\r\n" +
"  PRIMARY KEY (employee.id) NOT ENFORCED\r\n" +
") WITH (\r\n" +
"  'connector' = 'upsert-kafka',\r\n" +
"  'topic' = 'employee',\r\n" +
"  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
"  'key.format' = 'raw',\r\n" +
"  'value.format' = 'avro-confluent',\r\n" +
"  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\r\n" +
")";

Any help is appreciated. TIA.


Thanks,
Elakiya
