Hi forideal,

You are using `StreamTableEnvironment`, which doesn't support multi-sink
optimization in 1.10 :)
You should change `StreamTableEnvironment.create` to
`TableEnvironment.create`.
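
As a rough sketch of that change (assuming Flink 1.10 with the Blink planner
on the classpath; the class name `MultiSinkJob` and the method
`runStatements` are made up for illustration, and `sqls` stands in for your
`sqlsWithoutFunc` list):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

import java.util.List;

// Hypothetical helper class, not part of Flink.
public class MultiSinkJob {

    // Runs all DDL and INSERT statements as a single job.
    public static void runStatements(List<String> sqls, String jobName) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        // Unified TableEnvironment instead of StreamTableEnvironment.
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // In 1.10, sqlUpdate on this environment buffers the INSERT statements
        // rather than translating each one on its own.
        for (String sql : sqls) {
            tableEnv.sqlUpdate(sql);
        }

        // Submit through the table environment, not through a
        // StreamExecutionEnvironment, so the buffered statements are
        // planned together.
        tableEnv.execute(jobName);
    }
}
```

Note that the job is submitted with `tableEnv.execute(jobName)` rather than
`env.execute(jobName)`. The statements are translated together at that point,
which is what should give the planner the chance to reuse the shared source.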

Btw, StreamTableEnvironment will also support multi-sink optimization in
1.11.

Best,
Jark



On Fri, 10 Apr 2020 at 16:46, forideal <fszw...@163.com> wrote:

> Hi Jark
>
> Thanks for your reply.
> In my code, I use `TableEnvironment` with the Blink planner.
>
> this.tableEnv = StreamTableEnvironment.create(
>     env,
>     EnvironmentSettings.newInstance()
>         .useBlinkPlanner()
>         .inStreamingMode()
>         .build());
>
> // submit the three SQL statements
> sqlsWithoutFunc.forEach(sql -> {
>     tableEnv.sqlUpdate(sql);
> });
>
> env.execute(jobName);
>
> Best Wishes
>
> At 2020-04-10 16:35:33, "Jark Wu" <imj...@gmail.com> wrote:
>
> Hi forideal,
>
> Are you using `StreamTableEnvironment` or SQL CLI?
> Currently, only `TableEnvironment` with the Blink planner has the multi-sink
> optimization (reuse of shared upstream operators).
>
> Best,
> Jark
>
> On Fri, 10 Apr 2020 at 16:31, forideal <fszw...@163.com> wrote:
>
>> Hello
>>
>>    There are 3 SQL statements that all query the same table, but the
>> generated DAG is 3 independent topologies. I think the better result would
>> be one Source and 3 Sinks.
>>
>>
>>   create table good_sink (data varchar) with (
>>     'connector.type' = 'console',
>>     'connector.dry-run' = 'false',
>>     'connector.property-version' = '1',
>>     'update-mode' = 'append');
>>
>>   create table atomic_sink (data varchar) with (
>>     'connector.type' = 'console',
>>     'connector.dry-run' = 'false',
>>     'connector.property-version' = '1',
>>     'update-mode' = 'append');
>>
>>   create table bad_sink (data varchar) with (
>>     'connector.type' = 'console',
>>     'connector.dry-run' = 'false',
>>     'connector.property-version' = '1',
>>     'update-mode' = 'append');
>>
>>   create table source_stream (data varchar, `key` varchar) with (
>>     xxx);
>>
>>   insert into good_sink
>>   select data
>>   from source_stream
>>   where `key` = 'good';
>>
>>   insert into atomic_sink
>>   select data
>>   from source_stream
>>   where `key` = 'atomic';
>>
>>   insert into atomic_sink
>>   select data
>>   from source_stream
>>   where `key` = 'bad';
>>
>> DAG picture Link:
>> https://pic4.zhimg.com/80/v2-7db1417bd2607d3a939f38cc19228df3_1440w.jpg
>> Question Link: https://zhuanlan.zhihu.com/p/128590984
>>
>> Best Wishes
>>
>>
>>
>>
>
>
>
>
