Thank you for the quick reply Xingbo!
 Is there some documented webpage example that I can refer to in the future
for the latest pyFlink 1.11 API? I couldn't find anything related to
awaiting asynchronous results.

Thanks,
Manas

On Tue, Jul 14, 2020 at 1:29 PM Xingbo Huang <hxbks...@gmail.com> wrote:

> Hi Manas,
>
>
> I tested your code, but there are no errors. Because execute_sql is an
> asynchronous method, you need to await through TableResult, you can try the
> following code:
>
>
> from pyflink.datastream import StreamExecutionEnvironment,
> TimeCharacteristic
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings
>
>
> def test():
>     exec_env = StreamExecutionEnvironment.get_execution_environment()
>     exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>     t_env = StreamTableEnvironment.create(exec_env,
>
> environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
>                                           )
>
>     INPUT_TABLE = "test"
>     INPUT_TOPIC = "test"
>     LOCAL_KAFKA = "localhost:2181"
>     OUTPUT_TABLE = "test_output"
>     OUTPUT_TOPIC = "test_output"
>     ddl_source = f"""
>         CREATE TABLE {INPUT_TABLE} (
>             `monitorId` VARCHAR,
>             `time_st` TIMESTAMP(3),
>             WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND,
>             `data` DOUBLE
>         ) WITH (
>             'connector' = 'kafka',
>             'topic' = '{INPUT_TOPIC}',
>             'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>             'properties.group.id' = 'myGroup',
>             'format' = 'json'
>         )
>     """
>
>     ddl_sink = f"""
>         CREATE TABLE {OUTPUT_TABLE} (
>             `monitorId` VARCHAR,
>             `max` DOUBLE
>         ) WITH (
>             'connector' = 'kafka',
>             'topic' = '{OUTPUT_TOPIC}',
>             'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>             'format' = 'json'
>         )
>     """
>     t_env.execute_sql(ddl_source)
>     t_env.execute_sql(ddl_sink)
>
>     result = t_env.execute_sql(f"""
>         INSERT INTO {OUTPUT_TABLE}
>         SELECT monitorId, data
>         FROM {INPUT_TABLE}
>     """)
>     result.get_job_client().get_job_execution_result().result()
>
>
> if __name__ == '__main__':
>     test()
>
>
> Best,
> Xingbo
>
> Manas Kale <manaskal...@gmail.com> 于2020年7月14日周二 下午3:31写道:
>
>> Hi,
>> I am trying to get a simple streaming job running in pyFlink and
>> understand the new 1.11 API. I just want to read from and write to kafka
>> topics.
>> Previously I was using t_env.execute("jobname"), register_table_source()
>> and register_table_sink() but in 1.11 all 3 were deprecated and replaced by
>> execute_sql() in the deprecation warning.
>>
>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>> exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>> t_env = StreamTableEnvironment.create(exec_env,
>>                                       
>> environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
>>                                       )
>>
>> ddl_source = f"""
>>     CREATE TABLE {INPUT_TABLE} (
>>         `monitorId` VARCHAR,
>>         `time_st` TIMESTAMP(3),
>>         WATERMARK FOR time_st AS time_st - INTERVAL '2' SECOND,
>>         `data` DOUBLE
>>     ) WITH (
>>         'connector' = 'kafka',
>>         'topic' = '{INPUT_TOPIC}',
>>         'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>>         'properties.group.id' = 'myGroup',
>>         'format' = 'json'
>>     )
>> """
>>
>> ddl_sink = f"""
>>     CREATE TABLE {OUTPUT_TABLE} (
>>         `monitorId` VARCHAR,
>>         `max` DOUBLE
>>     ) WITH (
>>         'connector' = 'kafka',
>>         'topic' = '{OUTPUT_TOPIC}',
>>         'properties.bootstrap.servers' = '{LOCAL_KAFKA}',
>>         'format' = 'json'
>>     )
>> """
>> t_env.execute_sql(ddl_source)
>> t_env.execute_sql(ddl_sink)
>>
>> t_env.execute_sql(f"""
>>     INSERT INTO {OUTPUT_TABLE}
>>     SELECT monitorId, data
>>     FROM {INPUT_TABLE}
>> """)
>>
>>
>> This gives me the error :
>> : java.lang.IllegalStateException: No operators defined in streaming
>> topology. Cannot generate StreamGraph.
>>
>> I am aware this is lazily evaluated, so is there some equivalent SQL
>> statement for t_env.execute() that I should be calling?
>>
>> Thanks,
>> Manas
>>
>

Reply via email to