Hi Elakiya,

You can try executing the non-insert statements (such as CREATE TABLE) with TableEnvironmentImpl#executeInternal, then adding each INSERT statement with StatementSet#addInsertSql, and finally calling StatementSet#execute once, so that all the inserts are submitted together as a single job.
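To illustrate the ordering, here is a minimal sketch rather than a definitive implementation. It only shows the routing: DDL statements are run one by one, and INSERT statements are collected for a single StatementSet. The class name TwoSinkScript, the helpers isInsert and route, and the sink table names PostgresSink and KafkaSink are hypothetical. The Flink calls appear only in comments so that the block runs on a plain JDK, and they use the public TableEnvironment#executeSql in place of executeInternal, which is my assumption about what is most convenient in user code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;

final class TwoSinkScript {

    /** Naive check: true when the statement starts with INSERT (case-insensitive). */
    static boolean isInsert(String sql) {
        return sql.trim().toUpperCase(Locale.ROOT).startsWith("INSERT");
    }

    /**
     * Routes each statement in order: non-insert statements go to runNow,
     * INSERT statements go to addToSet. Returns the number of INSERTs routed.
     */
    static int route(List<String> statements,
                     Consumer<String> runNow,
                     Consumer<String> addToSet) {
        int inserts = 0;
        for (String sql : statements) {
            if (isInsert(sql)) {
                addToSet.accept(sql);
                inserts++;
            } else {
                runNow.accept(sql);
            }
        }
        return inserts;
    }

    public static void main(String[] args) {
        // Placeholder statements; PostgresSink and KafkaSink are hypothetical names.
        List<String> script = Arrays.asList(
                "CREATE TABLE Person (...)",
                "CREATE TABLE PostgresSink (...)",
                "CREATE TABLE KafkaSink (...)",
                "INSERT INTO PostgresSink SELECT * FROM Person",
                "INSERT INTO KafkaSink SELECT * FROM Person");

        List<String> ddl = new ArrayList<>();
        List<String> inserts = new ArrayList<>();
        int count = route(script, ddl::add, inserts::add);
        System.out.println(ddl.size() + " DDL statements, " + count + " inserts");

        // With Flink on the classpath, the two consumers would be wired like this:
        //   TableEnvironment tEnv = TableEnvironment.create(
        //           EnvironmentSettings.newInstance().inStreamingMode().build());
        //   StatementSet set = tEnv.createStatementSet();
        //   TwoSinkScript.route(script, tEnv::executeSql, set::addInsertSql);
        //   set.execute(); // a single call submits both inserts as one job
    }
}
```

The point of the split is that only INSERT statements ever reach addInsertSql, and execute is called exactly once at the end.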
Best,
Zhanghao Chen

________________________________
From: elakiya udhayanan <laks....@gmail.com>
Sent: Wednesday, December 6, 2023 17:49
To: user@flink.apache.org <user@flink.apache.org>
Subject: Query on using two sinks for a Flink job (Flink SQL)

Hi Team,

I would like to know the possibility of having two sinks in a single Flink job. In my case I am using the Flink SQL based job where I try to consume from two different Kafka topics using the create table (as below) DDL and then use a join condition to correlate them and at present write it to an external database (PostgreSQL - as a sink). I would like to know if I can add another sink where I want to also write it to kafka topic (as the second sink).

I tried using two sql scripts (two create and two insert for the same) but was facing an exception "Cannot have more than one execute() or executeAsync() call in a single environment. at "

Also tried to use the StatementSet functionality which again gave me an exception "org.apache.flink.table.api.TableException: Only insert statement is supported now. at ".

I am looking for some help in regards to this. TIA

Note: I am using the Flink UI to submit my job.

Sample DDL statement used:

String statement = "CREATE TABLE Person (\r\n" +
    " person ROW(id STRING, name STRING\r\n" +
    " ),\r\n" +
    " PRIMARY KEY (id) NOT ENFORCED\r\n" +
    ") WITH (\r\n" +
    " 'connector' = 'upsert-kafka',\r\n" +
    " 'topic' = 'employee',\r\n" +
    " 'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
    " 'key.format' = 'raw',\r\n" +
    " 'value.format' = 'avro-confluent',\r\n" +
    " 'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" +
    ")";

Thanks,
Elakiya