Thanks tison for the effort. I left a few comments.

On Wed, Jul 31, 2019 at 8:24 PM Zili Chen <wander4...@gmail.com> wrote:

> Hi Flavio,
>
> Thanks for your reply.
>
> Both in the current implementation and in the design, ClusterClient
> never takes responsibility for generating the JobGraph.
> (What you see in the current codebase is just a few class methods.)
>
> Instead, the user describes the program in the main method
> with ExecutionEnvironment APIs and calls env.compile()
> or env.optimize() to get a FlinkPlan or a JobGraph, respectively.
>
> As for listing the main classes in a jar and choosing one for
> submission, you can now customize a CLI to do it.
> Specifically, the path of the jar is passed as an argument, and
> in the customized CLI you list the main classes and choose one
> to submit to the cluster.
>
> Best,
> tison.
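
A minimal sketch of the jar-scanning step such a customized CLI would need, in plain Java with no Flink dependency. The class name MainClassLister and its methods are hypothetical and are not part of Flink. The sketch loads classes from the jar without initializing them and reports those exposing a public static void main(String[]).

```java
import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Hypothetical helper, not a Flink class.
class MainClassLister {

    /** True if the class exposes a public static void main(String[]). */
    static boolean hasMainMethod(Class<?> clazz) {
        try {
            Method m = clazz.getMethod("main", String[].class);
            return Modifier.isStatic(m.getModifiers()) && m.getReturnType() == void.class;
        } catch (NoSuchMethodException | LinkageError e) {
            return false; // no main method, or its dependencies are missing
        }
    }

    /** Lists fully qualified names of classes in the jar that have a main method. */
    static List<String> listMainClasses(Path jar) throws IOException {
        List<String> result = new ArrayList<>();
        URL[] urls = {jar.toUri().toURL()};
        try (JarFile jarFile = new JarFile(jar.toFile());
             URLClassLoader loader =
                     new URLClassLoader(urls, MainClassLister.class.getClassLoader())) {
            for (JarEntry entry : Collections.list(jarFile.entries())) {
                String name = entry.getName();
                if (!name.endsWith(".class")) {
                    continue;
                }
                String className =
                        name.substring(0, name.length() - ".class".length()).replace('/', '.');
                try {
                    // initialize=false: static initializers of user code are not run
                    Class<?> clazz = Class.forName(className, false, loader);
                    if (hasMainMethod(clazz)) {
                        result.add(className);
                    }
                } catch (ClassNotFoundException | LinkageError e) {
                    // skip entries that cannot be loaded (e.g. module-info.class)
                }
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: MainClassLister <path-to-jar>");
            return;
        }
        listMainClasses(Paths.get(args[0])).forEach(System.out::println);
    }
}
```

Run with a jar path, it prints one candidate entry class per line; the chosen name could then be handed to whatever submission logic the CLI uses.
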
>
>
> On Wed, Jul 31, 2019 at 8:12 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
> > Just one note on my side: it is not clear to me whether the client needs
> > to be able to generate a job graph or not.
> > In my opinion, the job jar must reside only on the server/JobManager
> > side, and the client requires a way to get the job graph.
> > If you really want access to the job graph, I'd add dedicated methods
> > on the ClusterClient, like:
> >
> >    - getJobGraph(jarId, mainClass): JobGraph
> >    - listMainClasses(jarId): List<String>
> >
> > These would also require some additions on the job manager endpoint.
> > What do you think?
> >
> > On Wed, Jul 31, 2019 at 12:42 PM Zili Chen <wander4...@gmail.com> wrote:
> >
> > > Hi all,
> > >
> > > Here is a document[1] on client API enhancement from our perspective.
> > > We have investigated the current implementations, and we propose to:
> > >
> > > 1. Unify the implementation of cluster deployment and job submission in
> > > Flink.
> > > 2. Provide programmatic interfaces to allow flexible job and cluster
> > > management.
> > >
> > > The first proposal aims at reducing the code paths of cluster deployment
> > > and job submission so that one can adopt Flink easily. The second
> > > proposal aims at providing rich interfaces for advanced users
> > > who want precise control over these stages.
> > >
> > > Quick reference on open questions:
> > >
> > > 1. Exclude job cluster deployment from the client side, or redefine the
> > > semantics of the job cluster? It fits in a process quite different from
> > > session cluster deployment and job submission.
> > >
> > > 2. Maintain the code paths handling class o.a.f.api.common.Program, or
> > > implement customized program handling logic in a customized CliFrontend?
> > > See also this thread[2] and the document[1].
> > >
> > > 3. Expose ClusterClient as public API, or just expose APIs in
> > > ExecutionEnvironment and delegate them to ClusterClient? Further, in
> > > either case, is it worth introducing a JobClient, an encapsulation of
> > > ClusterClient that is associated with a specific job?
> > >
> > > Best,
> > > tison.
> > >
> > > [1] https://docs.google.com/document/d/1UWJE7eYWiMuZewBKS0YmdVO2LUTqXPd6-pbOCof9ddY/edit?usp=sharing
> > > [2] https://lists.apache.org/thread.html/7ffc9936a384b891dbcf0a481d26c6d13b2125607c200577780d1e18@%3Cdev.flink.apache.org%3E
> > >
> > > On Wed, Jul 24, 2019 at 9:19 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > >
> > > > Thanks Stephan, I will follow up on this issue in the next few weeks and
> > > > will refine the design doc. We can discuss more details after the 1.9
> > > > release.
> > > >
> > > > On Wed, Jul 24, 2019 at 12:58 AM Stephan Ewen <se...@apache.org> wrote:
> > > >
> > > > > Hi all!
> > > > >
> > > > > This thread has stalled for a bit, which I assume is mostly due to the
> > > > > Flink 1.9 feature freeze and release testing effort.
> > > > >
> > > > > I personally still recognize this issue as an important one to solve.
> > > > > I'd be happy to help resume this discussion soon (after the 1.9 release)
> > > > > and see if we can take some steps towards this in Flink 1.10.
> > > > >
> > > > > Best,
> > > > > Stephan
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Jun 24, 2019 at 10:41 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:
> > > > >
> > > > > > That's exactly what I suggested a long time ago: the Flink REST client
> > > > > > should not require any Flink dependency, only an HTTP library to call
> > > > > > the REST services to submit and monitor a job.
> > > > > > What I also suggested in [1] was to have a way to automatically show
> > > > > > the user (via a UI) the available main classes and their required
> > > > > > parameters [2].
> > > > > > Another problem we have with Flink is that the REST client and the CLI
> > > > > > client behave differently, and we use the CLI client (via ssh) because
> > > > > > it allows calling some other method after env.execute() [3] (we have to
> > > > > > call another REST service to signal the end of the job).
> > > > > > In this regard, a dedicated interface, like the JobListener suggested
> > > > > > in the previous emails, would be very helpful (IMHO).
> > > > > >
> > > > > > [1] https://issues.apache.org/jira/browse/FLINK-10864
> > > > > > [2] https://issues.apache.org/jira/browse/FLINK-10862
> > > > > > [3] https://issues.apache.org/jira/browse/FLINK-10879
> > > > > >
> > > > > > Best,
> > > > > > Flavio
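
To make the JobListener idea above concrete, here is a small self-contained sketch of what such a callback interface could look like. The method names, the ListenerRegistry class, and the way a job is modeled as a Runnable are all assumptions made for illustration; none of this is taken from Flink or from the design document.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical callback interface; the shape is an assumption, not a Flink API. */
interface JobListener {
    void onJobSubmitted(String jobId);

    void onJobFinished(String jobId, boolean success);
}

/** Hypothetical holder that notifies listeners around a job run. */
class ListenerRegistry {
    private final List<JobListener> listeners = new ArrayList<>();

    void register(JobListener listener) {
        listeners.add(listener);
    }

    /** Runs the job (modeled as a Runnable) and notifies listeners before and after. */
    void runJob(String jobId, Runnable job) {
        listeners.forEach(l -> l.onJobSubmitted(jobId));
        boolean ok = true;
        try {
            job.run();
        } catch (RuntimeException e) {
            ok = false; // the job failed; listeners are still notified
        }
        final boolean success = ok;
        listeners.forEach(l -> l.onJobFinished(jobId, success));
    }
}
```

With something of this shape, the "signal the end of the job" REST call could live in onJobFinished instead of after env.execute(), so it would not depend on which client submitted the job.
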
> > > > > >
> > > > > > On Mon, Jun 24, 2019 at 9:54 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi, Tison,
> > > > > > >
> > > > > > > Thanks for your comments. Overall I agree with you that it is
> > > > > > > difficult for downstream projects to integrate with Flink and that we
> > > > > > > need to refactor the current Flink client API.
> > > > > > > I also agree that CliFrontend should only parse command line
> > > > > > > arguments and then pass them to ExecutionEnvironment. It is
> > > > > > > ExecutionEnvironment's responsibility to compile the job, create the
> > > > > > > cluster, and submit the job. Besides that, Flink currently has many
> > > > > > > ExecutionEnvironment implementations, and it picks a specific one
> > > > > > > based on the context. IMHO this is not necessary; ExecutionEnvironment
> > > > > > > should be able to do the right thing based on the FlinkConf it
> > > > > > > receives. Having too many ExecutionEnvironment implementations is
> > > > > > > another burden for downstream project integration.
> > > > > > >
> > > > > > > One thing I'd like to mention is Flink's Scala shell and SQL client:
> > > > > > > although they are sub-modules of Flink, they could be treated as
> > > > > > > downstream projects that use Flink's client API. Currently it is not
> > > > > > > easy for them to integrate with Flink, and they share a lot of
> > > > > > > duplicated code. That is another sign that we should refactor the
> > > > > > > Flink client API.
> > > > > > >
> > > > > > > I believe it is a large and hard change, and I am afraid we cannot
> > > > > > > keep compatibility since many of the changes are user facing.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Jun 24, 2019 at 2:53 PM Zili Chen <wander4...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > After a closer look at our client APIs, I can see two major
> > > > > > > > issues for consistency and integration: the different deployment of
> > > > > > > > the job cluster, which couples job graph creation with cluster
> > > > > > > > deployment, and submission via CliFrontend, which confuses the
> > > > > > > > control flow of job graph compilation and job submission. I'd like
> > > > > > > > to follow up on the discussion above, mainly the process described
> > > > > > > > by Jeff and Stephan, and share my ideas on these issues.
> > > > > > > >
> > > > > > > > 1) CliFrontend confuses the control flow of job compilation and
> > > > > > > > submission. Following the process of job submission that Stephan and
> > > > > > > > Jeff described, the execution environment knows all configs of the
> > > > > > > > cluster and the topology/settings of the job. Ideally, the main
> > > > > > > > method of the user program calls #execute (or #submit), and Flink
> > > > > > > > deploys the cluster, compiles the job graph, and submits it to the
> > > > > > > > cluster. However, the current CliFrontend does all these things
> > > > > > > > inside its #runProgram method, which introduces a lot of subclasses
> > > > > > > > of the (stream) execution environment.
> > > > > > > >
> > > > > > > > Actually, it sets up an exec env that hijacks the
> > > > > > > > #execute/executePlan method, initializes the job graph, and aborts
> > > > > > > > execution. Then control flows back to CliFrontend, which deploys the
> > > > > > > > cluster (or retrieves the client) and submits the job graph. This is
> > > > > > > > quite a specific internal process inside Flink and is consistent
> > > > > > > > with nothing else.
> > > > > > > >
> > > > > > > > 2) Deployment of the job cluster couples job graph creation and
> > > > > > > > cluster deployment. Abstractly, going from a user job to a concrete
> > > > > > > > submission requires the following dependency:
> > > > > > > >
> > > > > > > >      create JobGraph --\
> > > > > > > >
> > > > > > > > create ClusterClient -->  submit JobGraph
> > > > > > > >
> > > > > > > > A ClusterClient is created by deploying or retrieving a cluster.
> > > > > > > > JobGraph submission requires a compiled JobGraph and a valid
> > > > > > > > ClusterClient, but the creation of the ClusterClient is abstractly
> > > > > > > > independent of that of the JobGraph. However, in job cluster mode,
> > > > > > > > we deploy the job cluster with a job graph, which means we use
> > > > > > > > another process:
> > > > > > > >
> > > > > > > > create JobGraph --> deploy cluster with the JobGraph
> > > > > > > >
> > > > > > > > This is another inconsistency, and downstream projects/client APIs
> > > > > > > > are forced to handle different cases with little support from Flink.
> > > > > > > >
> > > > > > > > Since we have likely reached a consensus that
> > > > > > > >
> > > > > > > > 1. all configs are gathered by the Flink configuration and passed on
> > > > > > > > 2. the execution environment knows all configs and handles execution
> > > > > > > > (both deployment and submission)
> > > > > > > >
> > > > > > > > I propose eliminating the inconsistencies above with the following
> > > > > > > > approach:
> > > > > > > >
> > > > > > > > 1) CliFrontend should be exactly a front end, at least for the "run"
> > > > > > > > command. That means it just gathers all config from the command line
> > > > > > > > and passes it to the main method of the user program. The execution
> > > > > > > > environment knows all the info, and with an addition to the utils
> > > > > > > > for ClusterClient we gracefully get a ClusterClient by deploying or
> > > > > > > > retrieving. In this way, we don't need to hijack the
> > > > > > > > #execute/executePlan methods and can remove the various hacky
> > > > > > > > subclasses of exec env, as well as the #run methods in ClusterClient
> > > > > > > > (for an interface-ized ClusterClient). Now the control flow goes
> > > > > > > > from CliFrontend to the main method and never returns.
> > > > > > > >
> > > > > > > > 2) A job cluster means a cluster for a specific job. From another
> > > > > > > > perspective, it is an ephemeral session. We may decouple the
> > > > > > > > deployment from the compiled job graph, and instead start a session
> > > > > > > > with an idle timeout and then submit the job.
> > > > > > > >
> > > > > > > > These topics should be understood and discussed to reach a consensus
> > > > > > > > before we go into more details on design or implementation.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > tison.
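
A rough sketch of the unified flow described in the message above, where a job cluster is treated as an ephemeral session with an idle timeout so that both modes follow "create JobGraph / create ClusterClient, then submit". Every type here (SketchJobGraph, SketchClusterClient, SketchDeployer, UnifiedSubmission) is a stand-in invented for this illustration; the real Flink classes look different and nothing is deployed.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Stand-in for Flink's JobGraph; the sketch only keeps a name. */
final class SketchJobGraph {
    final String name;

    SketchJobGraph(String name) {
        this.name = name;
    }
}

/** Stand-in for a ClusterClient; it records which graphs were submitted. */
final class SketchClusterClient {
    final String clusterId;
    final Duration idleTimeout; // null: long-running session with no idle shutdown
    final List<String> submitted = new ArrayList<>();

    SketchClusterClient(String clusterId, Duration idleTimeout) {
        this.clusterId = clusterId;
        this.idleTimeout = idleTimeout;
    }

    /** Returns a made-up job id of the form clusterId/jobName. */
    String submit(SketchJobGraph graph) {
        submitted.add(graph.name);
        return clusterId + "/" + graph.name;
    }
}

final class SketchDeployer {
    /** Session mode: attach to a cluster that is already running. */
    static SketchClusterClient retrieve(String clusterId) {
        return new SketchClusterClient(clusterId, null);
    }

    /** Proposed job mode: start an ephemeral session that stops after being idle. */
    static SketchClusterClient deployEphemeral(String clusterId, Duration idleTimeout) {
        return new SketchClusterClient(clusterId, idleTimeout);
    }
}

final class UnifiedSubmission {
    /** One path for both modes: obtaining the client never depends on the graph. */
    static String run(SketchClusterClient client, SketchJobGraph graph) {
        return client.submit(graph);
    }
}
```

The point of the sketch is only the dependency structure: neither retrieve nor deployEphemeral takes a job graph, so the second process ("deploy cluster with the JobGraph") disappears.
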
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, Jun 20, 2019 at 3:21 AM Zili Chen <wander4...@gmail.com> wrote:
> > > > > > > >
> > > > > > > >> Hi Jeff,
> > > > > > > >>
> > > > > > > >> Thanks for raising this thread and the design document!
> > > > > > > >>
> > > > > > > > >> As @Thomas Weise mentioned above, extending config in Flink
> > > > > > > > >> requires far more effort than it should. Another example
> > > > > > > > >> is that we achieve detached mode by introducing another execution
> > > > > > > > >> environment, which also hijacks the #execute method.
> > > > > > > > >>
> > > > > > > > >> I agree with your idea that the user configures everything
> > > > > > > > >> and Flink "just" respects it. On this topic, I think the unusual
> > > > > > > > >> control flow when CliFrontend handles the "run" command is the
> > > > > > > > >> problem. It handles several configs, mainly about cluster
> > > > > > > > >> settings, and thus the main method of the user program is unaware
> > > > > > > > >> of them. It also compiles the app to a job graph by running the
> > > > > > > > >> main method with a hijacked exec env, which constrains the main
> > > > > > > > >> method further.
> > > > > > > > >>
> > > > > > > > >> I'd like to write down a few notes on passing and respecting
> > > > > > > > >> configs/args, as well as on decoupling job compilation and
> > > > > > > > >> submission, and share them on this thread later.
> > > > > > > >>
> > > > > > > >> Best,
> > > > > > > >> tison.
> > > > > > > >>
> > > > > > > >>
> > > > > > > > >> On Mon, Jun 17, 2019 at 7:29 PM SHI Xiaogang <shixiaoga...@gmail.com> wrote:
> > > > > > > >>
> > > > > > > >>> Hi Jeff and Flavio,
> > > > > > > >>>
> > > > > > > >>> Thanks Jeff a lot for proposing the design document.
> > > > > > > >>>
> > > > > > > > >>> We are also working on refactoring ClusterClient to allow
> > > > > > > > >>> flexible and efficient job management in our real-time platform.
> > > > > > > > >>> We would like to draft a document to share our ideas with you.
> > > > > > > > >>>
> > > > > > > > >>> I think it's a good idea to have something like Apache Livy for
> > > > > > > > >>> Flink, and the efforts discussed here will be a great step
> > > > > > > > >>> towards it.
> > > > > > > >>>
> > > > > > > >>> Regards,
> > > > > > > >>> Xiaogang
> > > > > > > >>>
> > > > > > > > >>> On Mon, Jun 17, 2019 at 7:13 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
> > > > > > > >>>
> > > > > > > > >>> > Is there any possibility to have something like Apache Livy [1]
> > > > > > > > >>> > also for Flink in the future?
> > > > > > > >>> >
> > > > > > > >>> > [1] https://livy.apache.org/
> > > > > > > >>> >
> > > > > > > > >>> > On Tue, Jun 11, 2019 at 5:23 PM Jeff Zhang <zjf...@gmail.com> wrote:
> > > > > > > >>> >
> > > > > > > > >>> > > >>> Any API we expose should not have dependencies on the
> > > > > > > > >>> > > runtime (flink-runtime) package or other implementation
> > > > > > > > >>> > > details. To me, this means that the current ClusterClient
> > > > > > > > >>> > > cannot be exposed to users because it uses quite some classes
> > > > > > > > >>> > > from the optimiser and runtime packages.
> > > > > > > > >>> > >
> > > > > > > > >>> > > We should change ClusterClient from a class to an interface.
> > > > > > > > >>> > > ExecutionEnvironment would only use the ClusterClient
> > > > > > > > >>> > > interface, which should be in flink-clients, while the
> > > > > > > > >>> > > concrete implementation class could be in flink-runtime.
> > > > > > > > >>> > >
> > > > > > > > >>> > > >>> What happens when a failure/restart in the client happens?
> > > > > > > > >>> > > There needs to be a way of re-establishing the connection to
> > > > > > > > >>> > > the job, setting up the listeners again, etc.
> > > > > > > > >>> > >
> > > > > > > > >>> > > Good point. First we need to define what a failure/restart in
> > > > > > > > >>> > > the client means. IIUC, that usually means a network failure,
> > > > > > > > >>> > > which will happen in class RestClient. If my understanding is
> > > > > > > > >>> > > correct, the restart/retry mechanism should be implemented in
> > > > > > > > >>> > > RestClient.
> > > > > > > >>> > >
> > > > > > > >>> > >
> > > > > > > >>> > >
> > > > > > > >>> > >
> > > > > > > >>> > >
> > > > > > > > >>> > > On Tue, Jun 11, 2019 at 11:10 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> > > > > > > >>> > >
> > > > > > > >>> > > > Some points to consider:
> > > > > > > >>> > > >
> > > > > > > > >>> > > > * Any API we expose should not have dependencies on the
> > > > > > > > >>> > > > runtime (flink-runtime) package or other implementation
> > > > > > > > >>> > > > details. To me, this means that the current ClusterClient
> > > > > > > > >>> > > > cannot be exposed to users because it uses quite some
> > > > > > > > >>> > > > classes from the optimiser and runtime packages.
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > * What happens when a failure/restart in the client happens?
> > > > > > > > >>> > > > There needs to be a way of re-establishing the connection to
> > > > > > > > >>> > > > the job, setting up the listeners again, etc.
> > > > > > > >>> > > >
> > > > > > > >>> > > > Aljoscha
> > > > > > > >>> > > >
> > > > > > > > >>> > > > > On 29. May 2019, at 10:17, Jeff Zhang <zjf...@gmail.com> wrote:
> > > > > > > >>> > > > >
> > > > > > > > >>> > > > > Sorry folks, the design doc is later than you expected.
> > > > > > > > >>> > > > > Here's the design doc I drafted; any comments and feedback
> > > > > > > > >>> > > > > are welcome.
> > > > > > > > >>> > > > >
> > > > > > > > >>> > > > > https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/edit?usp=sharing
> > > > > > > >>> > > > >
> > > > > > > >>> > > > >
> > > > > > > >>> > > > >
> > > > > > > > >>> > > > > On Thu, Feb 14, 2019 at 8:43 PM Stephan Ewen <se...@apache.org> wrote:
> > > > > > > >>> > > > >
> > > > > > > >>> > > > >> Nice that this discussion is happening.
> > > > > > > >>> > > > >>
> > > > > > > > >>> > > > >> In the FLIP, we could also revisit the entire role of the
> > > > > > > > >>> > > > >> environments again.
> > > > > > > > >>> > > > >>
> > > > > > > > >>> > > > >> Initially, the idea was:
> > > > > > > > >>> > > > >>  - the environments take care of the specific setup for
> > > > > > > > >>> > > > >> standalone (no setup needed), yarn, mesos, etc.
> > > > > > > > >>> > > > >>  - the session ones have control over the session. The
> > > > > > > > >>> > > > >> environment holds the session client.
> > > > > > > > >>> > > > >>  - running a job gives a "control" object for that job.
> > > > > > > > >>> > > > >> That behavior is the same in all environments.
> > > > > > > > >>> > > > >>
> > > > > > > > >>> > > > >> The actual implementation diverged quite a bit from that.
> > > > > > > > >>> > > > >> Happy to see a discussion about straightening this out a
> > > > > > > > >>> > > > >> bit more.
> > > > > > > >>> > > > >>
> > > > > > > >>> > > > >>
> > > > > > > > >>> > > > >> On Tue, Feb 12, 2019 at 4:58 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > > > > > > >>> > > > >>
> > > > > > > >>> > > > >>> Hi folks,
> > > > > > > >>> > > > >>>
> > > > > > > > >>> > > > >>> Sorry for the late response. It seems we have reached
> > > > > > > > >>> > > > >>> consensus on this; I will create a FLIP for this with a
> > > > > > > > >>> > > > >>> more detailed design.
> > > > > > > >>> > > > >>>
> > > > > > > >>> > > > >>>
> > > > > > > > >>> > > > >>> On Fri, Dec 21, 2018 at 11:43 AM Thomas Weise <t...@apache.org> wrote:
> > > > > > > >>> > > > >>>
> > > > > > > > >>> > > > >>>> Great to see this discussion seeded! The problems you face
> > > > > > > > >>> > > > >>>> with the Zeppelin integration are also affecting other
> > > > > > > > >>> > > > >>>> downstream projects, like Beam.
> > > > > > > > >>> > > > >>>>
> > > > > > > > >>> > > > >>>> We just enabled the savepoint restore option in
> > > > > > > > >>> > > > >>>> RemoteStreamEnvironment [1], and that was more difficult
> > > > > > > > >>> > > > >>>> than it should be. The main issue is that environment and
> > > > > > > > >>> > > > >>>> cluster client aren't decoupled. Ideally it should be
> > > > > > > > >>> > > > >>>> possible to just get the matching cluster client from the
> > > > > > > > >>> > > > >>>> environment and then control the job through it
> > > > > > > > >>> > > > >>>> (environment as factory for cluster client). But note that
> > > > > > > > >>> > > > >>>> the environment classes are part of the public API, and it
> > > > > > > > >>> > > > >>>> is not straightforward to make larger changes without
> > > > > > > > >>> > > > >>>> breaking backward compatibility.
> > > > > > > > >>> > > > >>>>
> > > > > > > > >>> > > > >>>> ClusterClient currently exposes internal classes like
> > > > > > > > >>> > > > >>>> JobGraph and StreamGraph. But it should be possible to
> > > > > > > > >>> > > > >>>> wrap this with a new public API that brings the required
> > > > > > > > >>> > > > >>>> job control capabilities for downstream projects. Perhaps
> > > > > > > > >>> > > > >>>> it is helpful to look at some of the interfaces in Beam
> > > > > > > > >>> > > > >>>> while thinking about this: [2] for the portable job API
> > > > > > > > >>> > > > >>>> and [3] for the old asynchronous job control from the Beam
> > > > > > > > >>> > > > >>>> Java SDK.
> > > > > > > > >>> > > > >>>>
> > > > > > > > >>> > > > >>>> The backward compatibility discussion [4] is also relevant
> > > > > > > > >>> > > > >>>> here. A new API should shield downstream projects from
> > > > > > > > >>> > > > >>>> internals and allow them to interoperate with multiple
> > > > > > > > >>> > > > >>>> future Flink versions in the same release line without
> > > > > > > > >>> > > > >>>> forced upgrades.
> > > > > > > >>> > > > >>>>
> > > > > > > >>> > > > >>>> Thanks,
> > > > > > > >>> > > > >>>> Thomas
> > > > > > > >>> > > > >>>>
> > > > > > > >>> > > > >>>> [1] https://github.com/apache/flink/pull/7249
> > > > > > > >>> > > > >>>> [2]
> > > > > > > >>> > > > >>>>
> > > > > > > >>> > > > >>>>
> > > > > > > >>> > > > >>>
> > > > > > > >>> > > > >>
> > > > > > > >>> > > >
> > > > > > > >>> > >
> > > > > > > >>> >
> > > > > > > >>>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/beam_job_api.proto
> > > > > > > >>> > > > >>>> [3]
> > > > > > > >>> > > > >>>>
> > > > > > > >>> > > > >>>>
> > > > > > > >>> > > > >>>
> > > > > > > >>> > > > >>
> > > > > > > >>> > > >
> > > > > > > >>> > >
> > > > > > > >>> >
> > > > > > > >>>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
> > > > > > > >>> > > > >>>> [4]
> > > > > > > >>> > > > >>>>
> > > > > > > >>> > > > >>>>
> > > > > > > >>> > > > >>>
> > > > > > > >>> > > > >>
> > > > > > > >>> > > >
> > > > > > > >>> > >
> > > > > > > >>> >
> > > > > > > >>>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://lists.apache.org/thread.html/064c75c5d10f0806095b14f6d76942598917a14429c1acbddd151fe2@%3Cdev.flink.apache.org%3E
> > > > > > > >>> > > > >>>>
> > > > > > > >>> > > > >>>>
> > > > > > > > >>> > > > >>>> On Thu, Dec 20, 2018 at 6:15 PM Jeff Zhang <zjf...@gmail.com> wrote:
> > > > > > > >>> > > > >>>>
> > > > > > > > >>> > > > >>>>>>>> I'm not so sure whether the user should be able to
> > > > > > > > >>> > > > >>>>> define where the job runs (in your example Yarn). This
> > > > > > > > >>> > > > >>>>> is actually independent of the job development and is
> > > > > > > > >>> > > > >>>>> something which is decided at deployment time.
> > > > > > > > >>> > > > >>>>>
> > > > > > > > >>> > > > >>>>> Users don't need to specify the execution mode
> > > > > > > > >>> > > > >>>>> programmatically. They can also pass the execution mode
> > > > > > > > >>> > > > >>>>> as an argument to the flink run command, e.g.
> > > > > > > > >>> > > > >>>>>
> > > > > > > > >>> > > > >>>>> bin/flink run -m yarn-cluster ....
> > > > > > > > >>> > > > >>>>> bin/flink run -m local ...
> > > > > > > > >>> > > > >>>>> bin/flink run -m host:port ...
> > > > > > > > >>> > > > >>>>>
> > > > > > > > >>> > > > >>>>> Does this make sense to you?
> > > > > > > >>> > > > >>>>>
> > > > > > > > >>> > > > >>>>>>>> To me it makes sense that the ExecutionEnvironment is
> > > > > > > > >>> > > > >>>>> not directly initialized by the user and instead context
> > > > > > > > >>> > > > >>>>> sensitive how you want to execute your job (Flink CLI
> > > > > > > > >>> > > > >>>>> vs. IDE, for example).
> > > > > > > > >>> > > > >>>>>
> > > > > > > > >>> > > > >>>>> Right, currently I notice Flink creates a different
> > > > > > > > >>> > > > >>>>> ContextExecutionEnvironment based on the submission
> > > > > > > > >>> > > > >>>>> scenario (Flink CLI vs. IDE). To me this is kind of a
> > > > > > > > >>> > > > >>>>> hack, not so straightforward. What I suggested above is
> > > > > > > > >>> > > > >>>>> that Flink should always create the same
> > > > > > > > >>> > > > >>>>> ExecutionEnvironment but with different configuration,
> > > > > > > > >>> > > > >>>>> and based on the configuration it would create the
> > > > > > > > >>> > > > >>>>> proper ClusterClient for the different behaviors.
> > > > > > > >>> > > > >>>>>
> > > > > > > >>> > > > >>>>>
> > > > > > > >>> > > > >>>>>
> > > > > > > >>> > > > >>>>>
> > > > > > > >>> > > > >>>>>
> > > > > > > >>> > > > >>>>>
> > > > > > > >>> > > > >>>>>
> > > > > > > > >>> > > > >>>>> On Thu, Dec 20, 2018 at 11:18 PM Till Rohrmann <trohrm...@apache.org> wrote:
> > > > > > > >>> > > > >>>>>
> > > > > > > > >>> > > > >>>>>> You are probably right that we have code duplication
> > > > > > > > >>> > > > >>>>>> when it comes to the creation of the ClusterClient.
> > > > > > > > >>> > > > >>>>>> This should be reduced in the future.
> > > > > > > > >>> > > > >>>>>>
> > > > > > > > >>> > > > >>>>>> I'm not so sure whether the user should be able to
> > > > > > > > >>> > > > >>>>>> define where the job runs (in your example Yarn). This
> > > > > > > > >>> > > > >>>>>> is actually independent of the job development and is
> > > > > > > > >>> > > > >>>>>> something which is decided at deployment time. To me it
> > > > > > > > >>> > > > >>>>>> makes sense that the ExecutionEnvironment is not
> > > > > > > > >>> > > > >>>>>> directly initialized by the user and is instead context
> > > > > > > > >>> > > > >>>>>> sensitive to how you want to execute your job (Flink
> > > > > > > > >>> > > > >>>>>> CLI vs. IDE, for example). However, I agree that the
> > > > > > > > >>> > > > >>>>>> ExecutionEnvironment should give you access to the
> > > > > > > > >>> > > > >>>>>> ClusterClient and to the job (maybe in the form of the
> > > > > > > > >>> > > > >>>>>> JobGraph or a job plan).
> > > > > > > >>> > > > >>>>>>
> > > > > > > >>> > > > >>>>>> Cheers,
> > > > > > > >>> > > > >>>>>> Till
> > > > > > > >>> > > > >>>>>>
> > > > > > > > >>> > > > >>>>>> On Thu, Dec 13, 2018 at 4:36 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > > > > > > >>> > > > >>>>>>
> > > > > > > >>> > > > >>>>>>> Hi Till,
> > > > > > > > >>> > > > >>>>>>> Thanks for the feedback. You are right that I expect a
> > > > > > > > >>> > > > >>>>>>> better programmatic job submission/control API which
> > > > > > > > >>> > > > >>>>>>> could be used by downstream projects, and it would
> > > > > > > > >>> > > > >>>>>>> benefit the Flink ecosystem. When I look at the code
> > > > > > > > >>> > > > >>>>>>> of the Flink scala-shell and sql-client (I believe
> > > > > > > > >>> > > > >>>>>>> they are not the core of Flink, but belong to the
> > > > > > > > >>> > > > >>>>>>> ecosystem of Flink), I find a lot of duplicated code
> > > > > > > > >>> > > > >>>>>>> for creating a ClusterClient from user-provided
> > > > > > > > >>> > > > >>>>>>> configuration (the configuration format may differ
> > > > > > > > >>> > > > >>>>>>> between scala-shell and sql-client) and then using
> > > > > > > > >>> > > > >>>>>>> that ClusterClient to manipulate jobs. I don't think
> > > > > > > > >>> > > > >>>>>>> this is convenient for downstream projects. What I
> > > > > > > > >>> > > > >>>>>>> expect is that a downstream project only needs to
> > > > > > > > >>> > > > >>>>>>> provide the necessary configuration info (maybe
> > > > > > > > >>> > > > >>>>>>> introducing a class FlinkConf), then builds an
> > > > > > > > >>> > > > >>>>>>> ExecutionEnvironment based on this FlinkConf, and the
> > > > > > > > >>> > > > >>>>>>> ExecutionEnvironment creates the proper ClusterClient.
> > > > > > > > >>> > > > >>>>>>> This would not only benefit downstream project
> > > > > > > > >>> > > > >>>>>>> development but also help their integration tests
> > > > > > > > >>> > > > >>>>>>> with Flink. Here's a sample code snippet of what I
> > > > > > > > >>> > > > >>>>>>> expect.
> > > > > > > >>> > > > >>>>>>>
> > > > > > > >>> > > > >>>>>>> val conf = new FlinkConf().mode("yarn")
> > > > > > > >>> > > > >>>>>>> val env = new ExecutionEnvironment(conf)
> > > > > > > >>> > > > >>>>>>> val jobId = env.submit(...)
> > > > > > > >>> > > > >>>>>>> val jobStatus = env.getClusterClient().queryJobStatus(jobId)
> > > > > > > >>> > > > >>>>>>> env.getClusterClient().cancelJob(jobId)
> > > > > > > >>> > > > >>>>>>>
> > > > > > > >>> > > > >>>>>>> What do you think ?
> > > > > > > >>> > > > >>>>>>>
> > > > > > > >>> > > > >>>>>>>
> > > > > > > >>> > > > >>>>>>>
> > > > > > > >>> > > > >>>>>>>
> > > > > > > >>> > > > >>>>>>> On Tue, Dec 11, 2018 at 6:28 PM Till Rohrmann <trohrm...@apache.org> wrote:
> > > > > > > >>> > > > >>>>>>>
> > > > > > > >>> > > > >>>>>>>> Hi Jeff,
> > > > > > > >>> > > > >>>>>>>>
> > > > > > > >>> > > > >>>>>>>> what you are proposing is to provide the user with better
> > > > > > > >>> > > > >>>>>>>> programmatic job control. There was actually an effort to
> > > > > > > >>> > > > >>>>>>>> achieve this but it has never been completed [1]. However,
> > > > > > > >>> > > > >>>>>>>> there are some improvements in the code base now. Look for
> > > > > > > >>> > > > >>>>>>>> example at the NewClusterClient interface which offers a
> > > > > > > >>> > > > >>>>>>>> non-blocking job submission. But I agree that we need to
> > > > > > > >>> > > > >>>>>>>> improve Flink in this regard.
> > > > > > > >>> > > > >>>>>>>>
> > > > > > > >>> > > > >>>>>>>> I would not be in favour of exposing all ClusterClient calls
> > > > > > > >>> > > > >>>>>>>> via the ExecutionEnvironment because it would clutter the
> > > > > > > >>> > > > >>>>>>>> class and would not be a good separation of concerns. Instead,
> > > > > > > >>> > > > >>>>>>>> one idea could be to retrieve the current ClusterClient from
> > > > > > > >>> > > > >>>>>>>> the ExecutionEnvironment, which can then be used for cluster
> > > > > > > >>> > > > >>>>>>>> and job control. But before we start an effort here, we need
> > > > > > > >>> > > > >>>>>>>> to agree on and capture what functionality we want to provide.
> > > > > > > >>> > > > >>>>>>>>
> > > > > > > >>> > > > >>>>>>>> Initially, the idea was that we have the ClusterDescriptor
> > > > > > > >>> > > > >>>>>>>> describing how to talk to a cluster manager like Yarn or
> > > > > > > >>> > > > >>>>>>>> Mesos. The ClusterDescriptor can be used for deploying Flink
> > > > > > > >>> > > > >>>>>>>> clusters (job and session) and gives you a ClusterClient. The
> > > > > > > >>> > > > >>>>>>>> ClusterClient controls the cluster (e.g. submitting jobs,
> > > > > > > >>> > > > >>>>>>>> listing all running jobs). And then there was the idea to
> > > > > > > >>> > > > >>>>>>>> introduce a JobClient which you obtain from the ClusterClient
> > > > > > > >>> > > > >>>>>>>> to trigger job-specific operations (e.g. taking a savepoint,
> > > > > > > >>> > > > >>>>>>>> cancelling the job).
> > > > > > > >>> > > > >>>>>>>>
> > > > > > > >>> > > > >>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-4272
> > > > > > > >>> > > > >>>>>>>>
> > > > > > > >>> > > > >>>>>>>> Cheers,
> > > > > > > >>> > > > >>>>>>>> Till
> > > > > > > >>> > > > >>>>>>>>
> > > > > > > >>> > > > >>>>>>>> On Tue, Dec 11, 2018 at 10:13 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > > > > > > >>> > > > >>>>>>>>
> > > > > > > >>> > > > >>>>>>>>> Hi Folks,
> > > > > > > >>> > > > >>>>>>>>>
> > > > > > > >>> > > > >>>>>>>>> I am trying to integrate Flink into Apache Zeppelin, which is
> > > > > > > >>> > > > >>>>>>>>> an interactive notebook, and I hit several issues caused by
> > > > > > > >>> > > > >>>>>>>>> the Flink client API. So I'd like to propose the following
> > > > > > > >>> > > > >>>>>>>>> changes to the Flink client API.
> > > > > > > >>> > > > >>>>>>>>>
> > > > > > > >>> > > > >>>>>>>>> 1. Support nonblocking execution. Currently,
> > > > > > > >>> > > > >>>>>>>>> ExecutionEnvironment#execute is a blocking method that does
> > > > > > > >>> > > > >>>>>>>>> two things: it first submits the job and then waits until the
> > > > > > > >>> > > > >>>>>>>>> job is finished. I'd like to introduce a nonblocking execution
> > > > > > > >>> > > > >>>>>>>>> method like ExecutionEnvironment#submit which only submits the
> > > > > > > >>> > > > >>>>>>>>> job and then returns the jobId to the client, and to allow the
> > > > > > > >>> > > > >>>>>>>>> user to query the job status via the jobId.
> > > > > > > >>> > > > >>>>>>>>>
> > > > > > > >>> > > > >>>>>>>>> 2. Add a cancel API in
> > > > > > > >>> > > > >>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment. Currently the
> > > > > > > >>> > > > >>>>>>>>> only way to cancel a job is via the CLI (bin/flink), which is
> > > > > > > >>> > > > >>>>>>>>> not convenient for downstream projects. So I'd like to add a
> > > > > > > >>> > > > >>>>>>>>> cancel API in ExecutionEnvironment.
> > > > > > > >>> > > > >>>>>>>>>
> > > > > > > >>> > > > >>>>>>>>> 3. Add a savepoint API in
> > > > > > > >>> > > > >>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment. This is
> > > > > > > >>> > > > >>>>>>>>> similar to the cancel API: we should use ExecutionEnvironment
> > > > > > > >>> > > > >>>>>>>>> as the unified API for third parties to integrate with Flink.
> > > > > > > >>> > > > >>>>>>>>>
> > > > > > > >>> > > > >>>>>>>>> 4. Add a listener for the job execution lifecycle, something
> > > > > > > >>> > > > >>>>>>>>> like the following, so that a downstream project can run
> > > > > > > >>> > > > >>>>>>>>> custom logic during the lifecycle of a job. E.g. Zeppelin
> > > > > > > >>> > > > >>>>>>>>> would capture the jobId after the job is submitted and then
> > > > > > > >>> > > > >>>>>>>>> use this jobId to cancel it later when necessary.
> > > > > > > >>> > > > >>>>>>>>>
> > > > > > > >>> > > > >>>>>>>>> public interface JobListener {
> > > > > > > >>> > > > >>>>>>>>>
> > > > > > > >>> > > > >>>>>>>>>   void onJobSubmitted(JobID jobId);
> > > > > > > >>> > > > >>>>>>>>>
> > > > > > > >>> > > > >>>>>>>>>   void onJobExecuted(JobExecutionResult jobResult);
> > > > > > > >>> > > > >>>>>>>>>
> > > > > > > >>> > > > >>>>>>>>>   void onJobCanceled(JobID jobId);
> > > > > > > >>> > > > >>>>>>>>> }
> > > > > > > >>> > > > >>>>>>>>>
> > > > > > > >>> > > > >>>>>>>>> 5. Enable session in ExecutionEnvironment. Currently it is
> > > > > > > >>> > > > >>>>>>>>> disabled, but a session is very convenient for third parties
> > > > > > > >>> > > > >>>>>>>>> that submit jobs continually. I hope Flink can enable it
> > > > > > > >>> > > > >>>>>>>>> again.
> > > > > > > >>> > > > >>>>>>>>>
> > > > > > > >>> > > > >>>>>>>>> 6. Unify all Flink client APIs into
> > > > > > > >>> > > > >>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment.
> > > > > > > >>> > > > >>>>>>>>>
> > > > > > > >>> > > > >>>>>>>>> This is a long-term issue which needs more careful thinking
> > > > > > > >>> > > > >>>>>>>>> and design. Currently some features of Flink are exposed in
> > > > > > > >>> > > > >>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment, but some are
> > > > > > > >>> > > > >>>>>>>>> exposed in the CLI instead of the API, like the cancel and
> > > > > > > >>> > > > >>>>>>>>> savepoint operations I mentioned above. I think the root cause
> > > > > > > >>> > > > >>>>>>>>> is that Flink didn't unify the way clients interact with it.
> > > > > > > >>> > > > >>>>>>>>> Here I list 3 scenarios of Flink operation:
> > > operation
> > > > > > > >>> > > > >>>>>>>>>
> > > > > > > >>> > > > >>>>>>>>>   - Local job execution. Flink will create a LocalEnvironment
> > > > > > > >>> > > > >>>>>>>>>   and then use this LocalEnvironment to create a LocalExecutor
> > > > > > > >>> > > > >>>>>>>>>   for job execution.
> > > > > > > >>> > > > >>>>>>>>>   - Remote job execution. Flink will create a ClusterClient
> > > > > > > >>> > > > >>>>>>>>>   first, then create a ContextEnvironment based on the
> > > > > > > >>> > > > >>>>>>>>>   ClusterClient, and then run the job.
> > > > > > > >>> > > > >>>>>>>>>   - Job cancelation. Flink will create a ClusterClient first
> > > > > > > >>> > > > >>>>>>>>>   and then cancel the job via this ClusterClient.
> > > > > > > >>> > > > >>>>>>>>>
> > > > > > > >>> > > > >>>>>>>>> As you can see, in the above 3 scenarios Flink doesn't use
> > > > > > > >>> > > > >>>>>>>>> the same approach (code path) to interact with the cluster.
> > > > > > > >>> > > > >>>>>>>>> What I propose is the following:
> > > > > > > >>> > > > >>>>>>>>> Create the proper LocalEnvironment/RemoteEnvironment (based
> > > > > > > >>> > > > >>>>>>>>> on user configuration) --> Use this Environment to create the
> > > > > > > >>> > > > >>>>>>>>> proper ClusterClient (LocalClusterClient or RestClusterClient)
> > > > > > > >>> > > > >>>>>>>>> to interact with Flink (job execution or cancelation).
> > > > > > > >>> > > > >>>>>>>>>
> > > > > > > >>> > > > >>>>>>>>> This way we can unify the process of local execution and
> > > > > > > >>> > > > >>>>>>>>> remote execution. It also becomes much easier for third
> > > > > > > >>> > > > >>>>>>>>> parties to integrate with Flink, because ExecutionEnvironment
> > > > > > > >>> > > > >>>>>>>>> is the unified entry point for Flink. All a third party needs
> > > > > > > >>> > > > >>>>>>>>> to do is pass configuration to ExecutionEnvironment, and
> > > > > > > >>> > > > >>>>>>>>> ExecutionEnvironment will do the right thing based on the
> > > > > > > >>> > > > >>>>>>>>> configuration. The Flink CLI can also be considered a Flink
> > > > > > > >>> > > > >>>>>>>>> API consumer: it just passes the configuration to
> > > > > > > >>> > > > >>>>>>>>> ExecutionEnvironment and lets ExecutionEnvironment create the
> > > > > > > >>> > > > >>>>>>>>> proper ClusterClient instead of letting the CLI create the
> > > > > > > >>> > > > >>>>>>>>> ClusterClient directly.
> > > > > > > >>> > > > >>>>>>>>>
> > > > > > > >>> > > > >>>>>>>>>
> > > > > > > >>> > > > >>>>>>>>> Item 6 would involve large code refactoring, so I think we
> > > > > > > >>> > > > >>>>>>>>> can defer it to a future release. Items 1, 2, 3, 4 and 5 could
> > > > > > > >>> > > > >>>>>>>>> be done at once, I believe. Let me know your comments and
> > > > > > > >>> > > > >>>>>>>>> feedback, thanks.
> > > > > > > >>> > > > >>>>>>>>>
> > > > > > > >>> > > > >>>>>>>>>
> > > > > > > >>> > > > >>>>>>>>>
> > > > > > > >>> > > > >>>>>>>>> --
> > > > > > > >>> > > > >>>>>>>>> Best Regards
> > > > > > > >>> > > > >>>>>>>>>
> > > > > > > >>> > > > >>>>>>>>> Jeff Zhang
> > > > > > > >>> > > > >>>>>>>>>
>


-- 
Best Regards

Jeff Zhang
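
For reference, here is a minimal Java sketch of points 1, 2 and 4 from the
original proposal quoted above (nonblocking submit, status query and cancel
by jobId, and a lifecycle listener). It is only an illustration under stated
assumptions: SketchEnvironment, JobStatus, RecordingListener and finishJob
are hypothetical stand-ins and not Flink classes or methods, job IDs are
plain strings rather than Flink's JobID, and the "cluster" is just an
in-memory map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ClientApiSketch {

    /** Lifecycle callbacks, modeled on the JobListener proposed in the thread. */
    interface JobListener {
        void onJobSubmitted(String jobId);
        void onJobExecuted(String jobId);
        void onJobCanceled(String jobId);
    }

    enum JobStatus { RUNNING, FINISHED, CANCELED, UNKNOWN }

    /** Hypothetical environment (not a Flink class): submit() does not block. */
    static class SketchEnvironment {
        private final List<JobListener> listeners = new ArrayList<>();
        private final Map<String, JobStatus> jobs = new HashMap<>();
        private int nextId = 0;

        void addJobListener(JobListener listener) {
            listeners.add(listener);
        }

        /** Registers the job and returns its id at once, without waiting for it. */
        String submit(String jobName) {
            String jobId = jobName + "-" + (nextId++);
            jobs.put(jobId, JobStatus.RUNNING);
            for (JobListener l : listeners) {
                l.onJobSubmitted(jobId);
            }
            return jobId;
        }

        JobStatus queryJobStatus(String jobId) {
            return jobs.getOrDefault(jobId, JobStatus.UNKNOWN);
        }

        /** Cancels a running job; returns false if the job is not running. */
        boolean cancelJob(String jobId) {
            if (jobs.get(jobId) != JobStatus.RUNNING) {
                return false;
            }
            jobs.put(jobId, JobStatus.CANCELED);
            for (JobListener l : listeners) {
                l.onJobCanceled(jobId);
            }
            return true;
        }

        /** Simulates the cluster reporting that a running job has completed. */
        boolean finishJob(String jobId) {
            if (jobs.get(jobId) != JobStatus.RUNNING) {
                return false;
            }
            jobs.put(jobId, JobStatus.FINISHED);
            for (JobListener l : listeners) {
                l.onJobExecuted(jobId);
            }
            return true;
        }
    }

    /** Records events, e.g. so a notebook can remember job ids to cancel later. */
    static class RecordingListener implements JobListener {
        final List<String> events = new ArrayList<>();

        @Override public void onJobSubmitted(String jobId) { events.add("submitted:" + jobId); }
        @Override public void onJobExecuted(String jobId) { events.add("executed:" + jobId); }
        @Override public void onJobCanceled(String jobId) { events.add("canceled:" + jobId); }
    }

    public static void main(String[] args) {
        SketchEnvironment env = new SketchEnvironment();
        RecordingListener listener = new RecordingListener();
        env.addJobListener(listener);

        String jobId = env.submit("wordcount");   // returns immediately
        System.out.println(env.queryJobStatus(jobId));
        env.cancelJob(jobId);
        System.out.println(listener.events);
    }
}
```

The point of the sketch is only the shape of the calls: the caller gets a
jobId back from submit and drives everything else (status, cancel) through
that id, while the listener lets an integrator such as Zeppelin observe the
same lifecycle without polling.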
