Hi Manas,

I have created an issue [1] to add the related documentation.

[1] https://issues.apache.org/jira/browse/FLINK-18598

Best,
Xingbo

On Tue, Jul 14, 2020 at 4:15 PM Manas Kale <manaskal...@gmail.com> wrote:

> Thank you for the quick reply Xingbo!
> Is there some documented webpage example that I can refer to in the
> future for the latest pyFlink 1.11 API? I couldn't find anything related to
> awaiting asynchronous results.
>
> Thanks,
> Manas
>
> On Tue, Jul 14, 2020 at 1:29 PM Xingbo Huang <hxbks...@gmail.com> wrote:
>
>> Hi Manas,
>>
>>
>> I tested your code and saw no errors. Because execute_sql is an
>> asynchronous method, you need to wait for the job through the returned
>> TableResult. You can try the following code:
>>
>>
>> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
>> from pyflink.table import StreamTableEnvironment, EnvironmentSettings
>>
>>
>> def test():
>>     exec_env = StreamExecutionEnvironment.get_execution_environment()
>>     exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>     t_env = StreamTableEnvironment.create(
>>         exec_env,
>>         environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
>>     )
>>
>>     INPUT_TABLE = "test"
>>     INPUT_TOPIC = "test"
>>     LOCAL_KAFKA = "localhost:2181"
>>     OUTPUT_TABLE = "test_output"
>>     OUTPUT_TOPIC = "test_output"
>>     ddl_source = f"""
>>         CREATE TABLE {INPUT_TABLE} (
>>             `monitorId` VARCHAR,
>>             `time_st` TIMESTAMP(3),
>>             WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND,
>>             `data` DOUBLE
>>         ) WITH (
>>             'connector' = 'kafka',
>>             'topic' = '{INPUT_TOPIC}',
>>             'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>>             'properties.group.id' = 'myGroup',
>>             'format' = 'json'
>>         )
>>     """
>>
>>     ddl_sink = f"""
>>         CREATE TABLE {OUTPUT_TABLE} (
>>             `monitorId` VARCHAR,
>>             `max` DOUBLE
>>         ) WITH (
>>             'connector' = 'kafka',
>>             'topic' = '{OUTPUT_TOPIC}',
>>             'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>>             'format' = 'json'
>>         )
>>     """
>>     t_env.execute_sql(ddl_source)
>>     t_env.execute_sql(ddl_sink)
>>
>>     result = t_env.execute_sql(f"""
>>         INSERT INTO {OUTPUT_TABLE}
>>         SELECT monitorId, data
>>         FROM {INPUT_TABLE}
>>     """)
>>     # execute_sql() only submits the job; block here until it finishes.
>>     result.get_job_client().get_job_execution_result().result()
>>
>>
>> if __name__ == '__main__':
>>     test()
>>
>>
>> Best,
>> Xingbo
>>
>> On Tue, Jul 14, 2020 at 3:31 PM Manas Kale <manaskal...@gmail.com> wrote:
>>
>>> Hi,
>>> I am trying to get a simple streaming job running in pyFlink and
>>> understand the new 1.11 API. I just want to read from and write to Kafka
>>> topics.
>>> Previously I was using t_env.execute("jobname"), register_table_source()
>>> and register_table_sink(), but in 1.11 all three are deprecated, and the
>>> deprecation warnings point to execute_sql() as the replacement.
>>>
>>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>> exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>> t_env = StreamTableEnvironment.create(
>>>     exec_env,
>>>     environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
>>> )
>>>
>>> ddl_source = f"""
>>>     CREATE TABLE {INPUT_TABLE} (
>>>         `monitorId` VARCHAR,
>>>         `time_st` TIMESTAMP(3),
>>>         WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND,
>>>         `data` DOUBLE
>>>     ) WITH (
>>>         'connector' = 'kafka',
>>>         'topic' = '{INPUT_TOPIC}',
>>>         'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>>>         'properties.group.id' = 'myGroup',
>>>         'format' = 'json'
>>>     )
>>> """
>>>
>>> ddl_sink = f"""
>>>     CREATE TABLE {OUTPUT_TABLE} (
>>>         `monitorId` VARCHAR,
>>>         `max` DOUBLE
>>>     ) WITH (
>>>         'connector' = 'kafka',
>>>         'topic' = '{OUTPUT_TOPIC}',
>>>         'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>>>         'format' = 'json'
>>>     )
>>> """
>>> t_env.execute_sql(ddl_source)
>>> t_env.execute_sql(ddl_sink)
>>>
>>> t_env.execute_sql(f"""
>>>     INSERT INTO {OUTPUT_TABLE}
>>>     SELECT monitorId, data
>>>     FROM {INPUT_TABLE}
>>> """)
>>>
>>>
>>> This gives me the error:
>>> : java.lang.IllegalStateException: No operators defined in streaming
>>> topology. Cannot generate StreamGraph.
>>>
>>> I am aware this is lazily evaluated, so is there some equivalent SQL
>>> statement for t_env.execute() that I should be calling?
>>>
>>> Thanks,
>>> Manas
>>>
>>
