Hi, Elakiya,
I think you can get what you need here[1] with many examples briging DataStream
api and Table API.
There may be some redundancy, and I'm not sure this is a best way to resolve
the question. First, use the StreamTableEnvironment to execute your original
ddl without pk.
Second, use
```
val table = tEnv
.toChangelogStream(tEnv.sqlQuery("select employee.id, employee.name from
Employee"))
.toTable(tableEnv, Schema.newBuilder().column(xxx,
xxx).primaryKey(xxx).build());
tEnv.createTemporaryView("Employee2", table);
```
to build the table with schema you want.
Third, use
```
tEnv.executeSql(xxx)
```
to execute the DML.
Another good way is to build a separate job to extract the 'employee' to a
single sink table and use it directly.
BTW, why you need the semantics about the pk?
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/
--
Best!
Xuyang
At 2023-11-01 15:28:25, "elakiya udhayanan" <[email protected]> wrote:
Hi Xuyang,
Thank you for your response. Since, I have no access to create a ticket in the
ASF jira I have requested for the access and once I get the access will raise a
ticket for the same.
Also, you have asked me to use Datastream API to extract the id and then use
the TableAPI feature, since I have not used the Datastream API, could you help
me with any example if possible, meanwhile i will try to do some learning on
using the DataStream API.
Thanks,
Elakiya
On Tue, Oct 31, 2023 at 7:34 AM Xuyang <[email protected]> wrote:
Hi, Flink SQL doesn't support a inline field in struct type as pk. You can try
to raise an issue about this feature in community[1].
For a quick solution, you can try to transform it by DataStream API first by
extracting the 'id' and then convert it to Table API to use SQL.
[1]
https://issues.apache.org/jira/projects/FLINK/issues/FLINK-33400?filter=allopenissues
--
Best!
Xuyang
At 2023-10-30 16:42:03, "elakiya udhayanan" <[email protected]> wrote:
Hi team,
I have a Kafka topic named employee which uses confluent avro schema and will
emit the payload as below:
{
"employee": {
"id": "123456",
"name": "sampleName"
}
}
I am using the upsert-kafka connector to consume the events from the above
Kafka topic as below using the Flink SQL DDL statement, also here I want to use
the id field as the Primary key. But I am unable to use the id field since it
is inside the object.
DDL Statement:
String statement = "CREATE TABLE Employee (\r\n" +
" employee ROW(id STRING, name STRING\r\n" +
" ),\r\n" +
" PRIMARY KEY (employee.id) NOT ENFORCED\r\n" +
") WITH (\r\n" +
" 'connector' = 'upsert-kafka',\r\n" +
" 'topic' = 'employee',\r\n" +
" 'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
" 'key.format' = 'raw',\r\n" +
" 'value.format' = 'avro-confluent',\r\n" +
" 'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" +
")";
Any help is appreciated TIA
Thanks,
Elakiya