>>> I'm not so sure whether the user should be able to define where the job
runs (in your example Yarn). This is actually independent of the job
development and is something which is decided at deployment time.

Users don't need to specify the execution mode programmatically. They can
also pass it as an argument to the flink run command, e.g.

bin/flink run -m yarn-cluster ....
bin/flink run -m local ...
bin/flink run -m host:port ...

Does this make sense to you?

>>> To me it makes sense that the ExecutionEnvironment is not directly
initialized by the user and instead context sensitive how you want to
execute your job (Flink CLI vs. IDE, for example).

Right, I notice that Flink currently creates a different
ContextExecutionEnvironment depending on the submission scenario (Flink
CLI vs. IDE). To me this is a rather hacky approach and not very
straightforward. What I suggested above is that Flink should always create
the same ExecutionEnvironment but with different configuration, and based
on that configuration it would create the proper ClusterClient for the
different behaviors.
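
To make the idea concrete, here is a minimal Java sketch of "one environment, different configuration". It is not actual Flink code: ClientSketch, FlinkConf, Environment, SketchClient and createClient are hypothetical placeholders, and only the values local, yarn-cluster and host:port are taken from the -m examples above.

```java
// Hypothetical sketch: none of these types are real Flink classes.
final class ClientSketch {

    /** Configuration holder; "FlinkConf" is only a name proposed in this thread. */
    static final class FlinkConf {
        private final String master;

        FlinkConf(String master) {
            if (master == null) {
                throw new IllegalArgumentException("master must not be null");
            }
            this.master = master;
        }

        /** Same value one would pass to "bin/flink run -m ...". */
        String master() {
            return master;
        }
    }

    /** Stand-in for a cluster client; it only reports where jobs would go. */
    interface SketchClient {
        String describe();
    }

    /** Chooses the client purely from configuration. */
    static SketchClient createClient(String master) {
        if (master.equals("local")) {
            return () -> "local";
        }
        if (master.equals("yarn-cluster")) {
            return () -> "yarn";
        }
        // Anything else is treated as host:port of a running cluster.
        int colon = master.lastIndexOf(':');
        if (colon <= 0 || colon == master.length() - 1) {
            throw new IllegalArgumentException("Unsupported master: " + master);
        }
        String host = master.substring(0, colon);
        int port = Integer.parseInt(master.substring(colon + 1));
        return () -> "rest:" + host + ":" + port;
    }

    /** One environment class for every scenario; only the configuration differs. */
    static final class Environment {
        private final SketchClient client;

        Environment(FlinkConf conf) {
            this.client = createClient(conf.master());
        }

        SketchClient getClusterClient() {
            return client;
        }
    }

    public static void main(String[] args) {
        // The CLI would fill this in from "-m"; an IDE run could default to local.
        String master = args.length > 0 ? args[0] : "local";
        Environment env = new Environment(new FlinkConf(master));
        System.out.println(env.getClusterClient().describe());
    }
}
```

In this sketch the CLI and an IDE run would construct the same Environment class; the only thing that changes between them is the configuration value they pass in.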







Till Rohrmann <trohrm...@apache.org> wrote on Thu, Dec 20, 2018 at 11:18 PM:

> You are probably right that we have code duplication when it comes to the
> creation of the ClusterClient. This should be reduced in the future.
>
> I'm not so sure whether the user should be able to define where the job
> runs (in your example Yarn). This is actually independent of the job
> development and is something which is decided at deployment time. To me it
> makes sense that the ExecutionEnvironment is not directly initialized by
> the user and instead context sensitive how you want to execute your job
> (Flink CLI vs. IDE, for example). However, I agree that the
> ExecutionEnvironment should give you access to the ClusterClient and to the
> job (maybe in the form of the JobGraph or a job plan).
>
> Cheers,
> Till
>
> On Thu, Dec 13, 2018 at 4:36 AM Jeff Zhang <zjf...@gmail.com> wrote:
>
> > Hi Till,
> > Thanks for the feedback. You are right that I expect a better programmatic
> > job submission/control API which could be used by downstream projects, and
> > it would benefit the Flink ecosystem. When I look at the code of the Flink
> > scala-shell and sql-client (I believe they are not the core of Flink, but
> > belong to its ecosystem), I find a lot of duplicated code for creating a
> > ClusterClient from user-provided configuration (the configuration format
> > may differ between scala-shell and sql-client) and then using that
> > ClusterClient to manipulate jobs. I don't think this is convenient for
> > downstream projects. What I expect is that a downstream project only needs
> > to provide the necessary configuration info (maybe by introducing a class
> > FlinkConf) and then build an ExecutionEnvironment based on this FlinkConf,
> > and the ExecutionEnvironment will create the proper ClusterClient. This not
> > only benefits downstream project development but also helps their
> > integration tests with Flink. Here's a sample code snippet of what I
> > expect.
> >
> > val conf = new FlinkConf().mode("yarn")
> > val env = new ExecutionEnvironment(conf)
> > val jobId = env.submit(...)
> > val jobStatus = env.getClusterClient().queryJobStatus(jobId)
> > env.getClusterClient().cancelJob(jobId)
> >
> > What do you think ?
> >
> >
> >
> >
> > Till Rohrmann <trohrm...@apache.org> wrote on Tue, Dec 11, 2018 at 6:28 PM:
> >
> > > Hi Jeff,
> > >
> > > what you are proposing is to provide the user with better programmatic
> > > job control. There was actually an effort to achieve this, but it has
> > > never been completed [1]. However, there are some improvements in the
> > > code base now. Look for example at the NewClusterClient interface, which
> > > offers non-blocking job submission. But I agree that we need to improve
> > > Flink in this regard.
> > >
> > > I would not be in favour of exposing all ClusterClient calls via the
> > > ExecutionEnvironment because it would clutter the class and would not be
> > > a good separation of concerns. Instead, one idea could be to retrieve the
> > > current ClusterClient from the ExecutionEnvironment, which can then be
> > > used for cluster and job control. But before we start an effort here, we
> > > need to agree on and capture what functionality we want to provide.
> > >
> > > Initially, the idea was that we have the ClusterDescriptor describing how
> > > to talk to a cluster manager like Yarn or Mesos. The ClusterDescriptor can
> > > be used for deploying Flink clusters (job and session) and gives you a
> > > ClusterClient. The ClusterClient controls the cluster (e.g. submitting
> > > jobs, listing all running jobs). And then there was the idea to introduce
> > > a JobClient which you obtain from the ClusterClient to trigger
> > > job-specific operations (e.g. taking a savepoint, cancelling the job).
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-4272
> > >
> > > Cheers,
> > > Till
> > >
> > > On Tue, Dec 11, 2018 at 10:13 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > >
> > > > Hi Folks,
> > > >
> > > > I am trying to integrate Flink into Apache Zeppelin, which is an
> > > > interactive notebook, and I hit several issues that are caused by the
> > > > Flink client API. So I'd like to propose the following changes to the
> > > > Flink client API.
> > > >
> > > > 1. Support non-blocking execution. Currently,
> > > > ExecutionEnvironment#execute is a blocking method which does two things:
> > > > it first submits the job and then waits until the job is finished. I'd
> > > > like to introduce a non-blocking execution method like
> > > > ExecutionEnvironment#submit which only submits the job and then returns
> > > > the jobId to the client, and allow the user to query the job status via
> > > > the jobId.
> > > >
> > > > 2. Add a cancel API in ExecutionEnvironment/StreamExecutionEnvironment.
> > > > Currently the only way to cancel a job is via the CLI (bin/flink), which
> > > > is not convenient for downstream projects that want to use this feature.
> > > > So I'd like to add a cancel API in ExecutionEnvironment.
> > > >
> > > > 3. Add a savepoint API in
> > > > ExecutionEnvironment/StreamExecutionEnvironment. It is similar to the
> > > > cancel API; we should use ExecutionEnvironment as the unified API for
> > > > third parties to integrate with Flink.
> > > >
> > > > 4. Add a listener for the job execution lifecycle, something like the
> > > > following, so that downstream projects can run custom logic during the
> > > > lifecycle of a job. E.g. Zeppelin would capture the jobId after the job
> > > > is submitted and then use this jobId to cancel it later when necessary.
> > > >
> > > > public interface JobListener {
> > > >
> > > >    void onJobSubmitted(JobID jobId);
> > > >
> > > >    void onJobExecuted(JobExecutionResult jobResult);
> > > >
> > > >    void onJobCanceled(JobID jobId);
> > > > }
> > > >
> > > > 5. Enable sessions in ExecutionEnvironment. Currently this is disabled,
> > > > but a session is very convenient for third parties that submit jobs
> > > > continually. I hope Flink can enable it again.
> > > >
> > > > 6. Unify all flink client api into
> > > > ExecutionEnvironment/StreamExecutionEnvironment.
> > > >
> > > > This is a long-term issue which needs more careful thinking and design.
> > > > Currently some features of Flink are exposed in
> > > > ExecutionEnvironment/StreamExecutionEnvironment, but some are exposed in
> > > > the CLI instead of the API, like the cancel and savepoint operations I
> > > > mentioned above. I think the root cause is that Flink didn't unify the
> > > > way clients interact with it. Here I list 3 scenarios of Flink operation:
> > > >
> > > >    - Local job execution. Flink will create a LocalEnvironment and then
> > > >    use this LocalEnvironment to create a LocalExecutor for job execution.
> > > >    - Remote job execution. Flink will create a ClusterClient first, then
> > > >    create a ContextEnvironment based on the ClusterClient, and then run
> > > >    the job.
> > > >    - Job cancellation. Flink will create a ClusterClient first and then
> > > >    cancel the job via this ClusterClient.
> > > >
> > > > As you can see, in the above 3 scenarios Flink doesn't use the same
> > > > approach (code path) to interact with Flink.
> > > > What I propose is the following:
> > > > Create the proper LocalEnvironment/RemoteEnvironment (based on user
> > > > configuration) --> Use this Environment to create the proper
> > > > ClusterClient (LocalClusterClient or RestClusterClient) to interact with
> > > > Flink (job execution or cancellation).
> > > >
> > > > This way we can unify the process of local execution and remote
> > > > execution. And it is much easier for third parties to integrate with
> > > > Flink, because ExecutionEnvironment is the unified entry point for Flink.
> > > > All a third party needs to do is pass configuration to
> > > > ExecutionEnvironment, and ExecutionEnvironment will do the right thing
> > > > based on the configuration. The Flink CLI can also be considered a
> > > > consumer of the Flink API: it just passes the configuration to
> > > > ExecutionEnvironment and lets ExecutionEnvironment create the proper
> > > > ClusterClient instead of letting the CLI create the ClusterClient
> > > > directly.
> > > >
> > > >
> > > > 6 would involve large code refactoring, so I think we can defer it to a
> > > > future release; 1, 2, 3, 4 and 5 could be done at once, I believe. Let me
> > > > know your comments and feedback, thanks.
> > > >
> > > >
> > > >
> > > > --
> > > > Best Regards
> > > >
> > > > Jeff Zhang
> > > >
> > >
> >
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
> >
>


-- 
Best Regards

Jeff Zhang
