Hi forideal,

You are using `StreamTableEnvironment`, which doesn't support multi-sink optimization in 1.10 :) You should change `StreamTableEnvironment.create` to `TableEnvironment.create`.
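A minimal sketch of that change, assuming Flink 1.10 with the Blink planner dependencies on the classpath. The class name `MultiSinkJob`, the helper `blinkStreamingSettings`, the job name, and passing the SQL statements through `args` are placeholders for illustration, not part of your code:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MultiSinkJob {

    // Same settings as in the quoted snippet: Blink planner, streaming mode.
    static EnvironmentSettings blinkStreamingSettings() {
        return EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Unified TableEnvironment instead of StreamTableEnvironment;
        // no StreamExecutionEnvironment is passed in.
        TableEnvironment tableEnv = TableEnvironment.create(blinkStreamingSettings());

        // Placeholder (assumption): each argument is one DDL or INSERT statement,
        // standing in for the `sqlsWithoutFunc` list.
        for (String sql : args) {
            tableEnv.sqlUpdate(sql);
        }

        // The buffered INSERTs are translated and submitted together here.
        tableEnv.execute("multi-sink-job");
    }
}
```

Note that the job is submitted with `tableEnv.execute(jobName)` rather than `env.execute(jobName)`, since there is no `StreamExecutionEnvironment` in this setup.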
Btw, StreamTableEnvironment will also support multi-sink optimization in 1.11.

Best,
Jark

On Fri, 10 Apr 2020 at 16:46, forideal <fszw...@163.com> wrote:

> Hi Jark
>
> Thanks for your reply.
> In my code, I use `TableEnvironment` with Blink planner.
>
> this.tableEnv = StreamTableEnvironment.create(
>     env,
>     EnvironmentSettings.newInstance()
>         .useBlinkPlanner()
>         .inStreamingMode()
>         .build());
>
> // update three sql
> sqlsWithoutFunc.forEach(sql -> {
>     tableEnv.sqlUpdate(sql);
> });
>
> env.execute(jobName);
>
> Best Wishes
>
> At 2020-04-10 16:35:33, "Jark Wu" <imj...@gmail.com> wrote:
>
> Hi forideal,
>
> Are you using `StreamTableEnvironment` or SQL CLI?
> Currently, only `TableEnvironment` with Blink planner has the multi-sink
> optimization (reuse shared upstream operators).
>
> Best,
> Jark
>
> On Fri, 10 Apr 2020 at 16:31, forideal <fszw...@163.com> wrote:
>
>> Hello
>>
>> There are 3 SQLs all querying the same table, but the generated DAG
>> is 3 independent topologies. I think the better result would be one
>> Source and 3 Sinks.
>>
>> create table good_sink (data varchar) with (
>>     'connector.type' = 'console',
>>     'connector.dry-run' = 'false',
>>     'connector.property-version' = '1',
>>     'update-mode' = 'append');
>>
>> create table atomic_sink (data varchar) with (
>>     'connector.type' = 'console',
>>     'connector.dry-run' = 'false',
>>     'connector.property-version' = '1',
>>     'update-mode' = 'append');
>>
>> create table bad_sink (data varchar) with (
>>     'connector.type' = 'console',
>>     'connector.dry-run' = 'false',
>>     'connector.property-version' = '1',
>>     'update-mode' = 'append');
>>
>> create table source_stream (data varchar, `key` varchar) with (
>>     xxx);
>>
>> insert into good_sink
>> select data
>> from source_stream
>> where `key` = 'good';
>>
>> insert into atomic_sink
>> select data
>> from source_stream
>> where `key` = 'atomic';
>>
>> insert into atomic_sink
>> select data
>> from source_stream
>> where `key` = 'bad';
>>
>> DAG picture Link:
>> https://pic4.zhimg.com/80/v2-7db1417bd2607d3a939f38cc19228df3_1440w.jpg
>> Question Link: https://zhuanlan.zhihu.com/p/128590984
>>
>> Best Wishes