Thank you Xingbo, this will certainly help! On Wed, Jul 15, 2020 at 7:39 AM Xingbo Huang <hxbks...@gmail.com> wrote:
> Hi Manas, > > I have created an issue[1] to add the related docs > > [1] https://issues.apache.org/jira/browse/FLINK-18598 > > Best, > Xingbo > > Manas Kale <manaskal...@gmail.com> 于2020年7月14日周二 下午4:15写道: > >> Thank you for the quick reply Xingbo! >> Is there some documented webpage example that I can refer to in the >> future for the latest pyFlink 1.11 API? I couldn't find anything related to >> awaiting asynchronous results. >> >> Thanks, >> Manas >> >> On Tue, Jul 14, 2020 at 1:29 PM Xingbo Huang <hxbks...@gmail.com> wrote: >> >>> Hi Manas, >>> >>> >>> I tested your code, and there are no errors. Because execute_sql is an >>> asynchronous method, you need to wait for it through the TableResult; you can try the >>> following code: >>> >>> >>> from pyflink.datastream import StreamExecutionEnvironment, >>> TimeCharacteristic >>> from pyflink.table import StreamTableEnvironment, EnvironmentSettings >>> >>> >>> def test(): >>> exec_env = StreamExecutionEnvironment.get_execution_environment() >>> exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime) >>> t_env = StreamTableEnvironment.create(exec_env, >>> >>> environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build() >>> ) >>> >>> INPUT_TABLE = "test" >>> INPUT_TOPIC = "test" >>> LOCAL_KAFKA = "localhost:2181" >>> OUTPUT_TABLE = "test_output" >>> OUTPUT_TOPIC = "test_output" >>> ddl_source = f""" >>> CREATE TABLE {INPUT_TABLE} ( >>> `monitorId` VARCHAR, >>> `time_st` TIMESTAMP(3), >>> WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND, >>> `data` DOUBLE >>> ) WITH ( >>> 'connector' = 'kafka', >>> 'topic' = '{INPUT_TOPIC}', >>> 'properties.bootstrap.servers' = '{LOCAL_KAFKA}', >>> 'properties.group.id' = 'myGroup', >>> 'format' = 'json' >>> ) >>> """ >>> >>> ddl_sink = f""" >>> CREATE TABLE {OUTPUT_TABLE} ( >>> `monitorId` VARCHAR, >>> `max` DOUBLE >>> ) WITH ( >>> 'connector' = 'kafka', >>> 'topic' = '{OUTPUT_TOPIC}', >>> 'properties.bootstrap.servers' = '{LOCAL_KAFKA}', >>> 'format' = 
'json' >>> ) >>> """ >>> t_env.execute_sql(ddl_source) >>> t_env.execute_sql(ddl_sink) >>> >>> result = t_env.execute_sql(f""" >>> INSERT INTO {OUTPUT_TABLE} >>> SELECT monitorId, data >>> FROM {INPUT_TABLE} >>> """) >>> result.get_job_client().get_job_execution_result().result() >>> >>> >>> if __name__ == '__main__': >>> test() >>> >>> >>> Best, >>> Xingbo >>> >>> Manas Kale <manaskal...@gmail.com> 于2020年7月14日周二 下午3:31写道: >>> >>>> Hi, >>>> I am trying to get a simple streaming job running in pyFlink and >>>> understand the new 1.11 API. I just want to read from and write to kafka >>>> topics. >>>> Previously I was using t_env.execute("jobname"), >>>> register_table_source() and register_table_sink() but in 1.11 all 3 were >>>> deprecated and replaced by execute_sql() in the deprecation warning. >>>> >>>> exec_env = StreamExecutionEnvironment.get_execution_environment() >>>> exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime) >>>> t_env = StreamTableEnvironment.create(exec_env, >>>> >>>> environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build() >>>> ) >>>> >>>> ddl_source = f""" >>>> CREATE TABLE {INPUT_TABLE} ( >>>> `monitorId` VARCHAR, >>>> `time_st` TIMESTAMP(3), >>>> WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND, >>>> `data` DOUBLE >>>> ) WITH ( >>>> 'connector' = 'kafka', >>>> 'topic' = '{INPUT_TOPIC}', >>>> 'properties.bootstrap.servers' = '{LOCAL_KAFKA}', >>>> 'properties.group.id' = 'myGroup', >>>> 'format' = 'json' >>>> ) >>>> """ >>>> >>>> ddl_sink = f""" >>>> CREATE TABLE {OUTPUT_TABLE} ( >>>> `monitorId` VARCHAR, >>>> `max` DOUBLE >>>> ) WITH ( >>>> 'connector' = 'kafka', >>>> 'topic' = '{OUTPUT_TOPIC}', >>>> 'properties.bootstrap.servers' = '{LOCAL_KAFKA}', >>>> 'format' = 'json' >>>> ) >>>> """ >>>> t_env.execute_sql(ddl_source) >>>> t_env.execute_sql(ddl_sink) >>>> >>>> t_env.execute_sql(f""" >>>> INSERT INTO {OUTPUT_TABLE} >>>> SELECT monitorId, data >>>> FROM {INPUT_TABLE} >>>> """) >>>> >>>> 
>>>> This gives me the error: >>>> : java.lang.IllegalStateException: No operators defined in streaming >>>> topology. Cannot generate StreamGraph. >>>> >>>> I am aware this is lazily evaluated, so is there some equivalent SQL >>>> statement for t_env.execute() that I should be calling? >>>> >>>> Thanks, >>>> Manas >>>> >>>