Hi Jark Thanks for your reploy. In my code, i use `TableEnvironemnt` with Blink planner.
this.tableEnv = StreamTableEnvironment.create( env, EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build()); // update three sql sqlsWithoutFunc.forEach(sql -> { tableEnv.sqlUpdate(sql); }); env.execute(jobName); Best Wishes At 2020-04-10 16:35:33, "Jark Wu" <imj...@gmail.com> wrote: Hi forideal, Are you using `StreamTableEnvironment` or SQL CLI? Currently, only `TableEnvironemnt` with Blink planner have the multi-sink optimization (reuse shared upstream operators). Best, Jark On Fri, 10 Apr 2020 at 16:31, forideal <fszw...@163.com> wrote: Hello There are 3 SQLs all querying the same table, but the generated GAG is 3 independent topologies.I think, the better result is that there is one Source and 3 Sinks. createtablegood_sink(datavarchar)with( 'connector.type'='console', 'connector.dry-run'='false','connector.property-version'='1','update-mode'='append');createtableatomic_sink(datavarchar)with('connector.type'='console','connector.dry-run'='false','connector.property-version'='1','update-mode'='append');createtablebad_sink(datavarchar)with('connector.type'='console','connector.dry-run'='false','connector.property-version'='1','update-mode'='append');createtablesource_stream(datavarchar,`key`varchar)with(xxx);insertintogood_sinkselectdatafromsource_streamwhere`key`='good';insertintoatomic_sinkselectdatafromsource_streamwhere`key`='atomic';insertintoatomic_sinkselectdatafromsource_streamwhere`key`='bad'; DAG picture Link:https://pic4.zhimg.com/80/v2-7db1417bd2607d3a939f38cc19228df3_1440w.jpg Question Link:https://zhuanlan.zhihu.com/p/128590984 Best Wishes