Great to see this discussion seeded! The problems you face with the
Zeppelin integration are also affecting other downstream projects, like
Beam.

We just enabled the savepoint restore option in RemoteStreamEnvironment [1],
and that was more difficult than it should have been. The main issue is that
the environment and the cluster client aren't decoupled. Ideally it should be
possible to just obtain the matching cluster client from the environment and
then control the job through it (environment as factory for the cluster
client). Note, however, that the environment classes are part of the public
API, and it is not straightforward to make larger changes without breaking
backward compatibility.
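
As a rough sketch of what "environment as factory for the cluster client"
could look like for a downstream project (host/port/jar values are
placeholders, and submitAsync()/getClusterClient() are hypothetical methods
that do not exist in Flink today):

StreamExecutionEnvironment env = StreamExecutionEnvironment
   .createRemoteEnvironment("jobmanager-host", 8081, "path/to/job.jar");
// ... build the streaming job against env ...
JobID jobId = env.submitAsync();                   // hypothetical non-blocking submit
ClusterClient<?> client = env.getClusterClient();  // hypothetical accessor
client.cancelWithSavepoint(jobId, "s3://savepoints/my-job");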

ClusterClient currently exposes internal classes like JobGraph and
StreamGraph, but it should be possible to wrap it with a new public API that
provides the required job control capabilities for downstream projects.
Perhaps it is helpful to look at some of the interfaces in Beam while
thinking about this: [2] for the portable job API and [3] for the old
asynchronous job control from the Beam Java SDK.
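
One possible shape for such a wrapper, loosely modeled on Beam's
PipelineResult (the JobClient interface below is only illustrative; nothing
like it exists in Flink yet):

public interface JobClient {

   JobID getJobId();

   CompletableFuture<JobStatus> getJobStatus();

   CompletableFuture<String> triggerSavepoint(String targetDirectory);

   CompletableFuture<Void> cancel();
}

A downstream project would then program against JobClient only and never
touch JobGraph or StreamGraph directly.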

The backward compatibility discussion [4] is also relevant here. A new API
should shield downstream projects from internals and allow them to
interoperate with multiple future Flink versions in the same release line
without forced upgrades.

Thanks,
Thomas

[1] https://github.com/apache/flink/pull/7249
[2]
https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/beam_job_api.proto
[3]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
[4]
https://lists.apache.org/thread.html/064c75c5d10f0806095b14f6d76942598917a14429c1acbddd151fe2@%3Cdev.flink.apache.org%3E


On Thu, Dec 20, 2018 at 6:15 PM Jeff Zhang <zjf...@gmail.com> wrote:

> >>> I'm not so sure whether the user should be able to define where the job
> runs (in your example Yarn). This is actually independent of the job
> development and is something which is decided at deployment time.
>
> Users don't need to specify the execution mode programmatically. They can
> also pass the execution mode via the arguments of the flink run command, e.g.
>
> bin/flink run -m yarn-cluster ....
> bin/flink run -m local ...
> bin/flink run -m host:port ...
>
> Does this make sense to you?
>
> >>> To me it makes sense that the ExecutionEnvironment is not directly
> initialized by the user but is instead created depending on the context in
> which you want to execute your job (Flink CLI vs. IDE, for example).
>
> Right, currently I notice that Flink creates different ContextEnvironments
> depending on the submission scenario (Flink CLI vs. IDE). To me this is a
> rather hacky approach and not very straightforward. What I suggested above
> is that Flink should always create the same ExecutionEnvironment, just with
> different configuration, and based on that configuration it would create the
> proper ClusterClient for the different behaviors.
>
>
>
>
>
>
>
> Till Rohrmann <trohrm...@apache.org> wrote on Thu, Dec 20, 2018 at 11:18 PM:
>
> > You are probably right that we have code duplication when it comes to the
> > creation of the ClusterClient. This should be reduced in the future.
> >
> > I'm not so sure whether the user should be able to define where the job
> > runs (in your example Yarn). This is actually independent of the job
> > development and is something which is decided at deployment time. To me it
> > makes sense that the ExecutionEnvironment is not directly initialized by
> > the user but is instead created depending on the context in which you want
> > to execute your job (Flink CLI vs. IDE, for example). However, I agree
> > that the ExecutionEnvironment should give you access to the ClusterClient
> > and to the job (maybe in the form of the JobGraph or a job plan).
> >
> > Cheers,
> > Till
> >
> > On Thu, Dec 13, 2018 at 4:36 AM Jeff Zhang <zjf...@gmail.com> wrote:
> >
> > > Hi Till,
> > > Thanks for the feedback. You are right that I expect a better
> > > programmatic job submission/control API which could be used by
> > > downstream projects, and it would benefit the Flink ecosystem. When I
> > > look at the code of the Flink scala-shell and sql-client (I believe they
> > > are not the core of Flink, but belong to the Flink ecosystem), I find a
> > > lot of duplicated code for creating a ClusterClient from user-provided
> > > configuration (the configuration format may differ between scala-shell
> > > and sql-client) and then using that ClusterClient to manipulate jobs. I
> > > don't think this is convenient for downstream projects. What I expect is
> > > that a downstream project only needs to provide the necessary
> > > configuration info (maybe introducing a class FlinkConf), then build an
> > > ExecutionEnvironment based on this FlinkConf, and the
> > > ExecutionEnvironment will create the proper ClusterClient. This would
> > > not only benefit downstream project development but also help their
> > > integration tests with Flink. Here's a sample code snippet of what I
> > > expect:
> > >
> > > val conf = new FlinkConf().mode("yarn")
> > > val env = new ExecutionEnvironment(conf)
> > > val jobId = env.submit(...)
> > > val jobStatus = env.getClusterClient().queryJobStatus(jobId)
> > > env.getClusterClient().cancelJob(jobId)
> > >
> > > What do you think ?
> > >
> > >
> > >
> > >
> > > Till Rohrmann <trohrm...@apache.org> wrote on Tue, Dec 11, 2018 at 6:28 PM:
> > >
> > > > Hi Jeff,
> > > >
> > > > what you are proposing is to provide the user with better programmatic
> > > > job control. There was actually an effort to achieve this, but it has
> > > > never been completed [1]. However, there are some improvements in the
> > > > code base now. Look for example at the NewClusterClient interface,
> > > > which offers non-blocking job submission. But I agree that we need to
> > > > improve Flink in this regard.
> > > >
> > > > I would not be in favour of exposing all ClusterClient calls via the
> > > > ExecutionEnvironment because it would clutter the class and would not
> > > > be a good separation of concerns. Instead, one idea could be to
> > > > retrieve the current ClusterClient from the ExecutionEnvironment, which
> > > > can then be used for cluster and job control. But before we start an
> > > > effort here, we need to agree on and capture what functionality we want
> > > > to provide.
> > > >
> > > > Initially, the idea was that we have the ClusterDescriptor describing
> > > > how to talk to a cluster manager like Yarn or Mesos. The
> > > > ClusterDescriptor can be used for deploying Flink clusters (job and
> > > > session) and gives you a ClusterClient. The ClusterClient controls the
> > > > cluster (e.g. submitting jobs, listing all running jobs). And then
> > > > there was the idea to introduce a JobClient which you obtain from the
> > > > ClusterClient to trigger job-specific operations (e.g. taking a
> > > > savepoint, cancelling the job).
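> > > >
> > > > Roughly, the layering could look like this (just a sketch; the
> > > > signatures are simplified and the JobClient part is the piece that
> > > > does not exist yet):
> > > >
> > > > ClusterDescriptor<?> descriptor = ...; // e.g. a YARN or Mesos descriptor
> > > > ClusterClient<?> client = descriptor.deploySessionCluster(clusterSpecification);
> > > > JobSubmissionResult result = client.submitJob(jobGraph, classLoader);
> > > > JobClient jobClient = client.getJobClient(result.getJobID()); // proposed, not existing
> > > > jobClient.triggerSavepoint(savepointDir);
> > > > jobClient.cancel();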
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-4272
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Tue, Dec 11, 2018 at 10:13 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > > >
> > > > > Hi Folks,
> > > > >
> > > > > I am trying to integrate Flink into Apache Zeppelin, which is an
> > > > > interactive notebook, and I have hit several issues caused by the
> > > > > Flink client API. So I'd like to propose the following changes to
> > > > > the Flink client API.
> > > > >
> > > > > 1. Support non-blocking execution. Currently,
> > > > > ExecutionEnvironment#execute is a blocking method which does two
> > > > > things: it first submits the job and then waits until the job is
> > > > > finished. I'd like to introduce a non-blocking execution method like
> > > > > ExecutionEnvironment#submit which only submits the job and then
> > > > > returns the jobId to the client, and allow the user to query the job
> > > > > status via that jobId.
> > > > >
> > > > > 2. Add a cancel API in
> > > > > ExecutionEnvironment/StreamExecutionEnvironment. Currently the only
> > > > > way to cancel a job is via the CLI (bin/flink), which is not
> > > > > convenient for downstream projects to use. So I'd like to add a
> > > > > cancel API in ExecutionEnvironment.
> > > > >
> > > > > 3. Add a savepoint API in
> > > > > ExecutionEnvironment/StreamExecutionEnvironment. This is similar to
> > > > > the cancel API; we should use ExecutionEnvironment as the unified API
> > > > > for third parties to integrate with Flink.
> > > > >
> > > > > 4. Add a listener for the job execution lifecycle, something like
> > > > > the following, so that downstream projects can run custom logic
> > > > > during the lifecycle of a job. E.g. Zeppelin would capture the jobId
> > > > > after the job is submitted and then use this jobId to cancel the job
> > > > > later when necessary.
> > > > >
> > > > > public interface JobListener {
> > > > >
> > > > >    void onJobSubmitted(JobID jobId);
> > > > >
> > > > >    void onJobExecuted(JobExecutionResult jobResult);
> > > > >
> > > > >    void onJobCanceled(JobID jobId);
> > > > > }
> > > > >
> > > > > 5. Enable sessions in ExecutionEnvironment. Currently this is
> > > > > disabled, but a session is very convenient for third parties that
> > > > > submit jobs continually. I hope Flink can enable it again.
> > > > >
> > > > > 6. Unify all Flink client APIs into
> > > > > ExecutionEnvironment/StreamExecutionEnvironment.
> > > > >
> > > > > This is a long-term issue which needs more careful thinking and
> > > > > design. Currently some features of Flink are exposed in
> > > > > ExecutionEnvironment/StreamExecutionEnvironment, but others are
> > > > > exposed in the CLI instead of the API, like the cancel and savepoint
> > > > > operations I mentioned above. I think the root cause is that Flink
> > > > > never unified how clients interact with it. Here I list three
> > > > > scenarios of Flink operation:
> > > > >
> > > > >    - Local job execution. Flink will create a LocalEnvironment and
> > > > >    then use this LocalEnvironment to create a LocalExecutor for job
> > > > >    execution.
> > > > >    - Remote job execution. Flink will create a ClusterClient first,
> > > > >    then create a ContextEnvironment based on the ClusterClient, and
> > > > >    then run the job.
> > > > >    - Job cancellation. Flink will create a ClusterClient first and
> > > > >    then cancel the job via this ClusterClient.
> > > > >
> > > > > As you can see from the above three scenarios, Flink doesn't use the
> > > > > same approach (code path) to interact with Flink. What I propose is
> > > > > the following: create the proper LocalEnvironment/RemoteEnvironment
> > > > > (based on user configuration) --> use this environment to create the
> > > > > proper ClusterClient (LocalClusterClient or RestClusterClient) to
> > > > > interact with Flink (job execution or cancellation).
> > > > >
> > > > > This way we can unify the process of local execution and remote
> > > > > execution. And it becomes much easier for third parties to integrate
> > > > > with Flink, because ExecutionEnvironment is the unified entry point
> > > > > for Flink. All a third party needs to do is pass configuration to the
> > > > > ExecutionEnvironment, and the ExecutionEnvironment will do the right
> > > > > thing based on that configuration. The Flink CLI can also be
> > > > > considered a consumer of the Flink API: it would just pass the
> > > > > configuration to the ExecutionEnvironment and let the
> > > > > ExecutionEnvironment create the proper ClusterClient instead of
> > > > > creating the ClusterClient directly itself.
> > > > >
> > > > >
> > > > > Point 6 would involve large code refactoring, so I think we can
> > > > > defer it to a future release; 1, 2, 3, 4, and 5 could be done at
> > > > > once, I believe. Let me know your comments and feedback. Thanks.
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Best Regards
> > > > >
> > > > > Jeff Zhang
> > > > >
> > > >
> > >
> > >
> > > --
> > > Best Regards
> > >
> > > Jeff Zhang
> > >
> >
>
>
> --
> Best Regards
>
> Jeff Zhang
>
