Hi Manas,

As Till said, you need to check whether the execution environment in use is a LocalStreamEnvironment. To do that, get the Java class object of the underlying Java environment object through py4j and compare it with the LocalStreamEnvironment class. Take a look at the example I wrote below; I hope it helps.
```
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.java_gateway import get_gateway
from py4j.java_gateway import get_java_class


def test():
    env = StreamExecutionEnvironment.get_execution_environment()
    table_env = StreamTableEnvironment.create(
        env,
        environment_settings=EnvironmentSettings.new_instance()
        .in_streaming_mode().use_blink_planner().build())
    gateway = get_gateway()

    # get the execution environment class
    env_class = table_env._j_tenv.getPlanner().getExecEnv().getClass()

    # get the LocalStreamEnvironment class
    local_stream_environment_class = get_java_class(
        gateway.jvm.org.apache.flink.streaming.api.environment.LocalStreamEnvironment)

    # prints True when the job runs in a local environment
    print(env_class == local_stream_environment_class)


if __name__ == '__main__':
    test()
```

Best,
Xingbo

On Wed, Sep 2, 2020 at 5:03 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Manas,
>
> I am not entirely sure but you might try to check whether
> env._j_stream_execution_environment is an instance of
> gateway.jvm.org.apache.flink.streaming.api.environment.LocalStreamEnvironment
> via Python's isinstance function.
>
> Cheers,
> Till
>
> On Wed, Sep 2, 2020 at 5:46 AM Manas Kale <manaskal...@gmail.com> wrote:
>
>> Hi Xingbo,
>> Thank you for clarifying that. I am indeed maintaining a different
>> version of the code by commenting out those lines, but I was just
>> wondering if it was possible to detect the environment programmatically.
>>
>> Regards,
>> Manas
>>
>> On Wed, Sep 2, 2020 at 7:32 AM Xingbo Huang <hxbks...@gmail.com> wrote:
>>
>>> Hi Manas,
>>>
>>> When running locally, you need
>>> `ten_sec_summaries.get_job_client().get_job_execution_result().result()`
>>> to wait for the job to finish. However, when you submit to the cluster,
>>> you need to delete this code. In my opinion, the current feasible
>>> solution is to prepare two versions of the code, although this is
>>> annoying. After all, running jobs locally is usually for testing, so it
>>> should be acceptable to prepare different code.
>>> In the long run, the Flink framework should behave differently
>>> according to the environment so that users don't need to prepare
>>> different code.
>>>
>>> Best,
>>> Xingbo
>>>
>>> On Tue, Sep 1, 2020 at 3:00 PM Manas Kale <manaskal...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> I am trying to submit a PyFlink job in detached mode using the command:
>>>>
>>>> ../../flink-1.11.0/bin/flink run -d -py basic_streaming_job.py -j
>>>> flink-sql-connector-kafka_2.11-1.11.0.jar
>>>>
>>>> The jobs are submitted successfully but the command does not return. I
>>>> realized that was because I had the following line in
>>>> basic_streaming_job.py:
>>>>
>>>> ten_sec_summaries.get_job_client().get_job_execution_result().result()
>>>>
>>>> This statement is useful when testing locally within a minicluster
>>>> (using python basic_streaming_job.py) but is not needed when the job is
>>>> submitted to a cluster.
>>>>
>>>> So I would like to programmatically detect whether the
>>>> StreamExecutionEnvironment is a LocalStreamEnvironment and execute the
>>>> above snippet accordingly. How do I do this?
>>>>
>>>>
>>>> Thanks,
>>>> Manas
>>>>
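
For reference, the class check from the example at the top of this message could be combined with the blocking call discussed in this thread roughly as follows. This is only a sketch: `is_local_environment` and `wait_if_local` are hypothetical helper names, not part of PyFlink, and comparing the Java class name as a string (obtained, for instance, via `env_class.getName()`) is an assumed alternative to comparing the class objects directly.

```python
# Fully qualified name of the Java class Flink uses for local execution.
LOCAL_ENV_CLASS_NAME = (
    "org.apache.flink.streaming.api.environment.LocalStreamEnvironment"
)


def is_local_environment(j_env_class_name):
    """Return True if the given Java class name is LocalStreamEnvironment."""
    return j_env_class_name == LOCAL_ENV_CLASS_NAME


def wait_if_local(table_result, j_env_class_name):
    """Block on the job result only when running in a local environment.

    table_result is expected to offer the same call chain used in the
    thread: get_job_client().get_job_execution_result().result().
    Returns True if it waited, False if the wait was skipped.
    """
    if is_local_environment(j_env_class_name):
        table_result.get_job_client().get_job_execution_result().result()
        return True
    return False
```

With `env_class` obtained as in the example above, the blocking line in basic_streaming_job.py would become something like `wait_if_local(ten_sec_summaries, env_class.getName())`, so the same script could be used both locally and with `flink run -d`.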