Hi Chen/Feng,

Thanks for pointing out the mistake I made. After correcting the query, I am able to run the job with two sinks successfully.

Thanks,
Elakiya

On Thu, Dec 7, 2023 at 4:37 AM Chen Yu <yuchen.e...@gmail.com> wrote:

> Hi Chen,
> You should tell Flink which table to insert into by using "INSERT INTO XXX
> SELECT XXX".
>
> For a single non-insert query, Flink collects the output to the console
> automatically, so it also works without an INSERT.
>
> But you must name the target table explicitly when you need to write
> data to external storage.
>
> For example:
>
> String relateQuery = "insert into xxx select correlator_id, name, relationship from Correlation";
>
> Best,
> Yu Chen
>
> ------------------------------
> *From:* Zhanghao Chen <zhanghao.c...@outlook.com>
> *Sent:* Wednesday, December 6, 2023 7:21:50 PM
> *To:* elakiya udhayanan <laks....@gmail.com>; user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Query on using two sinks for a Flink job (Flink SQL)
>
> Hi Elakiya,
>
> You can try executing TableEnvironmentImpl#executeInternal for non-insert
> statements, then using StatementSet.addInsertSql to add multiple insert
> statements, and finally calling StatementSet#execute.
>
> Best,
> Zhanghao Chen
> ------------------------------
> *From:* elakiya udhayanan <laks....@gmail.com>
> *Sent:* Wednesday, December 6, 2023 17:49
> *To:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Query on using two sinks for a Flink job (Flink SQL)
>
> Hi Team,
> I would like to know whether it is possible to have two sinks in a
> single Flink job. In my case I am using a Flink SQL based job where I
> consume from two different Kafka topics using CREATE TABLE DDL (as
> below), use a join condition to correlate them, and at present write the
> result to an external database (PostgreSQL, as a sink). I would like to
> know if I can add another sink so that the result is also written to a
> Kafka topic (as the second sink).
> I tried using two SQL scripts (two CREATE and two INSERT statements) but
> got the exception *"Cannot have more than one execute() or
> executeAsync() call in a single environment. at "*
> I also tried the StatementSet functionality, which gave me the
> exception *"org.apache.flink.table.api.TableException: Only insert
> statement is supported now. at ".*
> I am looking for some help with this. TIA
>
> *Note:* I am using the Flink UI to submit my job.
>
> *Sample DDL statement used:*
>
> String statement = "CREATE TABLE Person (\r\n" +
> "  person ROW(id STRING, name STRING\r\n" +
> "  ),\r\n" +
> "  PRIMARY KEY (id) NOT ENFORCED\r\n" +
> ") WITH (\r\n" +
> "  'connector' = 'upsert-kafka',\r\n" +
> "  'topic' = 'employee',\r\n" +
> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
> "  'key.format' = 'raw',\r\n" +
> "  'value.format' = 'avro-confluent',\r\n" +
> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\r\n" +
> ")";
>
> Thanks,
> Elakiya
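
The approach suggested in the replies above (run the non-insert statements on their own, then batch every INSERT into one StatementSet) could be sketched in Java roughly as follows. This is a minimal sketch, not the code from the thread: the class TwoSinkPlan, its methods, and the sink table names PostgresSink and KafkaSink are hypothetical. The Flink calls appear only in comments so that the example compiles without Flink on the classpath. It assumes the public TableEnvironment#executeSql is used for the non-insert statements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical helper (not part of Flink): sorts SQL statements into those
 * that are executed one by one (DDL such as CREATE TABLE) and the INSERT
 * statements that are batched into a single StatementSet.
 */
final class TwoSinkPlan {
    // Statements intended for TableEnvironment#executeSql, one call each.
    final List<String> immediate = new ArrayList<>();
    // Statements intended for StatementSet#addInsertSql.
    final List<String> inserts = new ArrayList<>();

    /** True if the statement starts with the INSERT keyword (case-insensitive). */
    static boolean isInsert(String sql) {
        return sql.trim().matches("(?is)insert\\s.*");
    }

    /** Splits the given statements into the two groups, skipping blank entries. */
    static TwoSinkPlan of(List<String> statements) {
        TwoSinkPlan plan = new TwoSinkPlan();
        for (String sql : statements) {
            String trimmed = sql.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            if (isInsert(trimmed)) {
                plan.inserts.add(trimmed);
            } else {
                plan.immediate.add(trimmed);
            }
        }
        return plan;
    }

    public static void main(String[] args) {
        // Table names PostgresSink and KafkaSink are placeholders; the column
        // names and the Correlation table are taken from the thread.
        TwoSinkPlan plan = TwoSinkPlan.of(Arrays.asList(
            "CREATE TABLE PostgresSink (/* columns and connector options omitted */)",
            "CREATE TABLE KafkaSink (/* columns and connector options omitted */)",
            "INSERT INTO PostgresSink SELECT correlator_id, name, relationship FROM Correlation",
            "INSERT INTO KafkaSink SELECT correlator_id, name, relationship FROM Correlation"));

        // With Flink available, the plan would be applied along these lines:
        //   plan.immediate.forEach(tableEnv::executeSql);
        //   StatementSet set = tableEnv.createStatementSet();
        //   plan.inserts.forEach(set::addInsertSql);
        //   set.execute();  // a single execute() call covering both sinks
        System.out.println("immediate: " + plan.immediate.size());
        System.out.println("inserts: " + plan.inserts.size());
    }
}
```

Keeping every INSERT in one StatementSet means execute() is called only once, which is how this sketch avoids the "Cannot have more than one execute() or executeAsync() call" error. Passing only INSERT statements to addInsertSql avoids the "Only insert statement is supported now" error.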