Hi Elakiya,

A statement set accepts only INSERT statements (DML), not standalone SELECT queries (DQL); that is what the "Only insert statement is supported now" exception is telling you. Run the CREATE TABLE statements with executeSql(), add each INSERT to the statement set, and call execute() once so that both sinks are submitted as a single job.

Here is a simple example:

    executeSql("CREATE TABLE source_table1 ......");
    executeSql("CREATE TABLE source_table2 ......");
    executeSql("CREATE TABLE sink_table1 ......");
    executeSql("CREATE TABLE sink_table2 ......");

    stmtSet.addInsertSql("INSERT INTO sink_table1 SELECT xxx FROM source_table1 JOIN source_table2 ...");
    stmtSet.addInsertSql("INSERT INTO sink_table2 SELECT xxx FROM source_table1 JOIN source_table2 ...");

    stmtSet.execute();

Best,
Feng

On Thu, Dec 7, 2023 at 12:48 AM elakiya udhayanan <laks....@gmail.com> wrote:

> Hi Xuyang, Zhangao,
>
> Thanks for your response, I have attached sample job files that I tried
> with the Statementset and with two queries. Please let me know if you are
> able to point out where I am possibly going wrong.
>
> Thanks,
> Elakiya
>
> On Wed, Dec 6, 2023 at 4:51 PM Xuyang <xyzhong...@163.com> wrote:
>
>> Hi, Elakiya.
>> Are you following the example here[1]? Could you attach a minimal,
>> reproducible SQL?
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/insert/
>>
>> --
>> Best!
>> Xuyang
>>
>> At 2023-12-06 17:49:17, "elakiya udhayanan" <laks....@gmail.com> wrote:
>>
>> Hi Team,
>> I would like to know the possibility of having two sinks in a
>> single Flink job. In my case I am using the Flink SQL based job where I try
>> to consume from two different Kafka topics using the create table (as
>> below) DDL and then use a join condition to correlate them and at present
>> write it to an external database (PostgreSQL - as a sink). I would like to
>> know if I can add another sink where I want to also write it to kafka topic
>> (as the second sink).
>> I tried using two sql scripts (two create and two insert for the same)
>> but was facing an exception* "Cannot have more than one execute() or
>> executeAsync() call in a single environment. at "*
>> Also tried to use the StatementSet functionality which again gave me an
>> exception *"org.apache.flink.table.api.TableException: Only insert
>> statement is supported now. at ".*
>> I am looking for some help in regards to this. TIA
>>
>> *Note:* I am using the Flink UI to submit my job.
>>
>> *Sample DDL statement used:*
>> String statement = "CREATE TABLE Person (\r\n" +
>> " person ROW(id STRING, name STRING\r\n" +
>> " ),\r\n" +
>> " PRIMARY KEY (id) NOT ENFORCED\r\n" +
>> ") WITH (\r\n" +
>> " 'connector' = 'upsert-kafka',\r\n" +
>> " 'topic' = 'employee',\r\n" +
>> " 'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>> " 'key.format' = 'raw',\r\n" +
>> " 'value.format' = 'avro-confluent',\r\n" +
>> " 'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" +
>> ")";
>>
>> Thanks,
>> Elakiya