Thanks tison for the effort. I left a few comments.
Zili Chen <wander4...@gmail.com> wrote on Wed, Jul 31, 2019 at 8:24 PM:

Hi Flavio,

Thanks for your reply.

Neither in the current implementation nor in the design does ClusterClient take responsibility for generating the JobGraph (what you see in the current codebase is several class methods). Instead, the user describes the program in the main method with ExecutionEnvironment APIs and calls env.compile() or env.optimize() to get a FlinkPlan and a JobGraph, respectively.

For listing the main classes in a jar and choosing one for submission, you are already able to customize a CLI to do it. Specifically, the path of the jar is passed as an argument, and in the customized CLI you list the main classes and choose one to submit to the cluster.

Best,
tison.

Flavio Pompermaier <pomperma...@okkam.it> wrote on Wed, Jul 31, 2019 at 8:12 PM:

Just one note on my side: it is not clear to me whether the client needs to be able to generate a job graph or not. In my opinion, the job jar should reside only on the server/JobManager side, and the client needs a way to get the job graph. If you really want access to the job graph, I'd add dedicated methods on the ClusterClient, like:

- getJobGraph(jarId, mainClass): JobGraph
- listMainClasses(jarId): List<String>

These would require some additions on the job manager endpoint as well. What do you think?

On Wed, Jul 31, 2019 at 12:42 PM Zili Chen <wander4...@gmail.com> wrote:

Hi all,

Here is a document[1] on client API enhancement from our perspective. We have investigated the current implementations, and we propose:

1. Unify the implementation of cluster deployment and job submission in Flink.
2. Provide programmatic interfaces to allow flexible job and cluster management.

The first proposal is aimed at reducing the code paths of cluster deployment and job submission so that one can adopt Flink easily.
The second proposal is aimed at providing rich interfaces for advanced users who want precise control over these stages.

Quick reference on the open questions:

1. Exclude job cluster deployment from the client side, or redefine the semantics of the job cluster? It follows a process quite different from session cluster deployment and job submission.

2. Maintain the code paths handling the class o.a.f.api.common.Program, or implement customized program handling logic via a customized CliFrontend? See also this thread[2] and the document[1].

3. Expose ClusterClient as a public API, or only expose APIs in ExecutionEnvironment and delegate them to ClusterClient? Further, in either case, is it worth introducing a JobClient that encapsulates a ClusterClient associated with a specific job?

Best,
tison.

[1] https://docs.google.com/document/d/1UWJE7eYWiMuZewBKS0YmdVO2LUTqXPd6-pbOCof9ddY/edit?usp=sharing
[2] https://lists.apache.org/thread.html/7ffc9936a384b891dbcf0a481d26c6d13b2125607c200577780d1e18@%3Cdev.flink.apache.org%3E

Jeff Zhang <zjf...@gmail.com> wrote on Wed, Jul 24, 2019 at 9:19 AM:

Thanks Stephan, I will follow up on this issue in the next few weeks and will refine the design doc. We can discuss more details after the 1.9 release.

Stephan Ewen <se...@apache.org> wrote on Wed, Jul 24, 2019 at 12:58 AM:

Hi all!

This thread has stalled for a bit, which I assume is mostly due to the Flink 1.9 feature freeze and release testing effort.

I personally still recognize this issue as an important one to be solved. I'd be happy to help resume this discussion soon (after the 1.9 release) and see if we can take some steps towards this in Flink 1.10.
Best,
Stephan

On Mon, Jun 24, 2019 at 10:41 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:

That's exactly what I suggested a long time ago: the Flink REST client should not require any Flink dependency, only an HTTP library to call the REST services that submit and monitor a job. What I also suggested in [1] was a way to automatically suggest to the user (via a UI) the available main classes and their required parameters [2]. Another problem we have with Flink is that the REST client and the CLI client behave differently; we use the CLI client (via ssh) because it allows calling other methods after env.execute() [3] (we have to call another REST service to signal the end of the job). In this regard, a dedicated interface, like the JobListener suggested in the previous emails, would be very helpful (IMHO).

[1] https://issues.apache.org/jira/browse/FLINK-10864
[2] https://issues.apache.org/jira/browse/FLINK-10862
[3] https://issues.apache.org/jira/browse/FLINK-10879

Best,
Flavio

On Mon, Jun 24, 2019 at 9:54 AM Jeff Zhang <zjf...@gmail.com> wrote:

Hi Tison,

Thanks for your comments. Overall I agree with you that it is difficult for a downstream project to integrate with Flink and that we need to refactor the current Flink client API. I also agree that CliFrontend should only parse command line arguments and then pass them to the ExecutionEnvironment.
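A minimal sketch of the dedicated callback interface Flavio describes might look as follows. All names here (JobListener, ListeningEnvironment, the callback methods) are hypothetical illustrations of the idea, not an existing Flink API at the time of this thread:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical callback interface a client could register before execution. */
interface JobListener {
    void onJobSubmitted(String jobId);
    void onJobFinished(String jobId, boolean succeeded);
}

/** Minimal stand-in for an environment that notifies listeners around execution. */
class ListeningEnvironment {
    private final List<JobListener> listeners = new ArrayList<>();

    void registerJobListener(JobListener l) {
        listeners.add(l);
    }

    /** Simulates submitting and running a job, firing the callbacks in order. */
    String execute(String jobName) {
        String jobId = "job-" + Integer.toHexString(jobName.hashCode());
        listeners.forEach(l -> l.onJobSubmitted(jobId));
        // ... the job would run on the cluster here ...
        listeners.forEach(l -> l.onJobFinished(jobId, true));
        return jobId;
    }
}
```

With such a hook, the post-execution REST call Flavio mentions could live in onJobFinished instead of in code placed after env.execute().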
It is the ExecutionEnvironment's responsibility to compile the job, create the cluster, and submit the job. Besides that, Flink currently has many ExecutionEnvironment implementations and picks a specific one based on the context. IMHO this is not necessary: the ExecutionEnvironment should be able to do the right thing based on the FlinkConf it receives. Having so many ExecutionEnvironment implementations is another burden for downstream project integration.

One thing I'd like to mention is Flink's Scala shell and SQL client. Although they are sub-modules of Flink, they can be treated as downstream projects that use Flink's client API. Currently it is not easy for them to integrate with Flink, and they share a lot of duplicated code. That is another sign that we should refactor the Flink client API.

I believe this is a large and hard change, and I am afraid we cannot keep compatibility, since many of the changes are user-facing.

Zili Chen <wander4...@gmail.com> wrote on Mon, Jun 24, 2019 at 2:53 PM:

Hi all,

After a closer look at our client APIs, I can see two major issues for consistency and integration: the deployment of a job cluster, which couples job graph creation and cluster deployment, and submission via CliFrontend, which confuses the control flow of job graph compilation and job submission.
I'd like to follow the discussion above, mainly the process described by Jeff and Stephan, and share my ideas on these issues.

1) CliFrontend confuses the control flow of job compilation and submission. Following the job submission process Stephan and Jeff described, the execution environment knows all the configs of the cluster and the topology/settings of the job. Ideally, the main method of the user program calls #execute (or a method named #submit), and Flink deploys the cluster, compiles the job graph, and submits it to the cluster. However, the current CliFrontend does all of these things inside its #runProgram method, which introduces a lot of subclasses of the (stream) execution environment.

Actually, it sets up an exec env that hijacks the #execute/#executePlan method, initializes the job graph, and aborts execution. Control then flows back to CliFrontend, which deploys the cluster (or retrieves the client) and submits the job graph. This is quite a specific internal process inside Flink, consistent with nothing else.

2) Deployment of a job cluster couples job graph creation and cluster deployment. Abstractly, going from a user job to a concrete submission requires this dependency:

    create JobGraph ------\
                           --> submit JobGraph
    create ClusterClient -/

ClusterClient is created by deploying or retrieving.
JobGraph submission requires a compiled JobGraph and a valid ClusterClient, but the creation of the ClusterClient is abstractly independent of that of the JobGraph. However, in job cluster mode we deploy the job cluster with a job graph, which means we use another process:

    create JobGraph --> deploy cluster with the JobGraph

Here is another inconsistency, and downstream projects/client APIs are forced to handle the different cases with little support from Flink.

Since we have likely reached a consensus that

1. all configs are gathered by the Flink configuration and passed along, and
2. the execution environment knows all configs and handles execution (both deployment and submission),

I propose eliminating these inconsistencies with the following approach:

1) CliFrontend should be exactly a front end, at least for the "run" command. That means it just gathers all config from the command line and passes it to the main method of the user program. The execution environment knows all the info, and with an added utility for ClusterClient, we gracefully get a ClusterClient by deploying or retrieving. In this way, we don't need to hijack the #execute/#executePlan methods and can remove the various hacky subclasses of exec env, as well as the #run methods in ClusterClient (for an interface-ized ClusterClient).
The control flow now goes from CliFrontend to the main method and never returns.

2) A job cluster is a cluster for a specific job. From another perspective, it is an ephemeral session. We may decouple the deployment from a compiled job graph, and instead start a session with an idle timeout and submit the job afterwards.

Before we go into more detail on design or implementation, it is better to be aware of these topics and discuss them to reach a consensus.

Best,
tison.

Zili Chen <wander4...@gmail.com> wrote on Thu, Jun 20, 2019 at 3:21 AM:

Hi Jeff,

Thanks for raising this thread and the design document!

As @Thomas Weise mentioned above, extending config handling in Flink requires far more effort than it should. Another example: we achieve detached mode by introducing another execution environment which also hijacks the #execute method.

I agree with your idea that the user configures everything and Flink "just" respects it. On this topic, I think the unusual control flow when CliFrontend handles the "run" command is the problem. It handles several configs, mainly cluster settings, so the main method of the user program is unaware of them. It also compiles the application into a job graph by running the main method with a hijacked exec env, which constrains the main method further.
I'd like to write down a few notes on passing and respecting configs/args, as well as on decoupling job compilation and submission. I will share them on this thread later.

Best,
tison.

SHI Xiaogang <shixiaoga...@gmail.com> wrote on Mon, Jun 17, 2019 at 7:29 PM:

Hi Jeff and Flavio,

Thanks a lot, Jeff, for proposing the design document.

We are also working on refactoring ClusterClient to allow flexible and efficient job management in our real-time platform. We would like to draft a document to share our ideas with you.

I think it's a good idea to have something like Apache Livy for Flink, and the efforts discussed here are a great step towards it.

Regards,
Xiaogang

Flavio Pompermaier <pomperma...@okkam.it> wrote on Mon, Jun 17, 2019 at 7:13 PM:

Is there any possibility of having something like Apache Livy [1] for Flink as well in the future?

[1] https://livy.apache.org/

On Tue, Jun 11, 2019 at 5:23 PM Jeff Zhang <zjf...@gmail.com> wrote:

> Any API we expose should not have dependencies on the runtime
> (flink-runtime) package or other implementation details.
> To me, this means that the current ClusterClient cannot be exposed to
> users because it uses quite some classes from the optimiser and runtime
> packages.

We should change ClusterClient from a class to an interface. ExecutionEnvironment would only use the ClusterClient interface, which should live in flink-clients, while the concrete implementation classes could be in flink-runtime.

> What happens when a failure/restart in the client happens? There needs
> to be a way of re-establishing the connection to the job, setting up the
> listeners again, etc.

Good point. First we need to define what failure/restart in the client means. IIUC, that usually means a network failure, which will surface in the RestClient class. If my understanding is correct, the restart/retry mechanism should be implemented in RestClient.

Aljoscha Krettek <aljos...@apache.org> wrote on Tue, Jun 11, 2019 at 11:10 PM:

Some points to consider:

* Any API we expose should not have dependencies on the runtime (flink-runtime) package or other implementation details.
To me, this means that the current ClusterClient cannot be exposed to users because it uses quite some classes from the optimiser and runtime packages.

* What happens when a failure/restart in the client happens? There needs to be a way of re-establishing the connection to the job, setting up the listeners again, etc.

Aljoscha

On 29. May 2019, at 10:17, Jeff Zhang <zjf...@gmail.com> wrote:

Sorry folks, the design doc is later than you expected. Here's the design doc I drafted; any comments and feedback are welcome.

https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/edit?usp=sharing

Stephan Ewen <se...@apache.org> wrote on Thu, Feb 14, 2019 at 8:43 PM:

Nice that this discussion is happening.

In the FLIP, we could also revisit the entire role of the environments
again.

Initially, the idea was:

- the environments take care of the specific setup for standalone (no setup needed), YARN, Mesos, etc.
- the session ones have control over the session; the environment holds the session client.
- running a job gives a "control" object for that job. That behavior is the same in all environments.

The actual implementation diverged quite a bit from that. Happy to see a discussion about straightening this out a bit more.

On Tue, Feb 12, 2019 at 4:58 AM Jeff Zhang <zjf...@gmail.com> wrote:

Hi folks,

Sorry for the late response. It seems we have reached consensus on this; I will create a FLIP for it with a more detailed design.

Thomas Weise <t...@apache.org> wrote on Fri, Dec 21, 2018 at 11:43 AM:

Great to see this discussion seeded! The problems you face with the Zeppelin integration also affect other downstream projects, like Beam.
We just enabled the savepoint restore option in RemoteStreamEnvironment [1], and that was more difficult than it should be. The main issue is that the environment and the cluster client aren't decoupled. Ideally, it should be possible to just get the matching cluster client from the environment and then control the job through it (environment as a factory for the cluster client). But note that the environment classes are part of the public API, and it is not straightforward to make larger changes without breaking backward compatibility.

ClusterClient currently exposes internal classes like JobGraph and StreamGraph, but it should be possible to wrap it with a new public API that provides the required job control capabilities for downstream projects.
Perhaps it is helpful to look at some of the interfaces in Beam while thinking about this: [2] for the portable job API, and [3] for the old asynchronous job control from the Beam Java SDK.

The backward compatibility discussion [4] is also relevant here. A new API should shield downstream projects from internals and allow them to interoperate with multiple future Flink versions in the same release line without forced upgrades.

Thanks,
Thomas

[1] https://github.com/apache/flink/pull/7249
[2] https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/beam_job_api.proto
[3] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
[4]
https://lists.apache.org/thread.html/064c75c5d10f0806095b14f6d76942598917a14429c1acbddd151fe2@%3Cdev.flink.apache.org%3E

On Thu, Dec 20, 2018 at 6:15 PM Jeff Zhang <zjf...@gmail.com> wrote:

> I'm not so sure whether the user should be able to define where the job
> runs (in your example Yarn). This is actually independent of the job
> development and is something which is decided at deployment time.

The user doesn't need to specify the execution mode programmatically. They can also pass the execution mode via the arguments of the flink run command, e.g.:

    bin/flink run -m yarn-cluster ...
    bin/flink run -m local ...
    bin/flink run -m host:port ...

Does this make sense to you?
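Mapping the -m argument in the commands above to a concrete target amounts to a small piece of parsing in the front end. Here is a rough sketch of the idea only; the ExecutionTarget enum and TargetParser class are made-up names for illustration, not Flink's actual CLI code:

```java
/** Hypothetical set of targets the front end could dispatch to. */
enum ExecutionTarget { LOCAL, YARN_CLUSTER, REMOTE }

class TargetParser {
    /** Maps the value of "-m" (as in "bin/flink run -m ...") to a target. */
    static ExecutionTarget parse(String m) {
        if (m == null || m.equals("local")) {
            return ExecutionTarget.LOCAL;
        }
        if (m.equals("yarn-cluster")) {
            return ExecutionTarget.YARN_CLUSTER;
        }
        // anything else is treated as host:port of an already running cluster
        return ExecutionTarget.REMOTE;
    }
}
```

The point of Jeff's argument is that this decision stays in the front end's configuration handling, so the user program itself never hard-codes where it runs.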
> To me it makes sense that the ExecutionEnvironment is not directly
> initialized by the user and is instead context sensitive to how you want
> to execute your job (Flink CLI vs. IDE, for example).

Right. Currently, I notice Flink creates different ContextExecutionEnvironments based on the submission scenario (Flink CLI vs. IDE). To me this is a kind of hack, and not very straightforward. What I suggested above is that Flink should always create the same ExecutionEnvironment, just with different configuration, and based on that configuration it would create the proper ClusterClient for the different behaviors.
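Jeff's suggestion of a single environment that derives the right client from configuration could be sketched roughly as follows. FlinkConf, UnifiedEnvironment, the mode strings, and the one-method ClusterClient interface are all illustrative stand-ins for the idea, not existing Flink classes:

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative configuration holder, as in Jeff's proposed FlinkConf. */
class FlinkConf {
    private final Map<String, String> props = new HashMap<>();
    FlinkConf mode(String m) { props.put("execution.mode", m); return this; }
    String mode() { return props.getOrDefault("execution.mode", "local"); }
}

/** Stand-in for an interface-ized ClusterClient. */
interface ClusterClient { String name(); }

/**
 * One environment that derives the right client from the conf,
 * instead of many ContextExecutionEnvironment subclasses.
 */
class UnifiedEnvironment {
    private final FlinkConf conf;
    UnifiedEnvironment(FlinkConf conf) { this.conf = conf; }

    ClusterClient createClusterClient() {
        switch (conf.mode()) {
            case "yarn":   return () -> "yarn";   // would deploy/retrieve on YARN
            case "remote": return () -> "rest";   // would talk to host:port via REST
            default:       return () -> "local";  // would spin up a local mini cluster
        }
    }
}
```

The design point is that only the configuration varies; the environment class, and therefore the user-facing API, stays the same across submission scenarios.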
Till Rohrmann <trohrm...@apache.org> wrote on Thu, Dec 20, 2018 at 11:18 PM:

You are probably right that we have code duplication when it comes to the creation of the ClusterClient. This should be reduced in the future.

I'm not so sure whether the user should be able to define where the job runs (in your example, YARN). This is actually independent of the job development and is something decided at deployment time. To me it makes sense that the ExecutionEnvironment is not directly initialized by the user and is instead context sensitive to how you want to execute your job (Flink CLI vs. IDE, for example).
However, I agree that the ExecutionEnvironment should give you access to the ClusterClient and to the job (maybe in the form of the JobGraph or a job plan).

Cheers,
Till

On Thu, Dec 13, 2018 at 4:36 AM Jeff Zhang <zjf...@gmail.com> wrote:

Hi Till,

Thanks for the feedback. You are right that I expect a better programmatic job submission/control API that can be used by downstream projects, which would benefit the Flink ecosystem.
When I look at the code of Flink's scala-shell and sql-client (I believe they are not the core of Flink, but belong to its ecosystem), I find a lot of duplicated code for creating a ClusterClient from user-provided configuration (the configuration format may differ between scala-shell and sql-client) and then using that ClusterClient to manipulate jobs. I don't think this is convenient for downstream projects. What I expect is that a downstream project only needs to provide the necessary configuration info (maybe by introducing a class FlinkConf), then builds an ExecutionEnvironment based on this FlinkConf, and the ExecutionEnvironment creates the proper ClusterClient.
This would not only benefit downstream project development but also help their integration tests with flink. Here's one sample code snippet that I expect:

    val conf = new FlinkConf().mode("yarn")
    val env = new ExecutionEnvironment(conf)
    val jobId = env.submit(...)
    val jobStatus = env.getClusterClient().queryJobStatus(jobId)
    env.getClusterClient().cancelJob(jobId)

What do you think?

Till Rohrmann <trohrm...@apache.org> wrote on Tue, Dec 11, 2018 at 6:28 PM:

Hi Jeff,

what you are proposing is to provide the user with better programmatic job control. There was actually an effort to achieve this but it has never been completed [1].
However, there are some improvements in the code base now. Look for example at the NewClusterClient interface, which offers a non-blocking job submission. But I agree that we need to improve Flink in this regard.

I would not be in favour of exposing all ClusterClient calls via the ExecutionEnvironment because it would clutter the class and would not be a good separation of concerns. Instead one idea could be to retrieve the current ClusterClient from the ExecutionEnvironment, which can then be used for cluster and job control. But before we start an effort here, we need to agree on and capture what functionality we want to provide.
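As a rough illustration of that separation of concerns, the environment could hand out the client without mirroring its API. Below is a minimal mock, assuming entirely hypothetical names (submit, queryJobStatus and cancelJob echo the snippet earlier in this thread; none of this is Flink's actual API):

```java
import java.util.HashMap;
import java.util.Map;

public class ClientRetrievalSketch {
    /** Toy stand-in for a cluster client: owns all job control calls. */
    static class ClusterClient {
        private final Map<String, String> status = new HashMap<>();
        private int counter = 0;

        String submit() {                       // non-blocking: returns an id immediately
            String jobId = "job-" + counter++;
            status.put(jobId, "RUNNING");
            return jobId;
        }
        String queryJobStatus(String jobId) { return status.get(jobId); }
        void cancelJob(String jobId) { status.put(jobId, "CANCELED"); }
    }

    /** The environment only hands out the client; it does not mirror its API. */
    static class ExecutionEnvironment {
        private final ClusterClient client = new ClusterClient();
        ClusterClient getClusterClient() { return client; }
    }

    public static void main(String[] args) {
        ExecutionEnvironment env = new ExecutionEnvironment();
        ClusterClient client = env.getClusterClient();      // single access point
        String jobId = client.submit();
        System.out.println(client.queryJobStatus(jobId));   // prints RUNNING
        client.cancelJob(jobId);
        System.out.println(client.queryJobStatus(jobId));   // prints CANCELED
    }
}
```

The benefit of this shape is that new control calls land on the client object, so the ExecutionEnvironment surface never grows.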
Initially, the idea was that we have the ClusterDescriptor describing how to talk to a cluster manager like Yarn or Mesos. The ClusterDescriptor can be used for deploying Flink clusters (job and session) and gives you a ClusterClient. The ClusterClient controls the cluster (e.g. submitting jobs, listing all running jobs). And then there was the idea to introduce a JobClient, which you obtain from the ClusterClient, to trigger job-specific operations (e.g. taking a savepoint, cancelling the job).
[1] https://issues.apache.org/jira/browse/FLINK-4272

Cheers,
Till

On Tue, Dec 11, 2018 at 10:13 AM Jeff Zhang <zjf...@gmail.com> wrote:

Hi Folks,

I am trying to integrate flink into apache zeppelin, which is an interactive notebook. And I hit several issues that are caused by the flink client api. So I'd like to propose the following changes to the flink client api.

1. Support nonblocking execution. Currently, ExecutionEnvironment#execute is a blocking method which does 2 things: first submit the job and then wait for the job until it is finished. I'd like to introduce a nonblocking execution method like ExecutionEnvironment#submit which only submits the job and then returns the jobId to the client.
And allow the user to query the job status via the jobId.

2. Add a cancel api in ExecutionEnvironment/StreamExecutionEnvironment. Currently the only way to cancel a job is via the cli (bin/flink), which is not convenient for downstream projects. So I'd like to add a cancel api in ExecutionEnvironment.

3. Add a savepoint api in ExecutionEnvironment/StreamExecutionEnvironment. Similar to the cancel api, we should use ExecutionEnvironment as the unified api for third parties to integrate with flink.

4. Add a listener for the job execution lifecycle. Something like the following, so that a downstream project can run custom logic in the lifecycle of a job. E.g.
Zeppelin would capture the jobId after the job is submitted and then use this jobId to cancel it later when necessary.

    public interface JobListener {

        void onJobSubmitted(JobID jobId);

        void onJobExecuted(JobExecutionResult jobResult);

        void onJobCanceled(JobID jobId);
    }

5. Enable session in ExecutionEnvironment. Currently it is disabled, but sessions are very convenient for third parties submitting jobs continually. I hope flink can enable it again.

6. Unify all flink client api into ExecutionEnvironment/StreamExecutionEnvironment.

This is a long-term issue which needs more careful thinking and design.
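A sketch of how the JobListener from point 4 might be wired in. It is trimmed to two callbacks and uses String job ids instead of JobID so it is self-contained; the listener registry and the notification points are assumptions for illustration, not existing Flink code:

```java
import java.util.ArrayList;
import java.util.List;

public class ListenerSketch {
    interface JobListener {
        void onJobSubmitted(String jobId);
        void onJobCanceled(String jobId);
    }

    /** Toy environment: notifies registered listeners at submit/cancel time. */
    static class Env {
        private final List<JobListener> listeners = new ArrayList<>();
        private int counter = 0;

        void registerJobListener(JobListener l) { listeners.add(l); }

        String submit() {
            String jobId = "job-" + counter++;
            listeners.forEach(l -> l.onJobSubmitted(jobId));
            return jobId;
        }

        void cancel(String jobId) {
            listeners.forEach(l -> l.onJobCanceled(jobId));
        }
    }

    public static void main(String[] args) {
        Env env = new Env();
        List<String> seen = new ArrayList<>();   // e.g. Zeppelin capturing jobIds
        env.registerJobListener(new JobListener() {
            public void onJobSubmitted(String jobId) { seen.add("submitted:" + jobId); }
            public void onJobCanceled(String jobId) { seen.add("canceled:" + jobId); }
        });
        String id = env.submit();
        env.cancel(id);
        System.out.println(seen);   // prints [submitted:job-0, canceled:job-0]
    }
}
```

The main method mirrors the Zeppelin use case above: the listener captures the jobId at submission so the caller can cancel the job later.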
Currently some features of flink are exposed in ExecutionEnvironment/StreamExecutionEnvironment, but some are exposed in the cli instead of the api, like the cancel and savepoint operations I mentioned above. I think the root cause is that flink didn't unify the interaction with flink. Here I list 3 scenarios of flink operation:

- Local job execution. Flink creates a LocalEnvironment and then uses this LocalEnvironment to create a LocalExecutor for job execution.
- Remote job execution. Flink creates a ClusterClient first, then creates a ContextEnvironment based on the ClusterClient, and then runs the job.
- Job cancelation. Flink creates a ClusterClient first and then cancels the job via this ClusterClient.
As you can see in the above 3 scenarios, Flink didn't use the same approach (code path) to interact with flink. What I propose is the following: create the proper LocalEnvironment/RemoteEnvironment (based on user configuration) --> use this Environment to create the proper ClusterClient (LocalClusterClient or RestClusterClient) to interact with Flink (job execution or cancelation).

This way we can unify the process of local execution and remote execution. And it would be much easier for third parties to integrate with flink, because ExecutionEnvironment is the unified entry point for flink. What a third party needs to do is just pass configuration to the ExecutionEnvironment, and the ExecutionEnvironment will do the right thing based on the configuration.
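The unified code path described above boils down to a single decision point: configuration in, client out. The sketch below uses mock classes that only borrow the names from the message (the real LocalClusterClient/RestClusterClient are constructed very differently); it illustrates the dispatch, nothing more:

```java
public class UnifiedPathSketch {
    interface ClusterClient { String name(); }

    static class LocalClusterClient implements ClusterClient {
        public String name() { return "LocalClusterClient"; }
    }

    static class RestClusterClient implements ClusterClient {
        public String name() { return "RestClusterClient"; }
    }

    /** Single decision point: every consumer (API user or CLI) funnels through here. */
    static ClusterClient createClient(String mode) {
        return "local".equals(mode) ? new LocalClusterClient()
                                    : new RestClusterClient();
    }

    public static void main(String[] args) {
        // Local execution, remote execution and the CLI all share this one path.
        System.out.println(createClient("local").name());   // prints LocalClusterClient
        System.out.println(createClient("yarn").name());    // prints RestClusterClient
    }
}
```

With one factory, the three scenarios listed above (local run, remote run, cancelation) no longer need three separate client-creation code paths.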
The flink cli can also be considered a flink api consumer: it would just pass the configuration to the ExecutionEnvironment and let the ExecutionEnvironment create the proper ClusterClient, instead of letting the cli create the ClusterClient directly.

6 would involve large code refactoring, so I think we can defer it to a future release; 1, 2, 3, 4, 5 could be done at once, I believe.
Let me know your comments and feedback, thanks.

--
Best Regards

Jeff Zhang