Nice that this discussion is happening.

In the FLIP, we could also revisit the entire role of the environments
again.

Initially, the idea was:
  - the environments take care of the specific setup for standalone (no
setup needed), yarn, mesos, etc.
  - the session ones have control over the session. The environment holds
the session client.
  - running a job gives a "control" object for that job. That behavior is
the same in all environments.

The actual implementation diverged quite a bit from that. Happy to see a
discussion about straitening this out a bit more.


On Tue, Feb 12, 2019 at 4:58 AM Jeff Zhang <zjf...@gmail.com> wrote:

> Hi folks,
>
> Sorry for late response, It seems we reach consensus on this, I will create
> FLIP for this with more detailed design
>
>
> Thomas Weise <t...@apache.org> 于2018年12月21日周五 上午11:43写道:
>
> > Great to see this discussion seeded! The problems you face with the
> > Zeppelin integration are also affecting other downstream projects, like
> > Beam.
> >
> > We just enabled the savepoint restore option in RemoteStreamEnvironment
> [1]
> > and that was more difficult than it should be. The main issue is that
> > environment and cluster client aren't decoupled. Ideally it should be
> > possible to just get the matching cluster client from the environment and
> > then control the job through it (environment as factory for cluster
> > client). But note that the environment classes are part of the public
> API,
> > and it is not straightforward to make larger changes without breaking
> > backward compatibility.
> >
> > ClusterClient currently exposes internal classes like JobGraph and
> > StreamGraph. But it should be possible to wrap this with a new public API
> > that brings the required job control capabilities for downstream
> projects.
> > Perhaps it is helpful to look at some of the interfaces in Beam while
> > thinking about this: [2] for the portable job API and [3] for the old
> > asynchronous job control from the Beam Java SDK.
> >
> > The backward compatibility discussion [4] is also relevant here. A new
> API
> > should shield downstream projects from internals and allow them to
> > interoperate with multiple future Flink versions in the same release line
> > without forced upgrades.
> >
> > Thanks,
> > Thomas
> >
> > [1] https://github.com/apache/flink/pull/7249
> > [2]
> >
> >
> https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/beam_job_api.proto
> > [3]
> >
> >
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
> > [4]
> >
> >
> https://lists.apache.org/thread.html/064c75c5d10f0806095b14f6d76942598917a14429c1acbddd151fe2@%3Cdev.flink.apache.org%3E
> >
> >
> > On Thu, Dec 20, 2018 at 6:15 PM Jeff Zhang <zjf...@gmail.com> wrote:
> >
> > > >>> I'm not so sure whether the user should be able to define where the
> > job
> > > runs (in your example Yarn). This is actually independent of the job
> > > development and is something which is decided at deployment time.
> > >
> > > User don't need to specify execution mode programmatically. They can
> also
> > > pass the execution mode from the arguments in flink run command. e.g.
> > >
> > > bin/flink run -m yarn-cluster ....
> > > bin/flink run -m local ...
> > > bin/flink run -m host:port ...
> > >
> > > Does this make sense to you ?
> > >
> > > >>> To me it makes sense that the ExecutionEnvironment is not directly
> > > initialized by the user and instead context sensitive how you want to
> > > execute your job (Flink CLI vs. IDE, for example).
> > >
> > > Right, currently I notice Flink would create different
> > > ContextExecutionEnvironment based on different submission scenarios
> > (Flink
> > > Cli vs IDE). To me this is kind of hack approach, not so
> straightforward.
> > > What I suggested above is that is that flink should always create the
> > same
> > > ExecutionEnvironment but with different configuration, and based on the
> > > configuration it would create the proper ClusterClient for different
> > > behaviors.
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > Till Rohrmann <trohrm...@apache.org> 于2018年12月20日周四 下午11:18写道:
> > >
> > > > You are probably right that we have code duplication when it comes to
> > the
> > > > creation of the ClusterClient. This should be reduced in the future.
> > > >
> > > > I'm not so sure whether the user should be able to define where the
> job
> > > > runs (in your example Yarn). This is actually independent of the job
> > > > development and is something which is decided at deployment time. To
> me
> > > it
> > > > makes sense that the ExecutionEnvironment is not directly initialized
> > by
> > > > the user and instead context sensitive how you want to execute your
> job
> > > > (Flink CLI vs. IDE, for example). However, I agree that the
> > > > ExecutionEnvironment should give you access to the ClusterClient and
> to
> > > the
> > > > job (maybe in the form of the JobGraph or a job plan).
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Thu, Dec 13, 2018 at 4:36 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > > >
> > > > > Hi Till,
> > > > > Thanks for the feedback. You are right that I expect better
> > > programmatic
> > > > > job submission/control api which could be used by downstream
> project.
> > > And
> > > > > it would benefit for the flink ecosystem. When I look at the code
> of
> > > > flink
> > > > > scala-shell and sql-client (I believe they are not the core of
> flink,
> > > but
> > > > > belong to the ecosystem of flink), I find many duplicated code for
> > > > creating
> > > > > ClusterClient from user provided configuration (configuration
> format
> > > may
> > > > be
> > > > > different from scala-shell and sql-client) and then use that
> > > > ClusterClient
> > > > > to manipulate jobs. I don't think this is convenient for downstream
> > > > > projects. What I expect is that downstream project only needs to
> > > provide
> > > > > necessary configuration info (maybe introducing class FlinkConf),
> and
> > > > then
> > > > > build ExecutionEnvironment based on this FlinkConf, and
> > > > > ExecutionEnvironment will create the proper ClusterClient. It not
> > only
> > > > > benefit for the downstream project development but also be helpful
> > for
> > > > > their integration test with flink. Here's one sample code snippet
> > that
> > > I
> > > > > expect.
> > > > >
> > > > > val conf = new FlinkConf().mode("yarn")
> > > > > val env = new ExecutionEnvironment(conf)
> > > > > val jobId = env.submit(...)
> > > > > val jobStatus = env.getClusterClient().queryJobStatus(jobId)
> > > > > env.getClusterClient().cancelJob(jobId)
> > > > >
> > > > > What do you think ?
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > Till Rohrmann <trohrm...@apache.org> 于2018年12月11日周二 下午6:28写道:
> > > > >
> > > > > > Hi Jeff,
> > > > > >
> > > > > > what you are proposing is to provide the user with better
> > > programmatic
> > > > > job
> > > > > > control. There was actually an effort to achieve this but it has
> > > never
> > > > > been
> > > > > > completed [1]. However, there are some improvement in the code
> base
> > > > now.
> > > > > > Look for example at the NewClusterClient interface which offers a
> > > > > > non-blocking job submission. But I agree that we need to improve
> > > Flink
> > > > in
> > > > > > this regard.
> > > > > >
> > > > > > I would not be in favour if exposing all ClusterClient calls via
> > the
> > > > > > ExecutionEnvironment because it would clutter the class and would
> > not
> > > > be
> > > > > a
> > > > > > good separation of concerns. Instead one idea could be to
> retrieve
> > > the
> > > > > > current ClusterClient from the ExecutionEnvironment which can
> then
> > be
> > > > > used
> > > > > > for cluster and job control. But before we start an effort here,
> we
> > > > need
> > > > > to
> > > > > > agree and capture what functionality we want to provide.
> > > > > >
> > > > > > Initially, the idea was that we have the ClusterDescriptor
> > describing
> > > > how
> > > > > > to talk to cluster manager like Yarn or Mesos. The
> > ClusterDescriptor
> > > > can
> > > > > be
> > > > > > used for deploying Flink clusters (job and session) and gives
> you a
> > > > > > ClusterClient. The ClusterClient controls the cluster (e.g.
> > > submitting
> > > > > > jobs, listing all running jobs). And then there was the idea to
> > > > > introduce a
> > > > > > JobClient which you obtain from the ClusterClient to trigger job
> > > > specific
> > > > > > operations (e.g. taking a savepoint, cancelling the job).
> > > > > >
> > > > > > [1] https://issues.apache.org/jira/browse/FLINK-4272
> > > > > >
> > > > > > Cheers,
> > > > > > Till
> > > > > >
> > > > > > On Tue, Dec 11, 2018 at 10:13 AM Jeff Zhang <zjf...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > > Hi Folks,
> > > > > > >
> > > > > > > I am trying to integrate flink into apache zeppelin which is an
> > > > > > interactive
> > > > > > > notebook. And I hit several issues that is caused by flink
> client
> > > > api.
> > > > > So
> > > > > > > I'd like to proposal the following changes for flink client
> api.
> > > > > > >
> > > > > > > 1. Support nonblocking execution. Currently,
> > > > > ExecutionEnvironment#execute
> > > > > > > is a blocking method which would do 2 things, first submit job
> > and
> > > > then
> > > > > > > wait for job until it is finished. I'd like introduce a
> > nonblocking
> > > > > > > execution method like ExecutionEnvironment#submit which only
> > submit
> > > > job
> > > > > > and
> > > > > > > then return jobId to client. And allow user to query the job
> > status
> > > > via
> > > > > > the
> > > > > > > jobId.
> > > > > > >
> > > > > > > 2. Add cancel api in
> > > ExecutionEnvironment/StreamExecutionEnvironment,
> > > > > > > currently the only way to cancel job is via cli (bin/flink),
> this
> > > is
> > > > > not
> > > > > > > convenient for downstream project to use this feature. So I'd
> > like
> > > to
> > > > > add
> > > > > > > cancel api in ExecutionEnvironment
> > > > > > >
> > > > > > > 3. Add savepoint api in
> > > > > ExecutionEnvironment/StreamExecutionEnvironment.
> > > > > > It
> > > > > > > is similar as cancel api, we should use ExecutionEnvironment as
> > the
> > > > > > unified
> > > > > > > api for third party to integrate with flink.
> > > > > > >
> > > > > > > 4. Add listener for job execution lifecycle. Something like
> > > > following,
> > > > > so
> > > > > > > that downstream project can do custom logic in the lifecycle of
> > > job.
> > > > > e.g.
> > > > > > > Zeppelin would capture the jobId after job is submitted and
> then
> > > use
> > > > > this
> > > > > > > jobId to cancel it later when necessary.
> > > > > > >
> > > > > > > public interface JobListener {
> > > > > > >
> > > > > > >    void onJobSubmitted(JobID jobId);
> > > > > > >
> > > > > > >    void onJobExecuted(JobExecutionResult jobResult);
> > > > > > >
> > > > > > >    void onJobCanceled(JobID jobId);
> > > > > > > }
> > > > > > >
> > > > > > > 5. Enable session in ExecutionEnvironment. Currently it is
> > > disabled,
> > > > > but
> > > > > > > session is very convenient for third party to submitting jobs
> > > > > > continually.
> > > > > > > I hope flink can enable it again.
> > > > > > >
> > > > > > > 6. Unify all flink client api into
> > > > > > > ExecutionEnvironment/StreamExecutionEnvironment.
> > > > > > >
> > > > > > > This is a long term issue which needs more careful thinking and
> > > > design.
> > > > > > > Currently some of features of flink is exposed in
> > > > > > > ExecutionEnvironment/StreamExecutionEnvironment, but some are
> > > exposed
> > > > > in
> > > > > > > cli instead of api, like the cancel and savepoint I mentioned
> > > above.
> > > > I
> > > > > > > think the root cause is due to that flink didn't unify the
> > > > interaction
> > > > > > with
> > > > > > > flink. Here I list 3 scenarios of flink operation
> > > > > > >
> > > > > > >    - Local job execution.  Flink will create LocalEnvironment
> and
> > > > then
> > > > > > use
> > > > > > >    this LocalEnvironment to create LocalExecutor for job
> > execution.
> > > > > > >    - Remote job execution. Flink will create ClusterClient
> first
> > > and
> > > > > then
> > > > > > >    create ContextEnvironment based on the ClusterClient and
> then
> > > run
> > > > > the
> > > > > > > job.
> > > > > > >    - Job cancelation. Flink will create ClusterClient first and
> > > then
> > > > > > cancel
> > > > > > >    this job via this ClusterClient.
> > > > > > >
> > > > > > > As you can see in the above 3 scenarios. Flink didn't use the
> > same
> > > > > > > approach(code path) to interact with flink
> > > > > > > What I propose is following:
> > > > > > > Create the proper LocalEnvironment/RemoteEnvironment (based on
> > user
> > > > > > > configuration) --> Use this Environment to create proper
> > > > ClusterClient
> > > > > > > (LocalClusterClient or RestClusterClient) to interactive with
> > > Flink (
> > > > > job
> > > > > > > execution or cancelation)
> > > > > > >
> > > > > > > This way we can unify the process of local execution and remote
> > > > > > execution.
> > > > > > > And it is much easier for third party to integrate with flink,
> > > > because
> > > > > > > ExecutionEnvironment is the unified entry point for flink. What
> > > third
> > > > > > party
> > > > > > > needs to do is just pass configuration to ExecutionEnvironment
> > and
> > > > > > > ExecutionEnvironment will do the right thing based on the
> > > > > configuration.
> > > > > > > Flink cli can also be considered as flink api consumer. it just
> > > pass
> > > > > the
> > > > > > > configuration to ExecutionEnvironment and let
> > ExecutionEnvironment
> > > to
> > > > > > > create the proper ClusterClient instead of letting cli to
> create
> > > > > > > ClusterClient directly.
> > > > > > >
> > > > > > >
> > > > > > > 6 would involve large code refactoring, so I think we can defer
> > it
> > > > for
> > > > > > > future release, 1,2,3,4,5 could be done at once I believe. Let
> me
> > > > know
> > > > > > your
> > > > > > > comments and feedback, thanks
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > Best Regards
> > > > > > >
> > > > > > > Jeff Zhang
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Best Regards
> > > > >
> > > > > Jeff Zhang
> > > > >
> > > >
> > >
> > >
> > > --
> > > Best Regards
> > >
> > > Jeff Zhang
> > >
> >
>
>
> --
> Best Regards
>
> Jeff Zhang
>

Reply via email to