Hi Xuyang,

Thanks again for the insights on how to use the DataStream API for my use case; I will explore it and experiment with it.
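To check that I have understood your steps correctly, below is the rough sketch I plan to start from. It is untested and contains some guesses of my own: I read the topic with the plain 'kafka' connector for the PK-less source (since upsert-kafka itself requires a PRIMARY KEY), I reuse the Employee2 view name from your example, and I picked ChangelogMode.upsert() myself so that repeated events for the same id collapse into updates instead of extra join matches.

```
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;

public class EmployeePkWorkaround {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Step 1: my original table, but without the (unsupported) nested primary
        // key. Using the plain 'kafka' connector here is my own assumption, since
        // upsert-kafka refuses a DDL that has no PRIMARY KEY at all.
        tEnv.executeSql(
            "CREATE TABLE Employee (\n" +
            "  employee ROW(id STRING, name STRING)\n" +
            ") WITH (\n" +
            "  'connector' = 'kafka',\n" +
            "  'topic' = 'employee',\n" +
            "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\n" +
            "  'scan.startup.mode' = 'earliest-offset',\n" +
            "  'value.format' = 'avro-confluent',\n" +
            "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\n" +
            ")");

        // Step 2: flatten the nested row, round-trip it through a changelog
        // stream, and attach a new schema on the way back in that declares the
        // now top-level id as the primary key.
        DataStream<Row> flattened = tEnv.toChangelogStream(
            tEnv.sqlQuery("SELECT employee.id AS id, employee.name AS name FROM Employee"));

        Table keyed = tEnv.fromChangelogStream(
            flattened,
            Schema.newBuilder()
                .column("id", "STRING NOT NULL")   // PK columns must be non-null
                .column("name", "STRING")
                .primaryKey("id")
                .build(),
            ChangelogMode.upsert());               // later records for an id replace earlier ones

        tEnv.createTemporaryView("Employee2", keyed);

        // Step 3: the correlation join can now run against a view that has
        // one row per id.
        tEnv.executeSql("SELECT * FROM Employee2").print();
    }
}
```

Please do correct me if the ChangelogMode.upsert() part is the wrong interpretation of your round trip.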
I wanted to use the value inside the row datatype as a primary key because I might get multiple records for the same id, and when I join with another, similar table for correlation purposes, this produces numerous results. That could be avoided if I could make the id inside the employee object the primary key. Also, I cannot modify the Avro schema, since it is a standard used across multiple consumers.

Thanks,
Elakiya

On Wed, Nov 1, 2023 at 3:00 PM Xuyang <xyzhong...@163.com> wrote:

> Hi, Elakiya,
>
> I think you can get what you need here [1], which has many examples bridging the DataStream API and the Table API.
>
> There may be some redundancy, and I'm not sure this is the best way to resolve the question. First, use the StreamTableEnvironment to execute your original DDL without the pk.
>
> Second, use
> ```
> Table table = tEnv.fromChangelogStream(
>     tEnv.toChangelogStream(tEnv.sqlQuery("select employee.id, employee.name from Employee")),
>     Schema.newBuilder().column(xxx, xxx).primaryKey(xxx).build());
> tEnv.createTemporaryView("Employee2", table);
> ```
> to build the table with the schema you want.
>
> Third, use
> ```
> tEnv.executeSql(xxx)
> ```
> to execute the DML.
>
> Another good way is to build a separate job that extracts the 'employee' field into a single sink table, and use that directly.
>
> BTW, why do you need the pk semantics?
>
> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/
>
> --
> Best!
> Xuyang
>
> At 2023-11-01 15:28:25, "elakiya udhayanan" <laks....@gmail.com> wrote:
>
> Hi Xuyang,
>
> Thank you for your response. Since I have no access to create a ticket in the ASF Jira, I have requested access; once I get it, I will raise a ticket for this.
>
> Also, you asked me to use the DataStream API to extract the id and then use the Table API. Since I have not used the DataStream API, could you help me with an example if possible? Meanwhile, I will try to do some learning on the DataStream API.
>
> Thanks,
> Elakiya
>
> On Tue, Oct 31, 2023 at 7:34 AM Xuyang <xyzhong...@163.com> wrote:
>
>> Hi, Flink SQL doesn't support an inline field of a struct type as a pk. You can try to raise an issue about this feature in the community [1].
>>
>> For a quick solution, you can first transform the stream with the DataStream API, extracting the 'id', and then convert it to the Table API to use SQL.
>>
>> [1] https://issues.apache.org/jira/projects/FLINK/issues/FLINK-33400?filter=allopenissues
>>
>> --
>> Best!
>> Xuyang
>>
>> At 2023-10-30 16:42:03, "elakiya udhayanan" <laks....@gmail.com> wrote:
>>
>> Hi team,
>>
>> I have a Kafka topic named employee which uses a Confluent Avro schema and emits payloads like the one below:
>>
>> {
>>   "employee": {
>>     "id": "123456",
>>     "name": "sampleName"
>>   }
>> }
>>
>> I am using the upsert-kafka connector to consume the events from the above Kafka topic, with the Flink SQL DDL statement below. I want to use the id field as the primary key, but I am unable to, since it is inside the employee object.
>>
>> DDL Statement:
>>
>> String statement = "CREATE TABLE Employee (\r\n" +
>>     "  employee ROW(id STRING, name STRING),\r\n" +
>>     "  PRIMARY KEY (employee.id) NOT ENFORCED\r\n" +
>>     ") WITH (\r\n" +
>>     "  'connector' = 'upsert-kafka',\r\n" +
>>     "  'topic' = 'employee',\r\n" +
>>     "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>>     "  'key.format' = 'raw',\r\n" +
>>     "  'value.format' = 'avro-confluent',\r\n" +
>>     "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\r\n" +
>>     ")";
>>
>> Any help is appreciated. TIA.
>>
>> Thanks,
>> Elakiya
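P.S. For the other route you mentioned, a separate job that extracts 'employee' into its own sink table, this is the rough shape I have in mind. The EmployeeFlat table and the employee-flat topic are names I made up, and the broker and schema registry addresses are just copied from my DDL above:

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class EmployeeFlattenJob {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Source: the nested Employee topic, read without a primary key.
        tEnv.executeSql(
            "CREATE TABLE Employee (\n" +
            "  employee ROW(id STRING, name STRING)\n" +
            ") WITH (\n" +
            "  'connector' = 'kafka',\n" +
            "  'topic' = 'employee',\n" +
            "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\n" +
            "  'scan.startup.mode' = 'earliest-offset',\n" +
            "  'value.format' = 'avro-confluent',\n" +
            "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\n" +
            ")");

        // Sink: a flattened topic where id is a real top-level column, so
        // upsert-kafka can declare it as the primary key.
        tEnv.executeSql(
            "CREATE TABLE EmployeeFlat (\n" +
            "  id STRING,\n" +
            "  name STRING,\n" +
            "  PRIMARY KEY (id) NOT ENFORCED\n" +
            ") WITH (\n" +
            "  'connector' = 'upsert-kafka',\n" +
            "  'topic' = 'employee-flat',\n" +
            "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\n" +
            "  'key.format' = 'raw',\n" +
            "  'value.format' = 'avro-confluent',\n" +
            "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\n" +
            ")");

        // The actual job: continuously copy and flatten the nested row.
        tEnv.executeSql(
            "INSERT INTO EmployeeFlat SELECT employee.id, employee.name FROM Employee");
    }
}
```

If I have this right, every consumer that needs the keyed view could then just read EmployeeFlat with upsert-kafka, instead of each repeating the changelog round trip.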