Thanks Timo,

So there is no need to call the execute() method in Flink SQL if I do
everything from source to sink in SQL.
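
To check my understanding, here is a toy model of the behaviour Timo describes. It is only a sketch and is not Flink code: `ToyEnv`, `ToyTableEnv`, and `run` are hypothetical names I made up for illustration, and the real internals of `StreamExecutionEnvironment` and `executeSql` may differ. It assumes that an eager `executeSql` submits the job itself, so a later `env.execute` finds nothing left to run.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only; none of these classes are Flink APIs. */
public class EagerExecuteSketch {

    /** Hypothetical stand-in for an execution environment that buffers operators. */
    static class ToyEnv {
        private final List<String> operators = new ArrayList<>();
        private int submittedJobs = 0;

        void addOperator(String name) {
            operators.add(name);
        }

        /** Submits the buffered operators as one job and empties the pipeline. */
        void execute(String jobName) {
            if (operators.isEmpty()) {
                // Same message as the exception in the stack trace below.
                throw new IllegalStateException(
                        "No operators defined in streaming topology. Cannot execute.");
            }
            submittedJobs++;
            operators.clear();
        }

        int getSubmittedJobs() {
            return submittedJobs;
        }
    }

    /** Hypothetical stand-in for a table environment with an eager executeSql. */
    static class ToyTableEnv {
        private final ToyEnv env;

        ToyTableEnv(ToyEnv env) {
            this.env = env;
        }

        /** Anything with "execute" in its name runs the job immediately. */
        void executeSql(String statement) {
            env.addOperator(statement);
            env.execute("insert-job");
        }
    }

    /** Runs the toy pipeline, optionally followed by the redundant env.execute. */
    static String run(boolean callEnvExecute) {
        ToyEnv env = new ToyEnv();
        ToyTableEnv tEnv = new ToyTableEnv(env);
        tEnv.executeSql("INSERT INTO sensor_1min_avg SELECT room FROM sensor");
        if (callEnvExecute) {
            try {
                env.execute("table api"); // pipeline is already empty here
            } catch (IllegalStateException e) {
                return "job ran, then: " + e.getMessage();
            }
        }
        return "job ran";
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

In this model the job is submitted exactly once in both cases; the extra `env.execute` only adds the exception, which matches what I saw (correct results in the sink plus the error).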

Best Regards,
Lu

> On Aug 13, 2020, at 3:41 PM, Timo Walther <twal...@apache.org> wrote:
> 
> Hi Lu,
> 
> `env.execute("table api");` is not necessary after FLIP-84 [1]. Every method 
> that has `execute` in its name will immediately execute a job. Therefore your 
> `env.execute` has an empty pipeline.
> 
> Regards,
> Timo
> 
> [1] https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
> 
> On 13.08.20 09:34, Lu Weizheng wrote:
>> Hi,
>> I am using Flink 1.11 SQL with Java. All my operations are in SQL: I create 
>> source tables and insert the results into sink tables, with no other Java 
>> operators. I run it in IntelliJ and get the final result in the sink tables. 
>> However, I get the following error. I am not sure whether it is a bug or 
>> whether something is wrong in my code. It does not actually affect the computation.
>> Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1872)
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1863)
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1848)
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
>>     at com.flink.tutorials.java.projects.iot.IoTSQLDemo.main()
>> Here's my code:
>>         EnvironmentSettings fsSettings =
>>                 EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>         StreamExecutionEnvironment env =
>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, fsSettings);
>>         // create source and sink tables...
>>         tEnv.executeSql("INSERT INTO sensor_1min_avg " +
>>                 "SELECT " +
>>                 "  room, " +
>>                 "  AVG(temp) AS avg_temp," +
>>                 "  TUMBLE_END(ts, INTERVAL '1' MINUTE) AS end_ts " +
>>                 "FROM sensor " +
>>                 "GROUP BY room, TUMBLE(ts, INTERVAL '1' MINUTE)");
>>         env.execute("table api");
> 
