Thank you for the quick reply, Xingbo! Is there a documentation page or example I can refer to in the future for the latest PyFlink 1.11 API? I couldn't find anything about awaiting asynchronous results.
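For anyone else who lands on this thread: the pattern from Xingbo's reply below reduces to a short sketch (my own summary, not official documentation; it assumes the `test` and `test_output` tables from this thread are created via execute_sql first):

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(
    env,
    environment_settings=EnvironmentSettings.new_instance()
        .use_blink_planner()
        .build())

# ... run the CREATE TABLE DDL for `test` and `test_output` here,
#     exactly as in the quoted messages below ...

# execute_sql() submits the INSERT job asynchronously and returns a
# TableResult; blocking on the job's result keeps the client process
# alive while the streaming job runs.
table_result = t_env.execute_sql(
    "INSERT INTO test_output SELECT monitorId, data FROM test")
table_result.get_job_client().get_job_execution_result().result()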
Thanks,
Manas

On Tue, Jul 14, 2020 at 1:29 PM Xingbo Huang <hxbks...@gmail.com> wrote:

> Hi Manas,
>
> I tested your code and there are no errors. Because execute_sql is an
> asynchronous method, you need to await its completion through the
> returned TableResult. You can try the following code:
>
> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings
>
>
> def test():
>     exec_env = StreamExecutionEnvironment.get_execution_environment()
>     exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>     t_env = StreamTableEnvironment.create(
>         exec_env,
>         environment_settings=EnvironmentSettings.new_instance()
>             .use_blink_planner()
>             .build())
>
>     INPUT_TABLE = "test"
>     INPUT_TOPIC = "test"
>     LOCAL_KAFKA = "localhost:2181"
>     OUTPUT_TABLE = "test_output"
>     OUTPUT_TOPIC = "test_output"
>     ddl_source = f"""
>         CREATE TABLE {INPUT_TABLE} (
>             `monitorId` VARCHAR,
>             `time_st` TIMESTAMP(3),
>             WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND,
>             `data` DOUBLE
>         ) WITH (
>             'connector' = 'kafka',
>             'topic' = '{INPUT_TOPIC}',
>             'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>             'properties.group.id' = 'myGroup',
>             'format' = 'json'
>         )
>     """
>
>     ddl_sink = f"""
>         CREATE TABLE {OUTPUT_TABLE} (
>             `monitorId` VARCHAR,
>             `max` DOUBLE
>         ) WITH (
>             'connector' = 'kafka',
>             'topic' = '{OUTPUT_TOPIC}',
>             'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>             'format' = 'json'
>         )
>     """
>     t_env.execute_sql(ddl_source)
>     t_env.execute_sql(ddl_sink)
>
>     result = t_env.execute_sql(f"""
>         INSERT INTO {OUTPUT_TABLE}
>         SELECT monitorId, data
>         FROM {INPUT_TABLE}
>     """)
>     result.get_job_client().get_job_execution_result().result()
>
>
> if __name__ == '__main__':
>     test()
>
> Best,
> Xingbo
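One caveat for anyone copying the connector properties above: localhost:2181 is ZooKeeper's default port, while 'properties.bootstrap.servers' should point at the Kafka broker itself, which in a default local install listens on 9092:

LOCAL_KAFKA = "localhost:9092"  # broker port; 2181 is ZooKeeper's default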
> On Tue, Jul 14, 2020 at 3:31 PM Manas Kale <manaskal...@gmail.com> wrote:
>
>> Hi,
>> I am trying to get a simple streaming job running in PyFlink and
>> understand the new 1.11 API. I just want to read from and write to
>> Kafka topics. Previously I was using t_env.execute("jobname"),
>> register_table_source() and register_table_sink(), but in 1.11 all
>> three were deprecated, and the deprecation warning points to
>> execute_sql() as the replacement.
>>
>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>> exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>> t_env = StreamTableEnvironment.create(
>>     exec_env,
>>     environment_settings=EnvironmentSettings.new_instance()
>>         .use_blink_planner()
>>         .build())
>>
>> ddl_source = f"""
>>     CREATE TABLE {INPUT_TABLE} (
>>         `monitorId` VARCHAR,
>>         `time_st` TIMESTAMP(3),
>>         WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND,
>>         `data` DOUBLE
>>     ) WITH (
>>         'connector' = 'kafka',
>>         'topic' = '{INPUT_TOPIC}',
>>         'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>>         'properties.group.id' = 'myGroup',
>>         'format' = 'json'
>>     )
>> """
>>
>> ddl_sink = f"""
>>     CREATE TABLE {OUTPUT_TABLE} (
>>         `monitorId` VARCHAR,
>>         `max` DOUBLE
>>     ) WITH (
>>         'connector' = 'kafka',
>>         'topic' = '{OUTPUT_TOPIC}',
>>         'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>>         'format' = 'json'
>>     )
>> """
>> t_env.execute_sql(ddl_source)
>> t_env.execute_sql(ddl_sink)
>>
>> t_env.execute_sql(f"""
>>     INSERT INTO {OUTPUT_TABLE}
>>     SELECT monitorId, data
>>     FROM {INPUT_TABLE}
>> """)
>>
>> This gives me the error:
>> java.lang.IllegalStateException: No operators defined in streaming
>> topology. Cannot generate StreamGraph.
>>
>> I am aware this is lazily evaluated, so is there some equivalent SQL
>> statement for t_env.execute() that I should be calling?
>>
>> Thanks,
>> Manas
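P.S. A related point I found useful while reworking this: in 1.11 every execute_sql("INSERT ...") call is submitted as a separate job. If you have several INSERTs, the statement set API introduced in 1.11 groups them into a single job. A minimal sketch, again assuming the t_env and tables from this thread (`another_sink` is a made-up second sink table):

stmt_set = t_env.create_statement_set()
stmt_set.add_insert_sql(
    "INSERT INTO test_output SELECT monitorId, data FROM test")
stmt_set.add_insert_sql(
    "INSERT INTO another_sink SELECT monitorId, data FROM test")

# execute() submits all added statements as one job and returns a
# TableResult, so the same await pattern applies:
result = stmt_set.execute()
result.get_job_client().get_job_execution_result().result()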