Hi Xingbo and Till,
Thank you for your help!

On Wed, Sep 2, 2020 at 9:38 PM Xingbo Huang <hxbks...@gmail.com> wrote:

> Hi Manas,
>
> As Till said, you need to check whether the execution environment used is
> LocalStreamEnvironment. You need to get the class object corresponding to
> the corresponding java object through py4j. You can take a look at the
> example I wrote below, I hope it will help you
>
> ```
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.java_gateway import get_gateway
> from py4j.java_gateway import get_java_class
>
>
> def test():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     table_env = StreamTableEnvironment.create(
>         env, environment_settings=EnvironmentSettings.new_instance()
>         .in_streaming_mode().use_blink_planner().build())
>     gateway = get_gateway()
>
>     # get the execution environment class
>     env_class = table_env._j_tenv.getPlanner().getExecEnv().getClass()
>
>     # get the LocalStreamEnvironment class
>     local_stream_environment_class = get_java_class(
>
> gateway.jvm.org.apache.flink.streaming.api.environment.LocalStreamEnvironment)
>     print(env_class == local_stream_environment_class)
>
>
> if __name__ == '__main__':
>     test()
>
> ```
>
>
> Best,
> Xingbo
>
> Till Rohrmann <trohrm...@apache.org> 于2020年9月2日周三 下午5:03写道:
>
>> Hi Manas,
>>
>> I am not entirely sure but you might try to check whether
>> env._j_stream_execution_environment is an instance of
>> gateway.jvm.org.apache.flink.streaming.api.environment.LocalStreamEnvironment
>> via Python's isinstance function.
>>
>> Cheers,
>> Till
>>
>> On Wed, Sep 2, 2020 at 5:46 AM Manas Kale <manaskal...@gmail.com> wrote:
>>
>>> Hi Xingbo,
>>> Thank you for clarifying that. I am indeed maintaining a different
>>> version of the code by commenting those lines, but I was just wondering if
>>> it was possible to detect the environment programmatically.
>>>
>>> Regards,
>>> Manas
>>>
>>> On Wed, Sep 2, 2020 at 7:32 AM Xingbo Huang <hxbks...@gmail.com> wrote:
>>>
>>>> Hi Manas,
>>>>
>>>> When running locally, you need
>>>> `ten_sec_summaries.get_job_client().get_job_execution_result().result()` to
>>>> wait job finished. However, when you submit to the cluster, you need to
>>>> delete this code. In my opinion, the current feasible solution is that you
>>>> prepare two sets of codes for this, although this is annoying. After all,
>>>> running jobs locally is usually for testing, so it should be acceptable to
>>>> prepare different codes.
>>>> In the long run, it should be the flink framework that makes different
>>>> behaviors according to different environments  so that users don’t need to
>>>> prepare different codes.
>>>>
>>>> Best,
>>>> Xingbo
>>>>
>>>> Manas Kale <manaskal...@gmail.com> 于2020年9月1日周二 下午3:00写道:
>>>>
>>>>> Hi,
>>>>> I am trying to submit a pyFlink job in detached mode using the command:
>>>>>
>>>>> ../../flink-1.11.0/bin/flink run -d -py basic_streaming_job.py -j
>>>>> flink-sql-connector-kafka_2.11-1.11.0.jar
>>>>>
>>>>> The jobs are submitted successfully but the command does not return. I
>>>>> realized that was because I had the following line in
>>>>> basic_streaming_job.py:
>>>>>
>>>>> ten_sec_summaries.get_job_client().get_job_execution_result().result()
>>>>>
>>>>> This statement is useful when testing this locally within a
>>>>> minicluster (using python basic_streaming_job.py) but not needed when the
>>>>> job is submitted to a cluster.
>>>>>
>>>>> So I would like to programmatically detect if the
>>>>> StreamExecutionEnvironment is a localStreamEnvironment and execute
>>>>> the above snippet accordingly. How do I do this?
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Manas
>>>>>
>>>>

Reply via email to