Hi, you can only get the execution plan for programs that have a data sink and haven't been executed. In your code print() defines the data sink, however it also eagerly executes a program. After execution the program is "removed" from the execution environment. Therefore, Flink complains that no sink has been defined.
You can print the execution plan if you use a data sink that does not eagerly execute. For example, you can replace the sum.print() statement with sum.output(new DiscardingOutputFormat()) or sum.writeAsText("file://some/path"). Cheers, Fabian 2016-01-01 10:21 GMT+01:00 madhu phatak <phatak....@gmail.com>: > Hi, > I am trying to get execution plan for wordcount using below code in local > mode inside IntelliJ IDEA. I am using flink 0.10.0. > > val env = ExecutionEnvironment.getExecutionEnvironment > > val data = List("hi","how are you","hi") > > val dataSet = env.fromCollection(data) > > val words = dataSet.flatMap(value => value.split("\\s+")) > > val mappedWords = words.map(value => (value,1)) > > val grouped = mappedWords.groupBy(0) > > val sum = grouped.sum(1) > > sum.print() > > println(env.getExecutionPlan()) > > > The program computes sum correctly, but fails with following exception for > last line > > Exception in thread "main" java.lang.RuntimeException: No new data sinks > have been defined since the last execution. The last execution refers to > the latest call to 'execute()', 'count()', 'collect()', or 'print()'. > at > org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:925) > at > org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:95) > at > org.apache.flink.api.scala.ExecutionEnvironment.getExecutionPlan(ExecutionEnvironment.scala:635) > at com.madhukaraphatak.flink.WordCount$.main(WordCount.scala:30) > > > I tried placing the getExecutionPlan in different places. But I get same > error. Is there any other way to get the execution plan in local mode? > > -- > Regards, > Madhukara Phatak > http://datamantra.io/ >