Hi Chen/ Feng,

Thanks for pointing out the mistake I made, after correcting the query I am
able to run the job with two sinks successfully.

Thanks,
Elakiya

On Thu, Dec 7, 2023 at 4:37 AM Chen Yu <yuchen.e...@gmail.com> wrote:

> Hi  Chen,
> You should tell flink which table to insert by “INSERT INTO XXX SELECT
> XXX”.
>
> For single non insert query, flink will collect output to the console
> automatically. Therefore, you don’t need to add insert also works.
>
> But you must point out target table specifically when you need to write
> data to external storage.
>
> Like,
>
> String relateQuery = "insert into xxx select correlator_id , name, 
> relationship from Correlation; ;
>
>
> Best,
> Yu Chen
>
>
> 获取 Outlook for iOS <https://aka.ms/o0ukef>
> ------------------------------
> *发件人:* Zhanghao Chen <zhanghao.c...@outlook.com>
> *发送时间:* Wednesday, December 6, 2023 7:21:50 PM
> *收件人:* elakiya udhayanan <laks....@gmail.com>; user@flink.apache.org <
> user@flink.apache.org>
> *主题:* Re: Query on using two sinks for a Flink job (Flink SQL)
>
> Hi Elakiya,
>
> You can try executing TableEnvironmentImpl#executeInternal for non-insert
> statements, then using StatementSet.addInsertSql to add multiple insertion
> statetments, and finally calling StatementSet#execute.
>
> Best,
> Zhanghao Chen
> ------------------------------
> *From:* elakiya udhayanan <laks....@gmail.com>
> *Sent:* Wednesday, December 6, 2023 17:49
> *To:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Query on using two sinks for a Flink job (Flink SQL)
>
> Hi Team,
>  I would like to know the possibility of having two sinks in a
> single Flink job. In my case I am using the Flink SQL based job where I try
> to consume from two different Kafka topics using the create table (as
> below) DDL and then use a join condition to correlate them and at present
> write it to an external database (PostgreSQL - as a sink). I would like to
> know if I can add another sink where I want to also write it to kafka topic
> (as the second sink).
> I tried using two sql scripts (two create and two insert for the same) but
> was facing an exception* "Cannot have more than one execute() or
> executeAsync() call in a single environment. at "*
> Also tried to use the StatementSet functionality which again gave me an
> exception *"org.apache.flink.table.api.TableException: Only insert
> statement is supported now. at ".*
> I am looking for some help in regards to this. TIA
>
> *Note:* I am using the Flink UI to submit my job.
>
> *Sample DDL statement used: *String statement = "CREATE TABLE Person
> (\r\n" +
> "  person ROW(id STRING, name STRING\r\n" +
> "  ),\r\n" +
> "  PRIMARY KEY (id) NOT ENFORCED\r\n" +
> ") WITH (\r\n" +
> "  'connector' = 'upsert-kafka',\r\n" +
> "  'topic' = 'employee',\r\n" +
> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
> "  'key.format' = 'raw',\r\n" +
> "  'value.format' = 'avro-confluent',\r\n" +
> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n"
> +
> ")";
>
> Thanks,
> Elakiya
>

Reply via email to