I agree with your take regarding the superficial stream environment distinction and the difficulties it introduces for users.
To fix the immediate issue in Beam, it was necessary to duplicate RemoteStreamEnvironment.executeRemotely: https://github.com/apache/beam/pull/7169/files#diff-6acb0479d563cfc121ac04e789f4bc6dR294

To address this in Flink, it would make sense to turn that piece of code into a utility method that can be used both directly and from RemoteStreamEnvironment.execute for compatibility.

If there isn't any other feedback, I will create a JIRA and work on this.

Thanks,
Thomas

On Thu, Nov 29, 2018 at 10:00 AM Chesnay Schepler <ches...@apache.org> wrote:

> I'm only voicing my opinion here; these do not in any way reflect long-term directions.
>
> I wouldn't remove the execute() method; it's too important for convenient execution of jobs via the CLI/WebUI.
>
> But I would like to get rid of this distinction of environments, as their existence implies that there is not one way to /write/ a Flink job: jobs can differ in the environment they use, which affects whether you can even run them via the CLI.
>
> Have you ever tried setting up a jar that you can run both via the CLI on the associated cluster and from the IDE against said cluster? You would need two entry points, which create the environment and pass this environment to your job-defining method. This is quite different from a normal job, where the environment is created right before you define your job.
> Yet at the same time you can write a job that simply uses a StreamExecutionEnvironment, which can run locally in the IDE or be submitted via the CLI.
> This just seems highly inconsistent to me.
>
> There are some ways one could deal with this; for example, in our tests we basically inject a job executor into the default StreamExecutionEnvironment. One could also resort to an approach that uses system properties to determine how jobs should be executed. However, I haven't thought about these thoroughly.
>
> Regardless, if you were to use the RestClusterClient explicitly, then yes, you would currently be accessing rather obscure, semi-internal code (that is very much subject to change).
>
> But encapsulating this into an Execution.executeRemotely(env, host, port, savepointRestoreSettings) method (as a replacement for execute()) would be feasible imo.
>
> On 29.11.2018 17:00, Thomas Weise wrote:
> > Thanks for taking a look.
> >
> > Are you saying that the longer-term direction is to get rid of the execute method from StreamExecutionEnvironment and instead construct the cluster client outside?
> >
> > That would currently expose even more internals to the user. Consider the current implementation in RemoteStreamEnvironment:
> >
> >     @Override
> >     public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
> >         StreamGraph streamGraph = getStreamGraph();
> >         streamGraph.setJobName(jobName);
> >         transformations.clear();
> >         return executeRemotely(streamGraph, jarFiles);
> >     }
> >
> > We would use env.getStreamGraph().getJobGraph().setSavepointRestoreSettings(..) in the Beam code and then use the cluster client directly.
> >
> > If we wanted to keep this hidden from users, we could add setSavepointRestoreSettings to RemoteStreamEnvironment and LocalStreamEnvironment and deal with it internally.
> >
> > Alternatively, the remote environment could serve just as a cluster client factory.
> >
> > WDYT?
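To make the proposal concrete, here is a minimal sketch of what such a shared utility could look like. The class name (RemoteExecutor), the parameter list, and the omission of jar/classpath handling are all illustrative assumptions, not existing Flink API; the body is essentially what RemoteStreamEnvironment.executeRemotely does today, with the savepoint restore settings applied to the job graph before submission:

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.client.program.ClusterClient;
    import org.apache.flink.client.program.ProgramInvocationException;
    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
    import org.apache.flink.streaming.api.graph.StreamGraph;

    /** Hypothetical shared utility for remote job submission (name is illustrative). */
    public final class RemoteExecutor {

        private RemoteExecutor() {}

        /**
         * Submits the stream graph through the given cluster client, optionally
         * restoring from a savepoint. Intended to be callable both directly
         * (e.g. from the Beam Flink runner) and from RemoteStreamEnvironment.execute.
         * The jar file and classpath handling of executeRemotely is omitted for brevity.
         */
        public static JobExecutionResult executeRemotely(
                StreamGraph streamGraph,
                ClusterClient<?> client,
                ClassLoader userCodeClassLoader,
                SavepointRestoreSettings savepointRestoreSettings) throws ProgramInvocationException {

            // Translate the stream graph and attach the restore settings before submission.
            JobGraph jobGraph = streamGraph.getJobGraph();
            jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);

            // Blocking submission via the cluster client; the exact submission call is
            // version-dependent (submitJob(JobGraph, ClassLoader) at the time of writing).
            return client.submitJob(jobGraph, userCodeClassLoader).getJobExecutionResult();
        }
    }

RemoteStreamEnvironment.execute(jobName) could then build its StreamGraph, set the job name, and delegate to this method, so existing behavior stays unchanged while callers such as the Beam runner gain a way to pass SavepointRestoreSettings without duplicating the code.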
> > On Thu, Nov 29, 2018 at 2:35 AM Chesnay Schepler <ches...@apache.org> wrote:
> >
> > I'm not aware of any plans to expose this in the StreamExecutionEnvironment.
> >
> > The issue would be that we would start mixing submission details with the job definition, which results in redundancy and weird semantics, e.g., which savepoint configuration takes priority if both the job and the CLI job submission specify it?
> >
> > Looking at the Beam JIRA, it would be sufficient to have this in the RemoteStreamEnvironment (which would be /less/ problematic since the issue above is baked into this class anyway); however, I would recommend migrating to a ClusterClient for these use cases.
> >
> > On 29.11.2018 08:18, Thomas Weise wrote:
> >> Hi,
> >>
> >> Currently it is not possible to submit a job with the savepoint restore option through the execution environment. I found that while attempting to add the support to the Flink runner in Beam (https://issues.apache.org/jira/browse/BEAM-5396).
> >>
> >> I also found https://issues.apache.org/jira/browse/FLINK-9644 - but is there a plan to support restore from a savepoint through StreamExecutionEnvironment in general?
> >>
> >> Thanks,
> >> Thomas
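For completeness, the ClusterClient route recommended above could look roughly like the following. This is a sketch under assumptions: the host, ports, savepoint path, and the RestClusterClient construction are illustrative, and the client API is semi-internal and subject to change between Flink versions, as noted in the thread.

    import org.apache.flink.client.program.rest.RestClusterClient;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.JobManagerOptions;
    import org.apache.flink.configuration.RestOptions;
    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SubmitWithSavepoint {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Placeholder pipeline; the real job would be defined here.
            env.fromElements(1, 2, 3).print();

            // Translate to a JobGraph and attach the savepoint restore settings,
            // which cannot be passed through env.execute() today.
            JobGraph jobGraph = env.getStreamGraph().getJobGraph();
            jobGraph.setSavepointRestoreSettings(
                    SavepointRestoreSettings.forPath("hdfs:///savepoints/savepoint-xyz", false));

            // Point the client at the remote cluster (illustrative configuration keys,
            // mirroring what RemoteStreamEnvironment sets up internally).
            Configuration config = new Configuration();
            config.setString(JobManagerOptions.ADDRESS, "jobmanager-host");
            config.setInteger(JobManagerOptions.PORT, 6123);
            config.setInteger(RestOptions.PORT, 8081);

            RestClusterClient<String> client = new RestClusterClient<>(config, "submit-with-savepoint");
            try {
                client.submitJob(jobGraph, SubmitWithSavepoint.class.getClassLoader())
                        .getJobExecutionResult();
            } finally {
                client.shutdown();
            }
        }
    }

Wrapping this behind a shared utility like the one sketched earlier, or behind a setter on RemoteStreamEnvironment, would keep the client construction and the semi-internal API out of user code.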