Thank you Xingbo, this will certainly help!

On Wed, Jul 15, 2020 at 7:39 AM Xingbo Huang <hxbks...@gmail.com> wrote:

> Hi Manas,
>
> I have created a issue[1] to add related doc
>
> [1] https://issues.apache.org/jira/browse/FLINK-18598
>
> Best,
> Xingbo
>
> Manas Kale <manaskal...@gmail.com> 于2020年7月14日周二 下午4:15写道:
>
>> Thank you for the quick reply Xingbo!
>>  Is there some documented webpage example that I can refer to in the
>> future for the latest pyFlink 1.11 API? I couldn't find anything related to
>> awaiting asynchronous results.
>>
>> Thanks,
>> Manas
>>
>> On Tue, Jul 14, 2020 at 1:29 PM Xingbo Huang <hxbks...@gmail.com> wrote:
>>
>>> Hi Manas,
>>>
>>>
>>> I tested your code, but there are no errors. Because execute_sql is an
>>> asynchronous method, you need to await through TableResult, you can try the
>>> following code:
>>>
>>>
>>> from pyflink.datastream import StreamExecutionEnvironment,
>>> TimeCharacteristic
>>> from pyflink.table import StreamTableEnvironment, EnvironmentSettings
>>>
>>>
>>> def test():
>>>     exec_env = StreamExecutionEnvironment.get_execution_environment()
>>>     exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>>     t_env = StreamTableEnvironment.create(exec_env,
>>>
>>> environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
>>>                                           )
>>>
>>>     INPUT_TABLE = "test"
>>>     INPUT_TOPIC = "test"
>>>     LOCAL_KAFKA = "localhost:2181"
>>>     OUTPUT_TABLE = "test_output"
>>>     OUTPUT_TOPIC = "test_output"
>>>     ddl_source = f"""
>>>         CREATE TABLE {INPUT_TABLE} (
>>>             `monitorId` VARCHAR,
>>>             `time_st` TIMESTAMP(3),
>>>             WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND,
>>>             `data` DOUBLE
>>>         ) WITH (
>>>             'connector' = 'kafka',
>>>             'topic' = '{INPUT_TOPIC}',
>>>             'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>>>             'properties.group.id' = 'myGroup',
>>>             'format' = 'json'
>>>         )
>>>     """
>>>
>>>     ddl_sink = f"""
>>>         CREATE TABLE {OUTPUT_TABLE} (
>>>             `monitorId` VARCHAR,
>>>             `max` DOUBLE
>>>         ) WITH (
>>>             'connector' = 'kafka',
>>>             'topic' = '{OUTPUT_TOPIC}',
>>>             'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>>>             'format' = 'json'
>>>         )
>>>     """
>>>     t_env.execute_sql(ddl_source)
>>>     t_env.execute_sql(ddl_sink)
>>>
>>>     result = t_env.execute_sql(f"""
>>>         INSERT INTO {OUTPUT_TABLE}
>>>         SELECT monitorId, data
>>>         FROM {INPUT_TABLE}
>>>     """)
>>>     result.get_job_client().get_job_execution_result().result()
>>>
>>>
>>> if __name__ == '__main__':
>>>     test()
>>>
>>>
>>> Best,
>>> Xingbo
>>>
>>> Manas Kale <manaskal...@gmail.com> 于2020年7月14日周二 下午3:31写道:
>>>
>>>> Hi,
>>>> I am trying to get a simple streaming job running in pyFlink and
>>>> understand the new 1.11 API. I just want to read from and write to kafka
>>>> topics.
>>>> Previously I was using t_env.execute("jobname"),
>>>> register_table_source() and register_table_sink() but in 1.11 all 3 were
>>>> deprecated and replaced by execute_sql() in the deprecation warning.
>>>>
>>>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>>> exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>>> t_env = StreamTableEnvironment.create(exec_env,
>>>>                                       
>>>> environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
>>>>                                       )
>>>>
>>>> ddl_source = f"""
>>>>     CREATE TABLE {INPUT_TABLE} (
>>>>         `monitorId` VARCHAR,
>>>>         `time_st` TIMESTAMP(3),
>>>>         WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND,
>>>>         `data` DOUBLE
>>>>     ) WITH (
>>>>         'connector' = 'kafka',
>>>>         'topic' = '{INPUT_TOPIC}',
>>>>         'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>>>>         'properties.group.id' = 'myGroup',
>>>>         'format' = 'json'
>>>>     )
>>>> """
>>>>
>>>> ddl_sink = f"""
>>>>     CREATE TABLE {OUTPUT_TABLE} (
>>>>         `monitorId` VARCHAR,
>>>>         `max` DOUBLE
>>>>     ) WITH (
>>>>         'connector' = 'kafka',
>>>>         'topic' = '{OUTPUT_TOPIC}',
>>>>         'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>>>>         'format' = 'json'
>>>>     )
>>>> """
>>>> t_env.execute_sql(ddl_source)
>>>> t_env.execute_sql(ddl_sink)
>>>>
>>>> t_env.execute_sql(f"""
>>>>     INSERT INTO {OUTPUT_TABLE}
>>>>     SELECT monitorId, data
>>>>     FROM {INPUT_TABLE}
>>>> """)
>>>>
>>>>
>>>> This gives me the error :
>>>> : java.lang.IllegalStateException: No operators defined in streaming
>>>> topology. Cannot generate StreamGraph.
>>>>
>>>> I am aware this is lazily evaluated, so is there some equivalent SQL
>>>> statement for t_env.execute() that I should be calling?
>>>>
>>>> Thanks,
>>>> Manas
>>>>
>>>

Reply via email to