Hi Xingbo and Till,

Thank you for your help!

On Wed, Sep 2, 2020 at 9:38 PM Xingbo Huang <hxbks...@gmail.com> wrote:
> Hi Manas,
>
> As Till said, you need to check whether the execution environment used is
> LocalStreamEnvironment. You need to get the class object of the
> corresponding Java object through py4j. You can take a look at the
> example I wrote below; I hope it will help you.
>
> ```
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.java_gateway import get_gateway
> from py4j.java_gateway import get_java_class
>
>
> def test():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     table_env = StreamTableEnvironment.create(
>         env, environment_settings=EnvironmentSettings.new_instance()
>         .in_streaming_mode().use_blink_planner().build())
>     gateway = get_gateway()
>
>     # get the execution environment class
>     env_class = table_env._j_tenv.getPlanner().getExecEnv().getClass()
>
>     # get the LocalStreamEnvironment class
>     local_stream_environment_class = get_java_class(
>         gateway.jvm.org.apache.flink.streaming.api.environment.LocalStreamEnvironment)
>
>     print(env_class == local_stream_environment_class)
>
>
> if __name__ == '__main__':
>     test()
> ```
>
> Best,
> Xingbo
>
> On Wed, Sep 2, 2020 at 5:03 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Manas,
>>
>> I am not entirely sure, but you might try to check whether
>> env._j_stream_execution_environment is an instance of
>> gateway.jvm.org.apache.flink.streaming.api.environment.LocalStreamEnvironment
>> via Python's isinstance function.
>>
>> Cheers,
>> Till
>>
>> On Wed, Sep 2, 2020 at 5:46 AM Manas Kale <manaskal...@gmail.com> wrote:
>>
>>> Hi Xingbo,
>>> Thank you for clarifying that. I am indeed maintaining a different
>>> version of the code by commenting out those lines, but I was just
>>> wondering if it was possible to detect the environment programmatically.
>>>
>>> Regards,
>>> Manas
>>>
>>> On Wed, Sep 2, 2020 at 7:32 AM Xingbo Huang <hxbks...@gmail.com> wrote:
>>>
>>>> Hi Manas,
>>>>
>>>> When running locally, you need
>>>> `ten_sec_summaries.get_job_client().get_job_execution_result().result()`
>>>> to wait for the job to finish. However, when you submit to the cluster,
>>>> you need to delete this code. In my opinion, the currently feasible
>>>> solution is to prepare two sets of code for this, although that is
>>>> annoying. After all, running jobs locally is usually done for testing,
>>>> so it should be acceptable to prepare different code.
>>>> In the long run, it should be the Flink framework that adapts its
>>>> behavior to the environment, so that users don't need to prepare
>>>> different code.
>>>>
>>>> Best,
>>>> Xingbo
>>>>
>>>> On Tue, Sep 1, 2020 at 3:00 PM Manas Kale <manaskal...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> I am trying to submit a PyFlink job in detached mode using the command:
>>>>>
>>>>> ../../flink-1.11.0/bin/flink run -d -py basic_streaming_job.py -j
>>>>> flink-sql-connector-kafka_2.11-1.11.0.jar
>>>>>
>>>>> The jobs are submitted successfully but the command does not return. I
>>>>> realized that was because I had the following line in
>>>>> basic_streaming_job.py:
>>>>>
>>>>> ten_sec_summaries.get_job_client().get_job_execution_result().result()
>>>>>
>>>>> This statement is useful when testing locally within a minicluster
>>>>> (using python basic_streaming_job.py) but is not needed when the job
>>>>> is submitted to a cluster.
>>>>>
>>>>> So I would like to programmatically detect whether the
>>>>> StreamExecutionEnvironment is a LocalStreamEnvironment and execute the
>>>>> above snippet accordingly. How do I do this?
>>>>>
>>>>> Thanks,
>>>>> Manas
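
Combining Till's and Xingbo's suggestions, the check can be wrapped in a small helper so the same script works both for local testing and for `flink run -d`. This is only a sketch: the helper name `is_local_environment` is mine, and it relies on PyFlink 1.11's internal `_j_stream_execution_environment` attribute, which is not a stable public API and may change between releases.

```python
def is_local_environment(env):
    """Heuristic: inspect the class name of the Java execution environment
    backing this Python StreamExecutionEnvironment.

    Assumes the internal `_j_stream_execution_environment` attribute
    (PyFlink 1.11); not a stable public API.
    """
    j_class_name = env._j_stream_execution_environment.getClass().getName()
    return j_class_name.endswith(".LocalStreamEnvironment")


# Usage sketch (requires a Flink installation, so not runnable here):
#
# env = StreamExecutionEnvironment.get_execution_environment()
# ...
# if is_local_environment(env):
#     # only block on the result when running in a local minicluster
#     ten_sec_summaries.get_job_client().get_job_execution_result().result()
```

Comparing class names avoids importing the Java class object explicitly; Xingbo's `get_java_class` comparison is the stricter alternative, since a name suffix match could in principle be fooled by an unrelated class with the same simple name.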