Hi Elakiya,

You should use DML (INSERT statements) in the statement set instead of DQL (plain SELECT queries); a StatementSet only accepts INSERT statements, which is why you saw the "Only insert statement is supported now" exception.


Here is a simple example:

executeSql("CREATE TABLE source_table1 ......");

executeSql("CREATE TABLE source_table2 ......");

executeSql("CREATE TABLE sink_table1 ......");

executeSql("CREATE TABLE sink_table1 ......");


stmtSet.addInsertSql("INSERT INTO sink_tabl1 SELECT xxx from  source_table1
join source_table2 ...");

stmtSet.addInsertSql("INSERT INTO sink_tabl2 SELECT xxx from  source_table1
join source_table2 ...");

stmtSet.execute();

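In case it helps, here is a slightly fuller sketch of the same pattern, including the createStatementSet() call the snippet above relies on. The class name, table schemas and connector options are placeholders rather than taken from your job, so adapt them to your setup:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class TwoSinkJob {   // placeholder class name
    public static void main(String[] args) {
        // streaming TableEnvironment
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // DDL: register the sources and both sinks (schemas and WITH options elided)
        tEnv.executeSql("CREATE TABLE source_table1 ( ... ) WITH ( ... )");
        tEnv.executeSql("CREATE TABLE source_table2 ( ... ) WITH ( ... )");
        tEnv.executeSql("CREATE TABLE sink_table1 ( ... ) WITH ( ... )");  // e.g. JDBC sink
        tEnv.executeSql("CREATE TABLE sink_table2 ( ... ) WITH ( ... )");  // e.g. upsert-kafka sink

        // DML: add both INSERTs to one statement set so they run as a single job
        StatementSet stmtSet = tEnv.createStatementSet();
        stmtSet.addInsertSql(
            "INSERT INTO sink_table1 SELECT ... FROM source_table1 JOIN source_table2 ON ...");
        stmtSet.addInsertSql(
            "INSERT INTO sink_table2 SELECT ... FROM source_table1 JOIN source_table2 ON ...");

        // A single execute() submits the whole set; calling execute()/executeAsync()
        // again elsewhere is what triggers the "Cannot have more than one execute()" error.
        stmtSet.execute();
    }
}

Note that only INSERT statements can be added to the statement set; adding a plain SELECT is what produces the "Only insert statement is supported now" exception.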

Best,
Feng


On Thu, Dec 7, 2023 at 12:48 AM elakiya udhayanan <laks....@gmail.com>
wrote:

> Hi Xuyang, Zhangao,
>
> Thanks for your response. I have attached sample job files for what I tried
> with the StatementSet and with two separate queries. Please let me know if you
> are able to point out where I am possibly going wrong.
>
> Thanks,
> Elakiya
>
> On Wed, Dec 6, 2023 at 4:51 PM Xuyang <xyzhong...@163.com> wrote:
>
>> Hi, Elakiya.
>> Are you following the example here[1]? Could you attach a minimal,
>> reproducible SQL?
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/insert/
>>
>>
>>
>> --
>>     Best!
>>     Xuyang
>>
>>
>> At 2023-12-06 17:49:17, "elakiya udhayanan" <laks....@gmail.com> wrote:
>>
>> Hi Team,
>>  I would like to know whether it is possible to have two sinks in a
>> single Flink job. In my case I am using a Flink SQL based job where I
>> consume from two different Kafka topics using CREATE TABLE DDL (as below),
>> correlate them with a join condition, and at present write the result to an
>> external database (PostgreSQL) as a sink. I would like to know if I can add
>> another sink that also writes the result to a Kafka topic (as the second
>> sink).
>> I tried using two SQL scripts (two CREATE and two INSERT statements for
>> the same), but was facing an exception *"Cannot have more than one
>> execute() or executeAsync() call in a single environment. at "*.
>> I also tried the StatementSet functionality, which again gave me an
>> exception *"org.apache.flink.table.api.TableException: Only insert
>> statement is supported now. at "*.
>> I am looking for some help in this regard. TIA
>>
>> *Note:* I am using the Flink UI to submit my job.
>>
>> *Sample DDL statement used:*
>> String statement = "CREATE TABLE Person (\r\n" +
>> "  person ROW(id STRING, name STRING\r\n" +
>> "  ),\r\n" +
>> "  PRIMARY KEY (id) NOT ENFORCED\r\n" +
>> ") WITH (\r\n" +
>> "  'connector' = 'upsert-kafka',\r\n" +
>> "  'topic' = 'employee',\r\n" +
>> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>> "  'key.format' = 'raw',\r\n" +
>> "  'value.format' = 'avro-confluent',\r\n" +
>> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n"
>> +
>> ")";
>>
>> Thanks,
>> Elakiya
>>
>>
