[ https://issues.apache.org/jira/browse/FLINK-18145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17155058#comment-17155058 ]
godfrey he commented on FLINK-18145:
------------------------------------

[~hehuiyuan] From 1.11 on, both {{StreamTableEnvironment}} and {{TableEnvironment}} support DAG optimization, but only for Table API or SQL programs. Once part of your table program is converted to a {{DataStream}} (e.g. the first sink in the example above), DAG optimization is not applied to it. Before 1.11, just as [~libenchao] said, only {{TableEnvironment}} supports DAG optimization.

> Segment optimization does not work in blink ?
> ---------------------------------------------
>
>                 Key: FLINK-18145
>                 URL: https://issues.apache.org/jira/browse/FLINK-18145
>             Project: Flink
>          Issue Type: Wish
>          Components: Table SQL / Planner
>            Reporter: hehuiyuan
>            Priority: Minor
>         Attachments: image-2020-06-05-14-56-01-710.png, image-2020-06-05-14-56-48-625.png, image-2020-06-05-14-57-11-287.png, image-2020-06-09-14-58-44-221.png
>
> DAG segment optimization:
>
> !image-2020-06-05-14-56-01-710.png|width=762,height=264!
>
> Code:
> {code:java}
> StreamExecutionEnvironment env = EnvUtil.getEnv();
> env.setParallelism(1);
> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
> EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
>
> GeneratorTableSource tableSource = new GeneratorTableSource(2, 1, 70, 0);
> tableEnv.registerTableSource("myTble", tableSource);
> Table mytable = tableEnv.scan("myTble");
> mytable.printSchema();
>
> // (1) convert to DataStream and sink
> tableEnv.toAppendStream(mytable, Row.class).addSink(new PrintSinkFunction<>()).setParallelism(2);
>
> // (2) 30-second tumble window
> Table tableproc = tableEnv.sqlQuery("SELECT key, count(rowtime_string) as countkey, TUMBLE_START(proctime, INTERVAL '30' SECOND) as tumblestart FROM myTble group by TUMBLE(proctime, INTERVAL '30' SECOND), key");
> tableproc.printSchema();
> tableEnv.registerTable("t4", tableproc);
>
> // (3) 24-hour tumble window
> Table table = tableEnv.sqlQuery("SELECT key, count(rowtime_string) as countkey, TUMBLE_START(proctime, INTERVAL '24' HOUR) as tumblestart FROM myTble group by TUMBLE(proctime, INTERVAL '24' HOUR), key");
> table.printSchema();
> tableEnv.registerTable("t3", table);
>
> String[] fields = new String[]{"key", "countkey", "tumblestart"};
> TypeInformation[] fieldsType = new TypeInformation[3];
> fieldsType[0] = Types.INT;
> fieldsType[1] = Types.LONG;
> fieldsType[2] = Types.SQL_TIMESTAMP;
> PrintTableUpsertSink printTableSink = new PrintTableUpsertSink(fields, fieldsType, true);
> tableEnv.registerTableSink("inserttable", printTableSink);
> tableEnv.sqlUpdate("insert into inserttable select key, countkey, tumblestart from t3");
>
> String[] fieldsproc = new String[]{"key", "countkey", "tumblestart"};
> TypeInformation[] fieldsTypeproc = new TypeInformation[3];
> fieldsTypeproc[0] = Types.INT;
> fieldsTypeproc[1] = Types.LONG;
> fieldsTypeproc[2] = Types.SQL_TIMESTAMP;
> PrintTableUpsertSink printTableSinkproc = new PrintTableUpsertSink(fieldsproc, fieldsTypeproc, true);
> tableEnv.registerTableSink("inserttableproc", printTableSinkproc);
> tableEnv.sqlUpdate("insert into inserttableproc select key, countkey, tumblestart from t4");
> {code}
>
> I have a custom table source, then:
> (1) transform the table to a DataStream with {{toAppendStream}}, then sink;
> (2) apply a tumble window, then sink;
> (3) apply another tumble window, then sink.
>
> But the segment optimization didn't work.
>
> !image-2020-06-05-14-57-11-287.png|width=546,height=388!
>
> *The source is executed by 3 threads and generates duplicate data 3 times.*
>
> !image-2020-06-05-14-56-48-625.png!

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
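One way to read the comment above: in 1.11 the planner can only share the common scan when all sinks stay inside the Table API and are submitted together. A minimal sketch of that, assuming Flink 1.11's {{StatementSet}} API and the reporter's already-registered tables and sinks from the example (not a verified fix for this ticket):

```java
import org.apache.flink.table.api.StatementSet;

// Sketch only: assumes the "t3"/"t4" tables and "inserttable"/"inserttableproc"
// sinks from the example above are already registered on tableEnv.
// A StatementSet groups multiple INSERT statements into one job graph, so the
// planner can optimize the shared "myTble" scan into a single source operator.
StatementSet stmtSet = tableEnv.createStatementSet();
stmtSet.addInsertSql("insert into inserttable select key, countkey, tumblestart from t3");
stmtSet.addInsertSql("insert into inserttableproc select key, countkey, tumblestart from t4");

// Note: the toAppendStream(...) branch cannot participate. Once the table is
// converted to a DataStream it is outside the planner's DAG optimization, so
// that branch would need to be rewritten as a third table sink (e.g. another
// registered sink plus addInsertSql) for the scan to be shared across all three.
stmtSet.execute();
```

This is a fragment, not a complete program; it relies on the {{tableEnv}} built earlier in the example and on a Flink cluster or local environment to execute.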