Hi Xuyang, Zhangao,

Thanks for your response. I have attached the sample job files that I tried,
with the StatementSet and with two separate queries. Please let me know if
you can point out where I might be going wrong.

Thanks,
Elakiya

On Wed, Dec 6, 2023 at 4:51 PM Xuyang <xyzhong...@163.com> wrote:

> Hi, Elakiya.
> Are you following the example here[1]? Could you attach a minimal,
> reproducible SQL?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/insert/
>
>
>
> --
>     Best!
>     Xuyang
>
>
> At 2023-12-06 17:49:17, "elakiya udhayanan" <laks....@gmail.com> wrote:
>
> Hi Team,
> I would like to know whether a single Flink job can have two sinks. I am
> using a Flink SQL based job that consumes from two different Kafka topics
> using CREATE TABLE DDL (as below), joins them, and at present writes the
> result to an external database (PostgreSQL, as a sink). I would like to
> know if I can add a second sink that also writes the result to a Kafka
> topic.
> I tried using two SQL scripts (two CREATE and two INSERT statements for
> the same), but got the exception *"Cannot have more than one execute() or
> executeAsync() call in a single environment. at "*
> I also tried the StatementSet functionality, which gave me the
> exception *"org.apache.flink.table.api.TableException: Only insert
> statement is supported now. at ".*
> I would appreciate any help with this. TIA
>
> *Note:* I am using the Flink UI to submit my job.
>
> *Sample DDL statement used:*
>
> String statement = "CREATE TABLE Person (\r\n" +
> "  person ROW(id STRING, name STRING),\r\n" +
> "  PRIMARY KEY (id) NOT ENFORCED\r\n" +
> ") WITH (\r\n" +
> "  'connector' = 'upsert-kafka',\r\n" +
> "  'topic' = 'employee',\r\n" +
> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
> "  'key.format' = 'raw',\r\n" +
> "  'value.format' = 'avro-confluent',\r\n" +
> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\r\n" +
> ")";
>
> Thanks,
> Elakiya
>
>
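
For reference, one likely cause of the "Only insert statement is supported now" error quoted above is passing a non-INSERT statement (for example a CREATE TABLE) to StatementSet.addInsertSql, which accepts INSERT statements only. The sketch below is an illustration under that assumption, not a diagnosis of the attached files. It separates DDL from INSERT statements so that the DDL can be run one by one with executeSql and the INSERTs can be grouped into one StatementSet and submitted with a single execute() call. The class name SqlStatementSplitter, the table names, and the prefix check are hypothetical and deliberately naive; the Flink calls appear only in comments so the block runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical helper: splits SQL statements into DDL and INSERT groups.
class SqlStatementSplitter {

    // Holds the two groups of statements.
    static final class Plan {
        final List<String> ddl = new ArrayList<>();
        final List<String> inserts = new ArrayList<>();
    }

    // Naive classification: anything starting with INSERT goes to the
    // StatementSet group, everything else is treated as DDL.
    static Plan split(List<String> statements) {
        Plan plan = new Plan();
        for (String statement : statements) {
            String trimmed = statement.trim();
            if (trimmed.toUpperCase(Locale.ROOT).startsWith("INSERT")) {
                plan.inserts.add(trimmed);
            } else {
                plan.ddl.add(trimmed);
            }
        }
        return plan;
    }

    public static void main(String[] args) {
        // Placeholder statements; table and column names are assumptions.
        Plan plan = split(Arrays.asList(
            "CREATE TABLE Person (...) WITH (...)",
            "CREATE TABLE PgSink (...) WITH (...)",
            "CREATE TABLE KafkaSink (...) WITH (...)",
            "INSERT INTO PgSink SELECT ... FROM Person",
            "INSERT INTO KafkaSink SELECT ... FROM Person"));

        // In a Flink job the two groups would be used roughly like this:
        //   for (String ddl : plan.ddl) { tEnv.executeSql(ddl); }
        //   StatementSet set = tEnv.createStatementSet();
        //   for (String insert : plan.inserts) { set.addInsertSql(insert); }
        //   set.execute();  // one submission covering both sinks
        System.out.println("DDL statements: " + plan.ddl.size());
        System.out.println("INSERT statements: " + plan.inserts.size());
    }
}
```

Submitting both INSERTs through one StatementSet also avoids the "Cannot have more than one execute() or executeAsync() call" error, because the job is submitted only once.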

Attachment: TwoSinks.java
Attachment: TwoSinks2.java
