Hi Jark

Thanks for your reploy.
In my code, i use `TableEnvironemnt` with Blink planner.

this.tableEnv = StreamTableEnvironment.create(
env,
EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build());

// update three sql
sqlsWithoutFunc.forEach(sql -> {
tableEnv.sqlUpdate(sql);
});

env.execute(jobName);

Best Wishes



At 2020-04-10 16:35:33, "Jark Wu" <imj...@gmail.com> wrote:

Hi forideal,


Are you using `StreamTableEnvironment` or SQL CLI? 
Currently, only `TableEnvironemnt` with Blink planner have the multi-sink 
optimization (reuse shared upstream operators).


Best,
Jark


On Fri, 10 Apr 2020 at 16:31, forideal <fszw...@163.com> wrote:

Hello
   
   There are 3 SQLs all querying the same table, but the generated GAG is 3 
independent topologies.I think, the better result is that there is one Source 
and 3 Sinks.
 
  createtablegood_sink(datavarchar)with(
  'connector.type'='console',
'connector.dry-run'='false','connector.property-version'='1','update-mode'='append');createtableatomic_sink(datavarchar)with('connector.type'='console','connector.dry-run'='false','connector.property-version'='1','update-mode'='append');createtablebad_sink(datavarchar)with('connector.type'='console','connector.dry-run'='false','connector.property-version'='1','update-mode'='append');createtablesource_stream(datavarchar,`key`varchar)with(xxx);insertintogood_sinkselectdatafromsource_streamwhere`key`='good';insertintoatomic_sinkselectdatafromsource_streamwhere`key`='atomic';insertintoatomic_sinkselectdatafromsource_streamwhere`key`='bad';
DAG picture 
Link:https://pic4.zhimg.com/80/v2-7db1417bd2607d3a939f38cc19228df3_1440w.jpg
Question Link:https://zhuanlan.zhihu.com/p/128590984


Best Wishes 




 

Reply via email to