Sorry folks, the design doc is later than expected. Here's the design doc
I drafted; any comments and feedback are welcome.

https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/edit?usp=sharing



Stephan Ewen <se...@apache.org> wrote on Thu, Feb 14, 2019 at 8:43 PM:

> Nice that this discussion is happening.
>
> In the FLIP, we could also revisit the entire role of the environments
> again.
>
> Initially, the idea was:
>   - the environments take care of the specific setup for standalone
>     (no setup needed), yarn, mesos, etc.
>   - the session ones have control over the session. The environment
>     holds the session client.
>   - running a job gives a "control" object for that job. That behavior
>     is the same in all environments.
>
> The actual implementation diverged quite a bit from that. Happy to see
> a discussion about straightening this out a bit more.
>
>
> On Tue, Feb 12, 2019 at 4:58 AM Jeff Zhang <zjf...@gmail.com> wrote:
>
> > Hi folks,
> >
> > Sorry for the late response. It seems we have reached consensus on
> > this; I will create a FLIP for this with a more detailed design.
> >
> >
> > Thomas Weise <t...@apache.org> wrote on Fri, Dec 21, 2018 at 11:43 AM:
> >
> > > Great to see this discussion seeded! The problems you face with the
> > > Zeppelin integration are also affecting other downstream projects,
> > > like Beam.
> > >
> > > We just enabled the savepoint restore option in
> > > RemoteStreamEnvironment [1], and that was more difficult than it
> > > should be. The main issue is that the environment and the cluster
> > > client aren't decoupled. Ideally it should be possible to just get
> > > the matching cluster client from the environment and then control
> > > the job through it (environment as factory for cluster client). But
> > > note that the environment classes are part of the public API, and it
> > > is not straightforward to make larger changes without breaking
> > > backward compatibility.
> > >
> > > ClusterClient currently exposes internal classes like JobGraph and
> > > StreamGraph. But it should be possible to wrap this with a new
> > > public API that brings the required job control capabilities for
> > > downstream projects. Perhaps it is helpful to look at some of the
> > > interfaces in Beam while thinking about this: [2] for the portable
> > > job API and [3] for the old asynchronous job control from the Beam
> > > Java SDK.
> > >
> > > The backward compatibility discussion [4] is also relevant here. A
> > > new API should shield downstream projects from internals and allow
> > > them to interoperate with multiple future Flink versions in the same
> > > release line without forced upgrades.
> > >
> > > Thanks,
> > > Thomas
> > >
> > > [1] https://github.com/apache/flink/pull/7249
> > > [2] https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/beam_job_api.proto
> > > [3] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
> > > [4] https://lists.apache.org/thread.html/064c75c5d10f0806095b14f6d76942598917a14429c1acbddd151fe2@%3Cdev.flink.apache.org%3E
> > >
> > >
> > > On Thu, Dec 20, 2018 at 6:15 PM Jeff Zhang <zjf...@gmail.com> wrote:
> > >
> > > > >>> I'm not so sure whether the user should be able to define
> > > > where the job runs (in your example Yarn). This is actually
> > > > independent of the job development and is something which is
> > > > decided at deployment time.
> > > >
> > > > Users don't need to specify the execution mode programmatically.
> > > > They can also pass the execution mode via the arguments of the
> > > > flink run command, e.g.
> > > >
> > > > bin/flink run -m yarn-cluster ...
> > > > bin/flink run -m local ...
> > > > bin/flink run -m host:port ...
> > > >
> > > > Does this make sense to you?
> > > >
> > > > >>> To me it makes sense that the ExecutionEnvironment is not
> > > > directly initialized by the user and is instead context-sensitive
> > > > to how you want to execute your job (Flink CLI vs. IDE, for
> > > > example).
> > > >
> > > > Right, currently I notice that Flink creates different
> > > > ContextExecutionEnvironments based on different submission
> > > > scenarios (Flink CLI vs. IDE). To me this is kind of a hacky
> > > > approach, not so straightforward. What I suggested above is that
> > > > Flink should always create the same ExecutionEnvironment but with
> > > > a different configuration, and based on the configuration it would
> > > > create the proper ClusterClient for different behaviors.
> > > >
> > > > Till Rohrmann <trohrm...@apache.org> wrote on Thu, Dec 20, 2018 at 11:18 PM:
> > > >
> > > > > You are probably right that we have code duplication when it
> > > > > comes to the creation of the ClusterClient. This should be
> > > > > reduced in the future.
> > > > >
> > > > > I'm not so sure whether the user should be able to define where
> > > > > the job runs (in your example Yarn). This is actually independent
> > > > > of the job development and is something which is decided at
> > > > > deployment time. To me it makes sense that the
> > > > > ExecutionEnvironment is not directly initialized by the user and
> > > > > is instead context-sensitive to how you want to execute your job
> > > > > (Flink CLI vs. IDE, for example). However, I agree that the
> > > > > ExecutionEnvironment should give you access to the ClusterClient
> > > > > and to the job (maybe in the form of the JobGraph or a job plan).
> > > > >
> > > > > Cheers,
> > > > > Till
> > > > >
> > > > > On Thu, Dec 13, 2018 at 4:36 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > > > >
> > > > > > Hi Till,
> > > > > > Thanks for the feedback. You are right that I expect a better
> > > > > > programmatic job submission/control API that could be used by
> > > > > > downstream projects, and it would benefit the Flink ecosystem.
> > > > > > When I look at the code of the Flink scala-shell and sql-client
> > > > > > (I believe they are not the core of Flink, but belong to its
> > > > > > ecosystem), I find a lot of duplicated code for creating a
> > > > > > ClusterClient from user-provided configuration (the
> > > > > > configuration format may differ between scala-shell and
> > > > > > sql-client) and then using that ClusterClient to manipulate
> > > > > > jobs. I don't think this is convenient for downstream projects.
> > > > > > What I expect is that a downstream project only needs to
> > > > > > provide the necessary configuration info (maybe introducing a
> > > > > > class FlinkConf), then build an ExecutionEnvironment based on
> > > > > > this FlinkConf, and the ExecutionEnvironment will create the
> > > > > > proper ClusterClient. This would not only benefit downstream
> > > > > > project development but also help their integration tests with
> > > > > > Flink. Here's one sample code snippet of what I expect:
> > > > > >
> > > > > > val conf = new FlinkConf().mode("yarn")
> > > > > > val env = new ExecutionEnvironment(conf)
> > > > > > val jobId = env.submit(...)
> > > > > > val jobStatus = env.getClusterClient().queryJobStatus(jobId)
> > > > > > env.getClusterClient().cancelJob(jobId)
> > > > > >
> > > > > > What do you think?
> > > > > >
> > > > > > Till Rohrmann <trohrm...@apache.org> wrote on Tue, Dec 11, 2018 at 6:28 PM:
> > > > > >
> > > > > > > Hi Jeff,
> > > > > > >
> > > > > > > what you are proposing is to provide the user with better
> > > > > > > programmatic job control. There was actually an effort to
> > > > > > > achieve this, but it has never been completed [1]. However,
> > > > > > > there are some improvements in the code base now. Look, for
> > > > > > > example, at the NewClusterClient interface, which offers
> > > > > > > non-blocking job submission. But I agree that we need to
> > > > > > > improve Flink in this regard.
> > > > > > >
> > > > > > > I would not be in favour of exposing all ClusterClient calls
> > > > > > > via the ExecutionEnvironment because it would clutter the
> > > > > > > class and would not be a good separation of concerns.
> > > > > > > Instead, one idea could be to retrieve the current
> > > > > > > ClusterClient from the ExecutionEnvironment, which can then
> > > > > > > be used for cluster and job control. But before we start an
> > > > > > > effort here, we need to agree on and capture what
> > > > > > > functionality we want to provide.
> > > > > > >
> > > > > > > Initially, the idea was that we have the ClusterDescriptor
> > > > > > > describing how to talk to a cluster manager like Yarn or
> > > > > > > Mesos. The ClusterDescriptor can be used for deploying Flink
> > > > > > > clusters (job and session) and gives you a ClusterClient. The
> > > > > > > ClusterClient controls the cluster (e.g. submitting jobs,
> > > > > > > listing all running jobs). And then there was the idea to
> > > > > > > introduce a JobClient which you obtain from the ClusterClient
> > > > > > > to trigger job-specific operations (e.g. taking a savepoint,
> > > > > > > cancelling the job).
> > > > > > >
> > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-4272
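> > > > > > >
> > > > > > > Roughly, as a sketch only (the type and method names below
> > > > > > > are illustrative assumptions, not the current code), the
> > > > > > > layering would look something like this:
> > > > > > >
> > > > > > > // Hypothetical sketch of the intended layering.
> > > > > > > interface ClusterDescriptor {
> > > > > > >     // Deploy a session cluster on Yarn/Mesos/standalone
> > > > > > >     // and return a client to talk to it.
> > > > > > >     ClusterClient deploySessionCluster();
> > > > > > > }
> > > > > > >
> > > > > > > interface ClusterClient {
> > > > > > >     // Cluster-level operations.
> > > > > > >     JobClient submitJob(JobGraph jobGraph);
> > > > > > >     List<JobID> listJobs();
> > > > > > > }
> > > > > > >
> > > > > > > interface JobClient {
> > > > > > >     // Job-specific operations.
> > > > > > >     CompletableFuture<String> triggerSavepoint(String directory);
> > > > > > >     CompletableFuture<Void> cancel();
> > > > > > > }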
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Till
> > > > > > >
> > > > > > > On Tue, Dec 11, 2018 at 10:13 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Folks,
> > > > > > > >
> > > > > > > > I am trying to integrate Flink into Apache Zeppelin, which
> > > > > > > > is an interactive notebook, and I hit several issues that
> > > > > > > > are caused by the Flink client API. So I'd like to propose
> > > > > > > > the following changes to the Flink client API.
> > > > > > > >
> > > > > > > > 1. Support nonblocking execution. Currently,
> > > > > > > > ExecutionEnvironment#execute is a blocking method which
> > > > > > > > does two things: first submit the job, then wait for the
> > > > > > > > job until it is finished. I'd like to introduce a
> > > > > > > > nonblocking execution method like
> > > > > > > > ExecutionEnvironment#submit which only submits the job and
> > > > > > > > then returns the jobId to the client, and to allow users
> > > > > > > > to query the job status via the jobId. A sketch of what
> > > > > > > > this could look like follows.
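> > > > > > > >
> > > > > > > > For illustration only (the method names and helpers below
> > > > > > > > are my assumptions, not existing Flink API), the
> > > > > > > > nonblocking path could look like:
> > > > > > > >
> > > > > > > > // Hypothetical additions to ExecutionEnvironment.
> > > > > > > > public JobID submit() throws Exception {
> > > > > > > >     // Build the job graph and hand it to the
> > > > > > > >     // ClusterClient without waiting for completion.
> > > > > > > >     return getClusterClient().submitJob(createJobGraph());
> > > > > > > > }
> > > > > > > >
> > > > > > > > // Poll the status of a previously submitted job.
> > > > > > > > public JobStatus getJobStatus(JobID jobId) throws Exception {
> > > > > > > >     return getClusterClient().queryJobStatus(jobId);
> > > > > > > > }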
> > > > > > > >
> > > > > > > > 2. Add a cancel API in
> > > > > > > > ExecutionEnvironment/StreamExecutionEnvironment. Currently
> > > > > > > > the only way to cancel a job is via the CLI (bin/flink),
> > > > > > > > which is not convenient for downstream projects. So I'd
> > > > > > > > like to add a cancel API in ExecutionEnvironment.
> > > > > > > >
> > > > > > > > 3. Add a savepoint API in
> > > > > > > > ExecutionEnvironment/StreamExecutionEnvironment. It is
> > > > > > > > similar to the cancel API; we should use
> > > > > > > > ExecutionEnvironment as the unified API for third parties
> > > > > > > > to integrate with Flink. A sketch covering both this and
> > > > > > > > the cancel API follows.
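> > > > > > > >
> > > > > > > > As a sketch only (the signatures below are my assumptions,
> > > > > > > > not existing Flink API), points 2 and 3 could look like:
> > > > > > > >
> > > > > > > > // Hypothetical additions to ExecutionEnvironment that
> > > > > > > > // delegate to the underlying ClusterClient.
> > > > > > > > public void cancel(JobID jobId) throws Exception {
> > > > > > > >     getClusterClient().cancelJob(jobId);
> > > > > > > > }
> > > > > > > >
> > > > > > > > // Returns the path of the completed savepoint.
> > > > > > > > public String triggerSavepoint(JobID jobId, String directory)
> > > > > > > >         throws Exception {
> > > > > > > >     return getClusterClient().triggerSavepoint(jobId, directory);
> > > > > > > > }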
> > > > > > > >
> > > > > > > > 4. Add a listener for the job execution lifecycle,
> > > > > > > > something like the following, so that downstream projects
> > > > > > > > can run custom logic in the lifecycle of a job. E.g.
> > > > > > > > Zeppelin would capture the jobId after the job is
> > > > > > > > submitted and then use this jobId to cancel the job later
> > > > > > > > when necessary.
> > > > > > > >
> > > > > > > > public interface JobListener {
> > > > > > > >
> > > > > > > >    void onJobSubmitted(JobID jobId);
> > > > > > > >
> > > > > > > >    void onJobExecuted(JobExecutionResult jobResult);
> > > > > > > >
> > > > > > > >    void onJobCanceled(JobID jobId);
> > > > > > > > }
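> > > > > > > >
> > > > > > > > For example (the registration method below is a
> > > > > > > > hypothetical sketch, not existing API), Zeppelin could do:
> > > > > > > >
> > > > > > > > // Hypothetical registration on the environment.
> > > > > > > > env.addJobListener(new JobListener() {
> > > > > > > >     private JobID jobId;
> > > > > > > >
> > > > > > > >     public void onJobSubmitted(JobID jobId) {
> > > > > > > >         // Remember the id so the job can be cancelled
> > > > > > > >         // later via the cancel API from point 2.
> > > > > > > >         this.jobId = jobId;
> > > > > > > >     }
> > > > > > > >
> > > > > > > >     public void onJobExecuted(JobExecutionResult jobResult) {}
> > > > > > > >
> > > > > > > >     public void onJobCanceled(JobID jobId) {}
> > > > > > > > });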
> > > > > > > >
> > > > > > > > 5. Enable sessions in ExecutionEnvironment. Currently this
> > > > > > > > is disabled, but sessions are very convenient for third
> > > > > > > > parties that submit jobs continually. I hope Flink can
> > > > > > > > enable it again.
> > > > > > > >
> > > > > > > > 6. Unify all Flink client APIs into
> > > > > > > > ExecutionEnvironment/StreamExecutionEnvironment.
> > > > > > > >
> > > > > > > > This is a long-term issue which needs more careful
> > > > > > > > thinking and design. Currently, some features of Flink are
> > > > > > > > exposed in ExecutionEnvironment/StreamExecutionEnvironment,
> > > > > > > > but some are exposed in the CLI instead of the API, like
> > > > > > > > the cancel and savepoint operations I mentioned above. I
> > > > > > > > think the root cause is that the interaction with Flink
> > > > > > > > was never unified. Here I list 3 scenarios of Flink
> > > > > > > > operation:
> > > > > > > >
> > > > > > > >    - Local job execution. Flink will create a
> > > > > > > >      LocalEnvironment and then use this LocalEnvironment
> > > > > > > >      to create a LocalExecutor for job execution.
> > > > > > > >    - Remote job execution. Flink will create a
> > > > > > > >      ClusterClient first, then create a ContextEnvironment
> > > > > > > >      based on the ClusterClient, and then run the job.
> > > > > > > >    - Job cancellation. Flink will create a ClusterClient
> > > > > > > >      first and then cancel the job via this ClusterClient.
> > > > > > > >
> > > > > > > > As you can see in the above 3 scenarios, Flink doesn't use
> > > > > > > > the same approach (code path) to interact with Flink.
> > > > > > > > What I propose is the following: create the proper
> > > > > > > > LocalEnvironment/RemoteEnvironment (based on user
> > > > > > > > configuration) --> use this Environment to create the
> > > > > > > > proper ClusterClient (LocalClusterClient or
> > > > > > > > RestClusterClient) to interact with Flink (job execution
> > > > > > > > or cancellation).
> > > > > > > >
> > > > > > > > This way we can unify the process of local execution and
> > > > > > > > remote execution. And it is much easier for third parties
> > > > > > > > to integrate with Flink, because ExecutionEnvironment is
> > > > > > > > the unified entry point for Flink. What a third party
> > > > > > > > needs to do is just pass configuration to
> > > > > > > > ExecutionEnvironment, and ExecutionEnvironment will do the
> > > > > > > > right thing based on the configuration. The Flink CLI can
> > > > > > > > also be considered a Flink API consumer: it just passes
> > > > > > > > the configuration to ExecutionEnvironment and lets
> > > > > > > > ExecutionEnvironment create the proper ClusterClient
> > > > > > > > instead of letting the CLI create the ClusterClient
> > > > > > > > directly. A sketch of this flow follows.
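> > > > > > > >
> > > > > > > > As a sketch only (the factory method and config accessors
> > > > > > > > below are my assumptions, not existing Flink API), the
> > > > > > > > unified flow could look like:
> > > > > > > >
> > > > > > > > // Hypothetical factory that picks the environment, and
> > > > > > > > // hence the ClusterClient, from configuration alone.
> > > > > > > > public static ExecutionEnvironment createEnvironment(FlinkConf conf) {
> > > > > > > >     if ("local".equals(conf.mode())) {
> > > > > > > >         return new LocalEnvironment(conf);   // -> LocalClusterClient
> > > > > > > >     } else {
> > > > > > > >         return new RemoteEnvironment(conf);  // -> RestClusterClient
> > > > > > > >     }
> > > > > > > > }
> > > > > > > >
> > > > > > > > // Both the CLI and downstream projects would go through
> > > > > > > > // the same path:
> > > > > > > > ExecutionEnvironment env = createEnvironment(conf);
> > > > > > > > JobID jobId = env.submit();
> > > > > > > > env.cancel(jobId);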
> > > > > > > >
> > > > > > > >
> > > > > > > > Point 6 would involve large code refactoring, so I think
> > > > > > > > we can defer it to a future release; 1, 2, 3, 4 and 5
> > > > > > > > could be done at once, I believe. Let me know your
> > > > > > > > comments and feedback, thanks.
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > Best Regards
> > > > > > > >
> > > > > > > > Jeff Zhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Best Regards
> > > > > >
> > > > > > Jeff Zhang
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Best Regards
> > > >
> > > > Jeff Zhang
> > > >
> > >
> >
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
> >
>


-- 
Best Regards

Jeff Zhang
