Thanks Timo,

So there is no need to call env.execute() in Flink SQL if I do everything from source to sink in SQL.
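To check my understanding, here is a toy sketch in plain Java. It does not use Flink's real classes: ToyEnv, ToyTableEnv and EmptyPipelineDemo are names I made up, and the model only assumes what you described, namely that executeSql() submits its job immediately and leaves nothing buffered in the environment, so a trailing execute() sees an empty pipeline.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for StreamExecutionEnvironment: tracks buffered operators only.
class ToyEnv {
    final List<String> operators = new ArrayList<>();
    final List<String> submittedJobs = new ArrayList<>();

    // Mirrors the message in the stack trace: an empty pipeline cannot be executed.
    void execute(String jobName) {
        if (operators.isEmpty()) {
            throw new IllegalStateException(
                "No operators defined in streaming topology. Cannot execute.");
        }
        submittedJobs.add(jobName);
        operators.clear();
    }
}

// Hypothetical stand-in for the table environment after FLIP-84.
class ToyTableEnv {
    private final ToyEnv env;

    ToyTableEnv(ToyEnv env) {
        this.env = env;
    }

    // Assumption: the statement is submitted as its own job right away,
    // so nothing is added to env.operators.
    void executeSql(String statement) {
        env.submittedJobs.add(statement);
    }
}

class EmptyPipelineDemo {
    // Returns true if the trailing env.execute() call fails on an empty pipeline.
    static boolean trailingExecuteFails() {
        ToyEnv env = new ToyEnv();
        ToyTableEnv tEnv = new ToyTableEnv(env);
        tEnv.executeSql("INSERT INTO sensor_1min_avg SELECT ...");
        try {
            env.execute("table api");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("trailing execute fails: " + trailingExecuteFails());
    }
}
```

In this model the INSERT job is already submitted by executeSql(), which would explain why I still see results in the sink tables even though the last line throws. So in my real code I will simply delete the env.execute("table api") line.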
Best Regards,
Lu

> On Aug 13, 2020, at 3:41 PM, Timo Walther <twal...@apache.org> wrote:
>
> Hi Lu,
>
> `env.execute("table api");` is not necessary after FLIP-84 [1]. Every method
> that has `execute` in its name will immediately execute a job. Therefore your
> `env.execute` has an empty pipeline.
>
> Regards,
> Timo
>
> [1] https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
>
> On 13.08.20 09:34, Lu Weizheng wrote:
>> Hi,
>>
>> I am using Flink 1.11 SQL using Java. All my operations are in SQL. I create
>> source tables and insert result into sink tables. No other Java operators. I
>> execute it in IntelliJ. I can get the final result in the sink tables.
>> However I get the following error. I am not sure it is a bug or there is
>> something wrong in my code? Actually it does not affect the computation.
>>
>> Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1872)
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1863)
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1848)
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
>>     at com.flink.tutorials.java.projects.iot.IoTSQLDemo.main()
>>
>> Here's my code:
>>
>> EnvironmentSettings fsSettings =
>>     EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.getExecutionEnvironment();
>> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, fsSettings);
>> // create source and sink tables...
>> tEnv.executeSql("INSERT INTO sensor_1min_avg " +
>>     "SELECT " +
>>     "  room, " +
>>     "  AVG(temp) AS avg_temp," +
>>     "  TUMBLE_END(ts, INTERVAL '1' MINUTE) AS end_ts " +
>>     "FROM sensor " +
>>     "GROUP BY room, TUMBLE(ts, INTERVAL '1' MINUTE)");
>> env.execute("table api");