Hi Manas, I have created a issue[1] to add related doc
[1] https://issues.apache.org/jira/browse/FLINK-18598 Best, Xingbo Manas Kale <manaskal...@gmail.com> 于2020年7月14日周二 下午4:15写道: > Thank you for the quick reply Xingbo! > Is there some documented webpage example that I can refer to in the > future for the latest pyFlink 1.11 API? I couldn't find anything related to > awaiting asynchronous results. > > Thanks, > Manas > > On Tue, Jul 14, 2020 at 1:29 PM Xingbo Huang <hxbks...@gmail.com> wrote: > >> Hi Manas, >> >> >> I tested your code, but there are no errors. Because execute_sql is an >> asynchronous method, you need to await through TableResult, you can try the >> following code: >> >> >> from pyflink.datastream import StreamExecutionEnvironment, >> TimeCharacteristic >> from pyflink.table import StreamTableEnvironment, EnvironmentSettings >> >> >> def test(): >> exec_env = StreamExecutionEnvironment.get_execution_environment() >> exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime) >> t_env = StreamTableEnvironment.create(exec_env, >> >> environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build() >> ) >> >> INPUT_TABLE = "test" >> INPUT_TOPIC = "test" >> LOCAL_KAFKA = "localhost:2181" >> OUTPUT_TABLE = "test_output" >> OUTPUT_TOPIC = "test_output" >> ddl_source = f""" >> CREATE TABLE {INPUT_TABLE} ( >> `monitorId` VARCHAR, >> `time_st` TIMESTAMP(3), >> WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND, >> `data` DOUBLE >> ) WITH ( >> 'connector' = 'kafka', >> 'topic' = '{INPUT_TOPIC}', >> 'properties.bootstrap.servers' = '{LOCAL_KAFKA}', >> 'properties.group.id' = 'myGroup', >> 'format' = 'json' >> ) >> """ >> >> ddl_sink = f""" >> CREATE TABLE {OUTPUT_TABLE} ( >> `monitorId` VARCHAR, >> `max` DOUBLE >> ) WITH ( >> 'connector' = 'kafka', >> 'topic' = '{OUTPUT_TOPIC}', >> 'properties.bootstrap.servers' = '{LOCAL_KAFKA}', >> 'format' = 'json' >> ) >> """ >> t_env.execute_sql(ddl_source) >> t_env.execute_sql(ddl_sink) >> >> result = t_env.execute_sql(f""" >> INSERT INTO {OUTPUT_TABLE} >> SELECT monitorId, data >> FROM {INPUT_TABLE} >> """) >> result.get_job_client().get_job_execution_result().result() >> >> >> if __name__ == '__main__': >> test() >> >> >> Best, >> Xingbo >> >> Manas Kale <manaskal...@gmail.com> 于2020年7月14日周二 下午3:31写道: >> >>> Hi, >>> I am trying to get a simple streaming job running in pyFlink and >>> understand the new 1.11 API. I just want to read from and write to kafka >>> topics. >>> Previously I was using t_env.execute("jobname"), register_table_source() >>> and register_table_sink() but in 1.11 all 3 were deprecated and replaced by >>> execute_sql() in the deprecation warning. >>> >>> exec_env = StreamExecutionEnvironment.get_execution_environment() >>> exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime) >>> t_env = StreamTableEnvironment.create(exec_env, >>> >>> environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build() >>> ) >>> >>> ddl_source = f""" >>> CREATE TABLE {INPUT_TABLE} ( >>> `monitorId` VARCHAR, >>> `time_st` TIMESTAMP(3), >>> WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND, >>> `data` DOUBLE >>> ) WITH ( >>> 'connector' = 'kafka', >>> 'topic' = '{INPUT_TOPIC}', >>> 'properties.bootstrap.servers' = '{LOCAL_KAFKA}', >>> 'properties.group.id' = 'myGroup', >>> 'format' = 'json' >>> ) >>> """ >>> >>> ddl_sink = f""" >>> CREATE TABLE {OUTPUT_TABLE} ( >>> `monitorId` VARCHAR, >>> `max` DOUBLE >>> ) WITH ( >>> 'connector' = 'kafka', >>> 'topic' = '{OUTPUT_TOPIC}', >>> 'properties.bootstrap.servers' = '{LOCAL_KAFKA}', >>> 'format' = 'json' >>> ) >>> """ >>> t_env.execute_sql(ddl_source) >>> t_env.execute_sql(ddl_sink) >>> >>> t_env.execute_sql(f""" >>> INSERT INTO {OUTPUT_TABLE} >>> SELECT monitorId, data >>> FROM {INPUT_TABLE} >>> """) >>> >>> >>> This gives me the error : >>> : java.lang.IllegalStateException: No operators defined in streaming >>> topology. Cannot generate StreamGraph. >>> >>> I am aware this is lazily evaluated, so is there some equivalent SQL >>> statement for t_env.execute() that I should be calling? >>> >>> Thanks, >>> Manas >>> >>