Great Kostas! Looking forward to your POC!

Best,
tison.

Jeff Zhang <zjf...@gmail.com> wrote on Fri, Aug 30, 2019 at 11:07 PM:

> Awesome, @Kostas. Looking forward to your POC.
>
> Kostas Kloudas <kklou...@gmail.com> wrote on Fri, Aug 30, 2019 at 8:33 PM:
>
> > Hi all,
> >
> > I am just writing here to let you know that I am working on a POC that tries to refactor the current state of job submission in Flink. I want to stress that it introduces NO CHANGES to the current behaviour of Flink. It just rearranges things and introduces the notion of an Executor, which is the entity responsible for taking the user code and submitting it for execution.
> >
> > Given this, the discussion about the functionality that the JobClient will expose to the user can go on independently, and the same holds for all the open questions so far.
> >
> > I hope I will have some more news to share soon.
> >
> > Thanks,
> > Kostas
> >
> > On Mon, Aug 26, 2019 at 4:20 AM Yang Wang <danrtsey...@gmail.com> wrote:
> > >
> > > Hi Zili,
> > >
> > > It makes sense to me that a dedicated cluster is started for a per-job cluster and will not accept more jobs. I just have a question about the command line.
> > >
> > > Currently we can use the following commands to start different clusters.
> > >
> > > *per-job cluster*
> > > ./bin/flink run -d -p 5 -ynm perjob-cluster1 -m yarn-cluster examples/streaming/WindowJoin.jar
> > >
> > > *session cluster*
> > > ./bin/flink run -p 5 -ynm session-cluster1 -m yarn-cluster examples/streaming/WindowJoin.jar
> > >
> > > What will it look like after the client enhancement?
> > >
> > > Best,
> > > Yang
> > >
> > > Zili Chen <wander4...@gmail.com> wrote on Fri, Aug 23, 2019 at 10:46 PM:
> > >
> > > > Hi Till,
> > > >
> > > > Thanks for your update. Nice to hear :-)
> > > >
> > > > Best,
> > > > tison.
> > > >
> > > > Till Rohrmann <trohrm...@apache.org> wrote on Fri, Aug 23, 2019 at 10:39 PM:
> > > >
> > > > > Hi Tison,
> > > > >
> > > > > Just a quick comment concerning the class loading issues when using the per-job mode. The community wants to change it so that the StandaloneJobClusterEntryPoint actually uses the user code class loader with child-first class loading [1]. Hence, I hope that this problem will be resolved soon.
> > > > >
> > > > > [1] https://issues.apache.org/jira/browse/FLINK-13840
> > > > >
> > > > > Cheers,
> > > > > Till
> > > > >
> > > > > On Fri, Aug 23, 2019 at 2:47 PM Kostas Kloudas <kklou...@gmail.com> wrote:
> > > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > On the topic of web submission, I agree with Till that it only seems to complicate things. It is bad for security and job isolation (anybody can submit/cancel jobs), and its implementation complicates some parts of the code. So, if we were to redesign the WebUI, maybe this part could be left out. In addition, I would say that the ability to cancel jobs could also be left out.
> > > > > >
> > > > > > I would also be in favour of removing the "detached" mode, for the reasons mentioned above (i.e. because now we will have a future representing the result on which the user can choose to wait or not).
> > > > > >
> > > > > > Now, on separating job submission and cluster creation, I am in favour of keeping both. Once again, the reasons are mentioned above by Stephan, Till and Aljoscha, and Zili also seems to agree.
> > > > > > They mainly have to do with security, isolation and ease of resource management for the user, as he knows that "when my job is done, everything will be cleared up". This is also the experience you get when launching a process on your local OS.
> > > > > >
> > > > > > On excluding the per-job mode from returning a JobClient or not, I believe that eventually it would be nice to allow users to get back a JobClient. The reason is that 1) I cannot find any objective reason why the user experience should diverge, and 2) this will be the way that the user will be able to interact with his running job. Assuming that the necessary ports are open for the REST API to work, I think that the JobClient can run against the REST API without problems. If the needed ports are not open, then we are safe to not return a JobClient, as the user explicitly chose to close all points of communication to his running job.
> > > > > >
> > > > > > On the topic of not hijacking "env.execute()" in order to get the Plan, I definitely agree, but for the proposal of having a "compile()" method in the env, I would like to have a better look at the existing code.
> > > > > >
> > > > > > Cheers,
> > > > > > Kostas
> > > > > >
> > > > > > On Fri, Aug 23, 2019 at 5:52 AM Zili Chen <wander4...@gmail.com> wrote:
> > > > > > >
> > > > > > > Hi Yang,
> > > > > > >
> > > > > > > It would be helpful if you check Stephan's last comment, which states that isolation is important.
> > > > > > >
> > > > > > > For per-job mode, we run a dedicated cluster (maybe it should have been a couple of JM and TMs during FLIP-6 design) for a specific job.
> > > > > > > Thus its processes are isolated from other jobs.
> > > > > > >
> > > > > > > In our case there was a time when we suffered from multiple jobs submitted by different users that affected each other, so that all of them ran into an error state. Also, running the client inside the cluster could save client resources in some cases.
> > > > > > >
> > > > > > > However, we also face several issues, as you mentioned: in per-job mode it always uses the parent classloader, and thus classloading issues occur.
> > > > > > >
> > > > > > > BTW, one can make an analogy between session/per-job mode in Flink and client/cluster mode in Spark.
> > > > > > >
> > > > > > > Best,
> > > > > > > tison.
> > > > > > >
> > > > > > > Yang Wang <danrtsey...@gmail.com> wrote on Thu, Aug 22, 2019 at 11:25 AM:
> > > > > > >
> > > > > > > > From the user's perspective, the scope of a per-job cluster is really confusing.
> > > > > > > >
> > > > > > > > If it means a Flink cluster with a single job, so that we get better isolation, then it does not matter how we deploy the cluster: deploy directly (mode1), or start a Flink cluster and then submit the job through a cluster client (mode2).
> > > > > > > >
> > > > > > > > Otherwise, if it just means deploying directly, how should we name mode2: session with job, or something else?
> > > > > > > >
> > > > > > > > We could also benefit from mode2. Users could get the same isolation as with mode1. The user code and dependencies will be loaded by the user class loader to avoid class conflicts with the framework.
> > > > > > > >
> > > > > > > > Anyway, both submission modes are useful. We just need to clarify the concepts.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Yang
> > > > > > > >
> > > > > > > > Zili Chen <wander4...@gmail.com> wrote on Tue, Aug 20, 2019 at 5:58 PM:
> > > > > > > >
> > > > > > > > > Thanks for the clarification.
> > > > > > > > >
> > > > > > > > > The idea of a JobDeployer came to my mind when I was puzzling over how to execute per-job mode and session mode with the same user code and framework code path.
> > > > > > > > >
> > > > > > > > > With the JobDeployer concept we are back to the statement that the environment knows every config of cluster deployment and job submission. We configure, or generate from configuration, a specific JobDeployer in the environment, and then the code aligns on
> > > > > > > > >
> > > > > > > > > *JobClient client = env.execute().get();*
> > > > > > > > >
> > > > > > > > > which in session mode is returned by clusterClient.submitJob and in per-job mode is returned by clusterDescriptor.deployJobCluster.
> > > > > > > > >
> > > > > > > > > Here comes a problem: currently we directly run ClusterEntrypoint with the extracted job graph. Following the JobDeployer way, we had better align the entry point of per-job deployment at the JobDeployer. Users run their main method, or a CLI (which finally calls the main method), to deploy the job cluster.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > tison.
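
[Editorial sketch] The JobDeployer idea in the message above could look roughly like the following. This is only an illustration under stated assumptions: `JobDeployer`, `SessionDeployer`, `PerJobDeployer`, `Environment` and the string-based "job" are hypothetical stand-ins, not Flink classes, and the real proposal may differ. It shows one user code path (`execute(...)` returning a future) backed by a deployer chosen from configuration.

```java
import java.util.concurrent.CompletableFuture;

public class JobDeployerSketch {

    /** Handle to a running job; a stand-in for the JobClient discussed in the thread. */
    interface JobClient {
        String describe();
    }

    /** Hypothetical strategy that starts a job and eventually yields a client for it. */
    interface JobDeployer {
        CompletableFuture<JobClient> deploy(String jobName);
    }

    /** Session mode: would call something like clusterClient.submitJob on an existing cluster. */
    static final class SessionDeployer implements JobDeployer {
        @Override
        public CompletableFuture<JobClient> deploy(String jobName) {
            return CompletableFuture.completedFuture(clientFor(jobName + " on a shared session cluster"));
        }
    }

    /** Per-job mode: would call something like clusterDescriptor.deployJobCluster. */
    static final class PerJobDeployer implements JobDeployer {
        @Override
        public CompletableFuture<JobClient> deploy(String jobName) {
            return CompletableFuture.completedFuture(clientFor(jobName + " on a dedicated per-job cluster"));
        }
    }

    /** The environment picks a deployer from configuration; user code never sees which one. */
    static final class Environment {
        private final JobDeployer deployer;

        Environment(String mode) {
            this.deployer = "per-job".equals(mode) ? new PerJobDeployer() : new SessionDeployer();
        }

        CompletableFuture<JobClient> execute(String jobName) {
            return deployer.deploy(jobName);
        }
    }

    private static JobClient clientFor(String description) {
        return () -> description;
    }

    /** Same user code path for both modes; join() is where the caller chooses to wait. */
    public static String run(String mode, String jobName) {
        JobClient client = new Environment(mode).execute(jobName).join();
        return client.describe();
    }

    public static void main(String[] args) {
        System.out.println(run("session", "WindowJoin"));
        System.out.println(run("per-job", "WindowJoin"));
    }
}
```

Because `execute` hands back a `CompletableFuture`, a caller that does not want to block can simply skip the `join()`, which is the behaviour the thread expects to replace the "detached" option.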
> > > > > > > > >
> > > > > > > > > Stephan Ewen <se...@apache.org> wrote on Tue, Aug 20, 2019 at 4:40 PM:
> > > > > > > > >
> > > > > > > > > > Till has made some good comments here.
> > > > > > > > > >
> > > > > > > > > > Two things to add:
> > > > > > > > > >
> > > > > > > > > >   - The job mode is very nice in the way that it runs the client inside the cluster (in the same image/process that is the JM) and thus unifies both applications and what the Spark world calls the "driver mode".
> > > > > > > > > >
> > > > > > > > > >   - Another thing I would add is that during the FLIP-6 design, we were thinking about setups where Dispatcher and JobManager are separate processes. A Yarn or Mesos Dispatcher of a session could run independently (even as privileged processes executing no code). Then the "per-job" mode could still be helpful: when a job is submitted to the dispatcher, it launches the JM again in a per-job mode, so that JM and TM processes are bound to the job only. For higher security setups, it is important that processes are not reused across jobs.
> > > > > > > > > >
> > > > > > > > > > On Tue, Aug 20, 2019 at 10:27 AM Till Rohrmann <trohrm...@apache.org> wrote:
> > > > > > > > > >
> > > > > > > > > > > I would not be in favour of getting rid of the per-job mode since it simplifies the process of running Flink jobs considerably.
> > > > > > > > > > > Moreover, it is not only well suited for container deployments but also for deployments where you want to guarantee job isolation. For example, a user could use the per-job mode on Yarn to execute his job on a separate cluster.
> > > > > > > > > > >
> > > > > > > > > > > I think that having two notions of cluster deployments (session vs. per-job mode) does not necessarily contradict your ideas for the client API refactoring. For example, one could have the following interfaces:
> > > > > > > > > > >
> > > > > > > > > > > - ClusterDeploymentDescriptor: encapsulates the logic of how to deploy a cluster
> > > > > > > > > > > - ClusterClient: allows interacting with a cluster
> > > > > > > > > > > - JobClient: allows interacting with a running job
> > > > > > > > > > >
> > > > > > > > > > > Now the ClusterDeploymentDescriptor could have two methods:
> > > > > > > > > > >
> > > > > > > > > > > - ClusterClient deploySessionCluster()
> > > > > > > > > > > - JobClusterClient/JobClient deployPerJobCluster(JobGraph)
> > > > > > > > > > >
> > > > > > > > > > > where JobClusterClient is either a supertype of ClusterClient which does not give you the functionality to submit jobs, or deployPerJobCluster directly returns a JobClient.
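
[Editorial sketch] The three interfaces and two deployment methods listed above might be written down as follows. Only the interface and method names come from the message; the method bodies, the placeholder `JobGraph` class, the `runningJobs()` accessor and the in-memory implementation are assumptions added so that the example compiles and runs. The variant shown is the one where `deployPerJobCluster` returns a `JobClient` directly.

```java
import java.util.ArrayList;
import java.util.List;

public class DeploymentSketch {

    /** Placeholder for a compiled job; stands in for Flink's JobGraph. */
    static final class JobGraph {
        final String name;

        JobGraph(String name) {
            this.name = name;
        }
    }

    /** Allows interacting with a running job. */
    interface JobClient {
        String jobName();
    }

    /** Allows interacting with a cluster; a session cluster accepts further jobs. */
    interface ClusterClient {
        JobClient submitJob(JobGraph graph);

        List<String> runningJobs();
    }

    /** Encapsulates the logic of how to deploy a cluster, in either mode. */
    interface ClusterDeploymentDescriptor {
        ClusterClient deploySessionCluster();

        JobClient deployPerJobCluster(JobGraph graph);
    }

    /** In-memory stand-in, present only to make the sketch executable. */
    static final class InMemoryDescriptor implements ClusterDeploymentDescriptor {
        @Override
        public ClusterClient deploySessionCluster() {
            final List<String> jobs = new ArrayList<>();
            return new ClusterClient() {
                @Override
                public JobClient submitJob(JobGraph graph) {
                    jobs.add(graph.name);
                    return () -> graph.name;
                }

                @Override
                public List<String> runningJobs() {
                    return jobs;
                }
            };
        }

        @Override
        public JobClient deployPerJobCluster(JobGraph graph) {
            // The cluster is bound to this single job, so no submitJob is exposed.
            return () -> graph.name;
        }
    }

    /** Exercises both deployment paths and summarises what is running. */
    public static List<String> demo() {
        ClusterDeploymentDescriptor descriptor = new InMemoryDescriptor();
        ClusterClient session = descriptor.deploySessionCluster();
        session.submitJob(new JobGraph("WindowJoin"));
        session.submitJob(new JobGraph("WordCount"));
        JobClient perJob = descriptor.deployPerJobCluster(new JobGraph("WindowJoin"));

        List<String> summary = new ArrayList<>(session.runningJobs());
        summary.add("per-job:" + perJob.jobName());
        return summary;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Returning a plain `JobClient` from the per-job path makes the isolation property visible in the types: there is simply no way to submit a second job to that cluster.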
> > > > > > > > > > >
> > > > > > > > > > > When setting up the ExecutionEnvironment, one would then not provide a ClusterClient to submit jobs but a JobDeployer which, depending on the selected mode, either uses a ClusterClient (session mode) to submit jobs or a ClusterDeploymentDescriptor to deploy a per-job mode cluster with the job to execute.
> > > > > > > > > > >
> > > > > > > > > > > These are just some thoughts on how one could make it work, because I believe there is some value in using the per-job mode from the ExecutionEnvironment.
> > > > > > > > > > >
> > > > > > > > > > > Concerning the web submission, this is indeed a bit tricky. From a cluster management standpoint, I would be in favour of not executing user code on the REST endpoint. Especially when considering security, it would be good to have a well-defined cluster behaviour where it is explicitly stated where user code and, thus, potentially risky code is executed. Ideally we limit it to the TaskExecutor and JobMaster.
> > > > > > > > > > >
> > > > > > > > > > > Cheers,
> > > > > > > > > > > Till
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Aug 20, 2019 at 9:40 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > In my opinion the client should not use any environment to get the job graph, because the jar should reside ONLY on the cluster (and not in the client classpath, otherwise there are always inconsistencies between the client's and the Flink Job Manager's classpath). In the YARN, Mesos and Kubernetes scenarios you have the jar, but you could start a cluster that has the jar on the Job Manager as well (but this is the only case where I think you can assume that the client has the jar on the classpath; in the REST job submission you don't have any classpath).
> > > > > > > > > > > >
> > > > > > > > > > > > Thus, again in my opinion, the JobGraph should be generated by the Job Manager REST API.
> > > > > > > > > > > >
> > > > > > > > > > > > On Tue, Aug 20, 2019 at 9:00 AM Zili Chen <wander4...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > >> I would like to involve Till & Stephan here to clarify some concepts of per-job mode.
> > > > > > > > > > > >>
> > > > > > > > > > > >> The term per-job refers to one of the modes a cluster can run in. It is mainly aimed at spawning a dedicated cluster for a specific job, where the job can be packaged with Flink itself so that the cluster is initialized with the job, getting rid of a separate submission step.
> > > > > > > > > > > >>
> > > > > > > > > > > >> This is useful for container deployments where one creates an image with the job and then simply deploys the container.
> > > > > > > > > > > >>
> > > > > > > > > > > >> However, it is out of client scope, since a client (ClusterClient, for example) is for communicating with an existing cluster and performing actions. Currently, in per-job mode, we extract the job graph and bundle it into the cluster deployment, and thus no concept of client gets involved. It looks reasonable to exclude the deployment of per-job clusters from the client API and use dedicated utility classes (deployers) for deployment.
> > > > > > > > > > > >>
> > > > > > > > > > > >> Zili Chen <wander4...@gmail.com> wrote on Tue, Aug 20, 2019 at 12:37 PM:
> > > > > > > > > > > >>
> > > > > > > > > > > >> > Hi Aljoscha,
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Thanks for your reply and participation. The Google Doc you linked to requires permission; I think you could use a share link instead.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > I agree that we have almost reached a consensus that a JobClient is necessary to interact with a running job.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Let me go through your open questions one by one.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > 1. Separate cluster creation and job submission for per-job mode.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > As you mentioned, here is where the opinions diverge. In my document there is an alternative[2] that proposes excluding per-job deployment from the client API scope, and now I find the exclusion more reasonable.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > When in per-job mode, a dedicated JobCluster is launched to execute the specific job. It is more like a Flink Application than a submission of a Flink Job. The client only takes care of job submission and assumes there is an existing cluster.
> > > > > > > > > > > >> > In this way we are able to consider per-job issues individually, and JobClusterEntrypoint would be the utility class for per-job deployment.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Nevertheless, the user program works in both session mode and per-job mode without needing to change code. The JobClient in per-job mode is returned from env.execute as normal. However, it would no longer be a wrapper of RestClusterClient but a wrapper of PerJobClusterClient, which communicates with the Dispatcher locally.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > 2. How to deal with plan preview.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > With env.compile functions users can get the JobGraph or FlinkPlan, and thus they can preview the plan programmatically. Typically it looks like
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > if (preview configured) {
> > > > > > > > > > > >> >     FlinkPlan plan = env.compile();
> > > > > > > > > > > >> >     new JSONDumpGenerator(...).dump(plan);
> > > > > > > > > > > >> > } else {
> > > > > > > > > > > >> >     env.execute();
> > > > > > > > > > > >> > }
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > And `flink info` would no longer be valid.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > 3. How to deal with Jar Submission at the Web Frontend.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > There is one more thread that discussed this topic[1].
> > > > > > > > > > > >> > Apart from removing the functions, there are two alternatives.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > One is to introduce an interface with a method that returns a JobGraph/FlinkPlan, and have Jar Submission only support main classes that implement this interface. The JobGraph/FlinkPlan is then extracted just by calling the method. In this way, it is even possible to consider a separation of job creation and job submission.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > The other is, as you mentioned, to let execute() do the actual execution. We won't execute the main method in the WebFrontend but spawn a process on the WebMonitor side to execute it. For the return part, we could generate the JobID from the WebMonitor and pass it to the execution environment.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > 4. How to deal with detached mode.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > I think detached mode is a temporary solution for non-blocking submission. In my document both submission and execution return a CompletableFuture, and users control whether or not to wait for the result. At this point we don't need a detached option, but the functionality is covered.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > 5. How does per-job mode interact with interactive programming.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > All of the YARN, Mesos and Kubernetes scenarios now follow the pattern of launching a JobCluster. And I don't think there would be inconsistency between different resource managers.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Best,
> > > > > > > > > > > >> > tison.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > [1] https://lists.apache.org/x/thread.html/6db869c53816f4e2917949a7c6992c2b90856d7d639d7f2e1cd13768@%3Cdev.flink.apache.org%3E
> > > > > > > > > > > >> > [2] https://docs.google.com/document/d/1UWJE7eYWiMuZewBKS0YmdVO2LUTqXPd6-pbOCof9ddY/edit?disco=AAAADZaGGfs
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Aljoscha Krettek <aljos...@apache.org> wrote on Fri, Aug 16, 2019 at 9:20 PM:
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >> Hi,
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >> I read both Jeff's initial design document and the newer document by Tison. I also finally found the time to collect our thoughts on the issue. I had quite some discussions with Kostas and this is the result: [1].
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >> I think overall we agree that this part of the code is in dire need of some refactoring/improvements, but I think there are still some open questions and some differences in opinion about what those refactorings should look like.
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >> I think the API side is quite clear, i.e. we need some JobClient API that allows interacting with a running Job. It could be worthwhile to spin that off into a separate FLIP because we can probably find consensus on that part more easily.
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >> For the rest, the main open questions from our doc are these:
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >>   - Do we want to separate cluster creation and job submission for per-job mode? In the past, there were conscious efforts to *not* separate job submission from cluster creation for per-job clusters for Mesos, YARN, Kubernetes (see StandaloneJobClusterEntryPoint). Tison suggests in his design document to decouple this in order to unify job submission.
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >>   - How to deal with plan preview, which needs to hijack execute() and let the outside code catch an exception?
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >>   - How to deal with Jar Submission at the Web Frontend, which needs to hijack execute() and let the outside code catch an exception? CliFrontend.run() “hijacks” ExecutionEnvironment.execute() to get a JobGraph and then executes that JobGraph manually. We could get around that by letting execute() do the actual execution. One caveat for this is that now the main() method doesn’t return (or is forced to return by throwing an exception from execute()), which means that for Jar Submission from the WebFrontend we have a long-running main() method running in the WebFrontend. This doesn’t sound very good. We could get around this by removing the plan preview feature and by removing Jar Submission/Running.
> > > > > > > > > > > >> >>
> > > > > > > > > > > >> >>   - How to deal with detached mode? Right now, DetachedEnvironment will execute the job and return immediately.
If users > > > > control > > > > > > when > > > > > > > > > they > > > > > > > > > > > >> want to > > > > > > > > > > > >> >> return, by waiting on the job completion future, > > how do > > > > > we > > > > > > deal > > > > > > > > > > with > > > > > > > > > > > >> this? > > > > > > > > > > > >> >> Do we simply remove the distinction between > > > > > > > > > detached/non-detached? > > > > > > > > > > > >> >> > > > > > > > > > > > >> >> - How does per-job mode interact with > > “interactive > > > > > > > > programming” > > > > > > > > > > > >> >> (FLIP-36). For YARN, each execute() call could > > spawn a > > > > > new > > > > > > > > Flink > > > > > > > > > > YARN > > > > > > > > > > > >> >> cluster. What about Mesos and Kubernetes? > > > > > > > > > > > >> >> > > > > > > > > > > > >> >> The first open question is where the opinions > > diverge, > > > > I > > > > > > think. > > > > > > > > > The > > > > > > > > > > > >> rest > > > > > > > > > > > >> >> are just open questions and interesting things > > that we > > > > > > need to > > > > > > > > > > > >> consider. > > > > > > > > > > > >> >> > > > > > > > > > > > >> >> Best, > > > > > > > > > > > >> >> Aljoscha > > > > > > > > > > > >> >> > > > > > > > > > > > >> >> [1] > > > > > > > > > > > >> >> > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit#heading=h.na7k0ad88tix > > > > > > > > > > > >> >> < > > > > > > > > > > > >> >> > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit#heading=h.na7k0ad88tix > > > > > > > > > > > >> >> > > > > > > > > > > > > >> >> > > > > > > > > > > > >> >> > On 31. 
Jul 2019, at 15:23, Jeff Zhang <zjf...@gmail.com> wrote:

Thanks tison for the effort. I left a few comments.

Zili Chen <wander4...@gmail.com> wrote on Wed, Jul 31, 2019 at 8:24 PM:

Hi Flavio,

Thanks for your reply.

Neither in the current implementation nor in the design does ClusterClient take responsibility for generating the JobGraph (what you see in the current codebase is several class methods).

Instead, the user describes the program in the main method with the ExecutionEnvironment APIs and calls env.compile() or env.optimize() to get the FlinkPlan and JobGraph respectively.

For listing the main classes in a jar and choosing one for submission, you can now customize a CLI to do it. Specifically, the path of the jar is passed as an argument, and in the customized CLI you list the main classes and choose one to submit to the cluster.

Best,
tison.

Flavio Pompermaier <pomperma...@okkam.it> wrote on Wed, Jul 31, 2019 at 8:12 PM:

Just one note on my side: it is not clear to me whether the client needs to be able to generate a job graph or not. In my opinion, the job jar must reside only on the server/JobManager side and the client requires a way to get the job graph. If you really want access to the job graph, I'd add dedicated methods on the ClusterClient, like:

- getJobGraph(jarId, mainClass): JobGraph
- listMainClasses(jarId): List<String>

These would require some additions on the job manager endpoint as well. What do you think?

On Wed, Jul 31, 2019 at 12:42 PM Zili Chen <wander4...@gmail.com> wrote:

Hi all,

Here is a document[1] on client API enhancement from our perspective. We have investigated current implementations.
And we propose

1. Unify the implementation of cluster deployment and job submission in Flink.
2. Provide programmatic interfaces to allow flexible job and cluster management.

The first proposal is aimed at reducing the code paths of cluster deployment and job submission so that one can adopt Flink easily. The second proposal is aimed at providing rich interfaces for advanced users who want fine-grained control of these stages.

Quick reference on open questions:

1. Exclude job cluster deployment from the client side, or redefine the semantics of the job cluster? It follows a process quite different from session cluster deployment and job submission.

2.
Maintain the code paths handling class o.a.f.api.common.Program, or implement customized program handling logic in a customized CliFrontend? See also this thread[2] and the document[1].

3. Expose ClusterClient as public API, or just expose API in ExecutionEnvironment and delegate it to ClusterClient? Further, in either case, is it worth introducing a JobClient, an encapsulation of ClusterClient that is associated with a specific job?

Best,
tison.

[1] https://docs.google.com/document/d/1UWJE7eYWiMuZewBKS0YmdVO2LUTqXPd6-pbOCof9ddY/edit?usp=sharing
[2] https://lists.apache.org/thread.html/7ffc9936a384b891dbcf0a481d26c6d13b2125607c200577780d1e18@%3Cdev.flink.apache.org%3E

Jeff Zhang <zjf...@gmail.com> wrote on Wed, Jul 24, 2019 at 9:19 AM:

Thanks Stephan, I will follow up on this issue in the next few weeks and will refine the design doc. We could discuss more details after the 1.9 release.

Stephan Ewen <se...@apache.org> wrote on Wed, Jul 24, 2019 at 12:58 AM:

Hi all!

This thread has stalled for a bit, which I assume is mostly due to the Flink 1.9 feature freeze and release testing effort.

I personally still recognize this issue as an important one to be solved. I'd be happy to help resume this discussion soon (after the 1.9 release) and see if we can take some steps towards this in Flink 1.10.

Best,
Stephan

On Mon, Jun 24, 2019 at 10:41 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:

That's exactly what I suggested a long time ago: the Flink REST client should not require any Flink dependency, only an HTTP library to call the REST services to submit and monitor a job.
What I also suggested in [1] was to have a way to automatically suggest to the user (via a UI) the available main classes and their required parameters[2]. Another problem we have with Flink is that the REST client and the CLI one behave differently, and we use the CLI client (via ssh) because it allows calling some other method after env.execute() [3] (we have to call another REST service to signal the end of the job). In this regard, a dedicated interface, like the JobListener suggested in the previous emails, would be very helpful (IMHO).

[1] https://issues.apache.org/jira/browse/FLINK-10864
[2] https://issues.apache.org/jira/browse/FLINK-10862
[3] https://issues.apache.org/jira/browse/FLINK-10879

Best,
Flavio

On Mon, Jun 24, 2019 at 9:54 AM Jeff Zhang <zjf...@gmail.com> wrote:

Hi, Tison,

Thanks for your comments. Overall I agree with you that it is difficult for downstream projects to integrate with Flink and that we need to refactor the current Flink client API. I also agree that CliFrontend should only parse command line arguments and then pass them to ExecutionEnvironment. It is ExecutionEnvironment's responsibility to compile the job, create the cluster, and submit the job.
Besides that, Flink currently has many ExecutionEnvironment implementations and uses a specific one based on the context. IMHO this is not necessary; ExecutionEnvironment should be able to do the right thing based on the FlinkConf it receives. Too many ExecutionEnvironment implementations are another burden for downstream project integration.

One thing I'd like to mention is Flink's Scala shell and SQL client: although they are sub-modules of Flink, they could be treated as downstream projects which use Flink's client API. Currently you will find it is not easy for them to integrate with Flink, and they share much duplicated code.
It is another sign that we should refactor the Flink client API.

I believe it is a large and hard change, and I am afraid we cannot keep compatibility since many of the changes are user-facing.

Zili Chen <wander4...@gmail.com> wrote on Mon, Jun 24, 2019 at 2:53 PM:

Hi all,

After a closer look at our client APIs, I can see two major issues for consistency and integration: the deployment of the job cluster, which couples job graph creation and cluster deployment, and submission via CliFrontend, which confuses the control flow of job graph compilation and job submission.
I'd like to follow the discussion above, mainly the process described by Jeff and Stephan, and share my ideas on these issues.

1) CliFrontend confuses the control flow of job compilation and submission. Following the process of job submission that Stephan and Jeff described, the execution environment knows all configs of the cluster and the topology/settings of the job. Ideally, the main method of the user program calls #execute (or #submit), and Flink deploys the cluster, compiles the job graph and submits it to the cluster.
However, the current CliFrontend does all these things inside its #runProgram method, which introduces a lot of subclasses of the (stream) execution environment.

Actually, it sets up an exec env that hijacks the #execute/executePlan method, initializes the job graph and aborts execution. Then control flows back to CliFrontend, which deploys the cluster (or retrieves the client) and submits the job graph. This is quite a specific internal process inside Flink and is consistent with nothing else.

2) Deployment of the job cluster couples job graph creation and cluster deployment.
Abstractly, going from a user job to a concrete submission requires

     create JobGraph --\
     create ClusterClient --> submit JobGraph

such a dependency. ClusterClient is created by deploying or retrieving. JobGraph submission requires a compiled JobGraph and a valid ClusterClient, but the creation of the ClusterClient is abstractly independent of that of the JobGraph. However, in job cluster mode, we deploy the job cluster with a job graph, which means we use another process:

     create JobGraph --> deploy cluster with the JobGraph

Here is another inconsistency, and downstream projects/client APIs are forced to handle different cases with little support from Flink.

Since we likely reached a consensus on

1. all configs are gathered by the Flink configuration and passed on
2. the execution environment knows all configs and handles execution (both deployment and submission)

to the issues above I propose eliminating the inconsistencies with the following approach:

1) CliFrontend should be exactly a front end, at least for the "run" command. That means it just gathers all config from the command line and passes it to the main method of the user program. The execution environment knows all the info, and with an additional utility for ClusterClient we gracefully get a ClusterClient by deploying or retrieving.
In this way, we don't need to hijack the #execute/executePlan methods and can remove the various hacky subclasses of exec env, as well as the #run methods in ClusterClient (for an interface-ized ClusterClient). Now the control flow goes from CliFrontend to the main method and never returns.

2) Job cluster means a cluster for a specific job. From another perspective, it is an ephemeral session. We may decouple the deployment from a compiled job graph, and instead start a session with an idle timeout and then submit the job.

These topics are better raised and discussed for consensus before we go into more details on design or implementation.

Best,
tison.
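
To make the flow proposed in the message above concrete, here is a minimal Java sketch. It is only an illustration under stated assumptions: SketchJobGraph, SketchClusterClient, SketchEnvironment and the "cluster.id" config key are hypothetical stand-ins invented for this example and are not Flink classes or Flink configuration options. The front end only gathers configuration, and execute() itself creates the job graph, obtains a client by retrieving or deploying, and submits.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a compiled job graph; not a Flink class.
final class SketchJobGraph {
    final String name;

    SketchJobGraph(String name) {
        this.name = name;
    }
}

// Hypothetical client interface: the environment depends only on this.
interface SketchClusterClient {
    String submit(SketchJobGraph graph);
}

// Hypothetical environment that knows all configuration and handles
// both deployment and submission inside execute().
final class SketchEnvironment {
    private final Map<String, String> config;
    final List<String> log = new ArrayList<>();

    SketchEnvironment(Map<String, String> config) {
        this.config = config;
    }

    // Obtain a client by retrieving an existing cluster or deploying a new one.
    // "cluster.id" is an assumed key used only in this sketch.
    private SketchClusterClient obtainClient() {
        String clusterId = config.get("cluster.id");
        if (clusterId != null) {
            log.add("retrieve " + clusterId);
        } else {
            clusterId = "ephemeral-session";
            log.add("deploy " + clusterId);
        }
        final String id = clusterId;
        return graph -> graph.name + "@" + id;
    }

    // No hijacking: execute() creates the graph, gets a client, and submits.
    String execute(String jobName) {
        SketchJobGraph graph = new SketchJobGraph(jobName);
        log.add("compile " + jobName);
        SketchClusterClient client = obtainClient();
        String handle = client.submit(graph);
        log.add("submit " + handle);
        return handle;
    }
}

final class SubmissionFlowSketch {
    public static void main(String[] args) {
        // The front end only collects key=value arguments into the config.
        Map<String, String> config = new HashMap<>();
        for (String arg : args) {
            int eq = arg.indexOf('=');
            if (eq > 0) {
                config.put(arg.substring(0, eq), arg.substring(eq + 1));
            }
        }
        SketchEnvironment env = new SketchEnvironment(config);
        System.out.println(env.execute("WindowJoin"));
        System.out.println(env.log);
    }
}
```

In this sketch the job cluster case is just the "deploy" branch: the cluster is started without a job graph and the job is submitted afterwards, which mirrors the "ephemeral session" idea in point 2).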

Zili Chen <wander4...@gmail.com> wrote on Thu, Jun 20, 2019 at 3:21 AM:

Hi Jeff,

Thanks for raising this thread and the design document!

As @Thomas Weise mentioned above, extending config to Flink requires far more effort than it should. Another example is that we achieve detached mode by introducing another execution environment which also hijacks the #execute method.

I agree with your idea that the user would configure all things and Flink "just" respects it. On this topic I think the unusual control flow when CliFrontend handles the "run" command is the problem. It handles several configs, mainly about cluster settings, and thus the main method of the user program is unaware of them.
Also it compiles the app to a job graph by running the main method with a hijacked exec env, which constrains the main method further.

I'd like to write down a few notes on passing and respecting configs/args, as well as decoupling job compilation and submission, and share them on this thread later.

Best,
tison.

SHI Xiaogang <shixiaoga...@gmail.com> wrote on Mon, Jun 17, 2019 at 7:29 PM:

Hi Jeff and Flavio,

Thanks Jeff a lot for proposing the design document.

We are also working on refactoring ClusterClient to allow flexible and efficient job management in our real-time platform.
We would like to draft a document to share our ideas with you.

I think it's a good idea to have something like Apache Livy for Flink, and the efforts discussed here will take a great step towards it.

Regards,
Xiaogang

Flavio Pompermaier <pomperma...@okkam.it> wrote on Mon, Jun 17, 2019 at 7:13 PM:

Is there any possibility to have something like Apache Livy [1] also for Flink in the future?

[1] https://livy.apache.org/

On Tue, Jun 11, 2019 at 5:23 PM Jeff Zhang <zjf...@gmail.com> wrote:

> Any API we expose should not have dependencies on the runtime (flink-runtime) package or other implementation details. To me, this means that the current ClusterClient cannot be exposed to users because it uses quite some classes from the optimiser and runtime packages.

We should change ClusterClient from class to interface.
> > > > > > > > > > > >> >> >>>>>>>>>>>>> ExecutionEnvironment would only use the ClusterClient interface, which should live in flink-clients, while the concrete implementation class could be in flink-runtime.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>> What happens when a failure/restart in the client happens? There need to be a way of re-establishing the connection to the job, set up the listeners again, etc.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>> Good point. First we need to define what failure/restart in the client means. IIUC, it usually means a network failure, which would surface in the RestClient class. If my understanding is correct, the restart/retry mechanism should be implemented in RestClient.
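The retry idea above can be sketched as follows. This is only an illustration, assuming that "failure" means a transient I/O error and that an exponential backoff is acceptable; `RetryingCall` and `withRetry` are hypothetical names and are not part of Flink's RestClient.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

/** Hypothetical helper; it does not exist in Flink and only illustrates the idea. */
final class RetryingCall {
    private RetryingCall() {}

    /**
     * Runs the call up to maxAttempts times. Only IOException (standing in for a
     * network failure) triggers a retry; the delay doubles after each failed attempt.
     * The last IOException is rethrown if every attempt fails.
     */
    static <T> T withRetry(Callable<T> call, int maxAttempts, long initialDelayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMillis;
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (IOException e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delay);
                    delay *= 2;
                }
            }
        }
        throw last;
    }
}

final class RetryDemo {
    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated status query that fails twice before succeeding.
        String status = RetryingCall.withRetry(() -> {
            if (++calls[0] < 3) {
                throw new IOException("connection reset");
            }
            return "RUNNING";
        }, 5, 10L);
        System.out.println(status + " after " + calls[0] + " attempts");
    }
}
```

Keeping the retry inside the transport layer, as suggested, would mean callers never see transient failures; re-attaching listeners after a full client restart is a separate problem that this sketch does not address.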
> > > > > > > > > > > >> >> >>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>> On Tue, Jun 11, 2019 at 11:10 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> > > > > > > > > > > >> >> >>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>> Some points to consider:
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>> * Any API we expose should not have dependencies on the runtime (flink-runtime) package or other implementation details. To me, this means that the current ClusterClient cannot be exposed to users because it uses quite some classes from the optimiser and runtime packages.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>> * What happens when a failure/restart in the client happens?
> > > > > > > > > > > >> >> >>>>>>>>>>>>>> There needs to be a way of re-establishing the connection to the job, setting up the listeners again, etc.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>> Aljoscha
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>> On 29. May 2019, at 10:17, Jeff Zhang <zjf...@gmail.com> wrote:
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>> Sorry folks, the design doc is later than you expected. Here's the design doc I drafted; any comments and feedback are welcome.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>> https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/edit?usp=sharing
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>> On Thu, Feb 14, 2019 at 8:43 PM Stephan Ewen <se...@apache.org> wrote:
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>> Nice that this discussion is happening.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>> In the FLIP, we could also revisit the entire role of the environments again.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>> Initially, the idea was:
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>> - the environments take care of the specific setup for standalone (no setup needed), yarn, mesos, etc.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>> - the session ones have control over the session. The environment holds the session client.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>> - running a job gives a "control" object for that job. That behavior is the same in all environments.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>> The actual implementation diverged quite a bit from that. Happy to see a discussion about straightening this out a bit more.
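The per-job "control" object mentioned in the list above could look roughly like the sketch below. It is an assumption-laden illustration: `JobControl`, `LocalJobControl`, and the string status values are invented here, and nothing in the thread fixes the shape of such an interface.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical per-job handle; the thread only calls it a "control" object. */
interface JobControl {
    String jobId();

    /** Completes with the final status once the job ends. */
    CompletableFuture<String> result();

    void cancel();
}

/** In-memory stand-in so the example runs without any cluster. */
final class LocalJobControl implements JobControl {
    private final String jobId;
    private final CompletableFuture<String> result = new CompletableFuture<>();

    LocalJobControl(String jobId) {
        this.jobId = jobId;
    }

    public String jobId() {
        return jobId;
    }

    public CompletableFuture<String> result() {
        return result;
    }

    public void cancel() {
        // complete() is a no-op if the job has already reached a final state.
        result.complete("CANCELED");
    }

    /** Called by the (simulated) runtime when the job finishes normally. */
    void finish() {
        result.complete("FINISHED");
    }
}

final class JobControlDemo {
    public static void main(String[] args) {
        LocalJobControl job = new LocalJobControl("job-1");
        job.result().thenAccept(status -> System.out.println(job.jobId() + ": " + status));
        job.cancel();
    }
}
```

Because the caller only sees `JobControl`, the same code could drive a job whether the environment behind it is standalone, YARN, or a session, which is the uniform behavior the list describes.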
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>> On Tue, Feb 12, 2019 at 4:58 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>> Hi folks,
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>> Sorry for the late response. It seems we have reached consensus on this; I will create a FLIP for it with a more detailed design.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>> On Fri, Dec 21, 2018 at 11:43 AM Thomas Weise <t...@apache.org> wrote:
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>> Great to see this discussion seeded! The problems you face with the Zeppelin integration are also affecting other downstream projects, like Beam.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>> We just enabled the savepoint restore option in RemoteStreamEnvironment [1] and that was more difficult than it should be. The main issue is that environment and cluster client aren't decoupled. Ideally it should be possible to just get the matching cluster client from the environment and then control the job through it (environment as factory for cluster client).
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>> But note that the environment classes are part of the public API, and it is not straightforward to make larger changes without breaking backward compatibility.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>> ClusterClient currently exposes internal classes like JobGraph and StreamGraph. But it should be possible to wrap this with a new public API that brings the required job control capabilities for downstream projects.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>> Perhaps it is helpful to look at some of the interfaces in Beam while thinking about this: [2] for the portable job API and [3] for the old asynchronous job control from the Beam Java SDK.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>> The backward compatibility discussion [4] is also relevant here. A new API should shield downstream projects from internals and allow them to interoperate with multiple future Flink versions in the same release line without forced upgrades.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>> Thanks,
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>> Thomas
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>> [1] https://github.com/apache/flink/pull/7249
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>> [2] https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/beam_job_api.proto
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>> [3]
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>> [4] https://lists.apache.org/thread.html/064c75c5d10f0806095b14f6d76942598917a14429c1acbddd151fe2@%3Cdev.flink.apache.org%3E
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>> On Thu, Dec 20, 2018 at 6:15 PM Jeff Zhang <zjf...@gmail.com> wrote:
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> I'm not so sure whether the user should be able to define where the job runs (in your example Yarn).
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> This is actually independent of the job development and is something which is decided at deployment time.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>> Users don't need to specify the execution mode programmatically. They can also pass the execution mode as an argument to the flink run command, e.g.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>> bin/flink run -m yarn-cluster ....
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>> bin/flink run -m local ...
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>> bin/flink run -m host:port ...
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>> Does this make sense to you?
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>> To me it makes sense that the ExecutionEnvironment is not directly initialized by the user and instead context sensitive how you want to execute your job (Flink CLI vs. IDE, for example).
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>> Right, currently I notice that Flink creates a different ContextExecutionEnvironment depending on the submission scenario (Flink CLI vs. IDE). To me this is a kind of hack, and not so straightforward.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>> What I suggested above is that Flink should always create the same ExecutionEnvironment but with different configuration, and based on the configuration it would create the proper ClusterClient for different behaviors.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>> On Thu, Dec 20, 2018 at 11:18 PM Till Rohrmann <trohrm...@apache.org> wrote:
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> You are probably right that we have code duplication when it comes to the
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> creation of the ClusterClient. This should be reduced in the future.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> I'm not so sure whether the user should be able to define where the job runs (in your example Yarn). This is actually independent of the job development and is something which is decided at deployment time.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> To me it makes sense that the ExecutionEnvironment is not directly initialized by the user and instead context sensitive how you want to execute your job (Flink CLI vs. IDE, for example). However, I agree that the ExecutionEnvironment should give you access to the ClusterClient and to the job (maybe in the form of the JobGraph or a job plan).
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> Cheers,
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> Till
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>> On Thu, Dec 13, 2018 at 4:36 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> Hi Till,
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> Thanks for the feedback. You are right that I expect a better programmatic job submission/control API that could be used by downstream projects, and it would benefit the Flink ecosystem.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> When I look at the code of the Flink scala-shell and sql-client (I believe they are not the core of Flink, but belong to its ecosystem), I find a lot of duplicated code for creating a ClusterClient from user-provided configuration (the configuration format may differ between scala-shell and sql-client) and then using that ClusterClient to manipulate jobs.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> I don't think this is convenient for downstream projects. What I expect is that a downstream project only needs to provide the necessary configuration info (maybe by introducing a class FlinkConf) and then build an ExecutionEnvironment based on this FlinkConf, and the ExecutionEnvironment will create the proper ClusterClient. This would not only benefit downstream project development but also help their integration tests with Flink.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> Here's one sample code snippet that I expect.
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> val conf = new FlinkConf().mode("yarn")
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> val env = new ExecutionEnvironment(conf)
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> val jobId = env.submit(...)
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> val jobStatus = env.getClusterClient().queryJobStatus(jobId)
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> env.getClusterClient().cancelJob(jobId)
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>>
> > > > > > > > > > > >> >> >>>>>>>>>>>>>>>>>>>>> What do you think?
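To make the proposal above concrete, here is a small Java sketch of a single environment class that picks its client from configuration alone. `FlinkConf` and the method names `mode`, `submit`, `queryJobStatus`, and `cancelJob` follow the snippet in the message; everything else (`ClusterClientSketch`, `InMemoryClusterClient`, `ExecutionEnvironmentSketch`, the string job ids and statuses) is hypothetical and stands in for real Flink classes.

```java
import java.util.HashMap;
import java.util.Map;

/** Configuration holder named after the FlinkConf proposed in the thread; hypothetical. */
final class FlinkConf {
    private String mode = "local";

    FlinkConf mode(String mode) {
        this.mode = mode;
        return this;
    }

    String mode() {
        return mode;
    }
}

/** Minimal job-control surface; an assumption, not Flink's ClusterClient. */
interface ClusterClientSketch {
    String submit(String jobName);

    String queryJobStatus(String jobId);

    void cancelJob(String jobId);
}

/** In-memory stand-in so the example runs without a cluster. */
final class InMemoryClusterClient implements ClusterClientSketch {
    private final String prefix;
    private final Map<String, String> statuses = new HashMap<>();
    private int counter = 0;

    InMemoryClusterClient(String prefix) {
        this.prefix = prefix;
    }

    public String submit(String jobName) {
        String jobId = prefix + "-" + (++counter);
        statuses.put(jobId, "RUNNING");
        return jobId;
    }

    public String queryJobStatus(String jobId) {
        return statuses.getOrDefault(jobId, "UNKNOWN");
    }

    public void cancelJob(String jobId) {
        statuses.computeIfPresent(jobId, (id, status) -> "CANCELED");
    }
}

/** One environment class; only the configuration decides which client it builds. */
final class ExecutionEnvironmentSketch {
    private final ClusterClientSketch client;

    ExecutionEnvironmentSketch(FlinkConf conf) {
        this.client = createClient(conf.mode());
    }

    private static ClusterClientSketch createClient(String mode) {
        switch (mode) {
            case "yarn":
            case "local":
                // A real implementation would return a YARN or local client here.
                return new InMemoryClusterClient(mode);
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }

    String submit(String jobName) {
        return client.submit(jobName);
    }

    ClusterClientSketch getClusterClient() {
        return client;
    }
}

final class EnvironmentDemo {
    public static void main(String[] args) {
        ExecutionEnvironmentSketch env =
                new ExecutionEnvironmentSketch(new FlinkConf().mode("yarn"));
        String jobId = env.submit("wordcount");
        System.out.println(env.getClusterClient().queryJobStatus(jobId));
        env.getClusterClient().cancelJob(jobId);
        System.out.println(env.getClusterClient().queryJobStatus(jobId));
    }
}
```

A downstream project or an integration test would then only construct a `FlinkConf`; swapping "yarn" for "local" changes the client without touching the calling code.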
Till Rohrmann <trohrm...@apache.org> wrote on Tue, Dec 11, 2018 at 6:28 PM:

Hi Jeff,

what you are proposing is to provide the user with better programmatic job control. There was actually an effort to achieve this, but it has never been completed [1]. However, there are some improvements in the code base now. Look, for example, at the NewClusterClient interface, which offers non-blocking job submission. But I agree that we need to improve Flink in this regard.

I would not be in favour of exposing all ClusterClient calls via the ExecutionEnvironment, because it would clutter the class and would not be a good separation of concerns. Instead, one idea could be to retrieve the current ClusterClient from the ExecutionEnvironment, which can then be used for cluster and job control. But before we start an effort here, we need to agree on and capture what functionality we want to provide.

Initially, the idea was that we have the ClusterDescriptor describing how to talk to a cluster manager like Yarn or Mesos. The ClusterDescriptor can be used for deploying Flink clusters (job and session) and gives you a ClusterClient. The ClusterClient controls the cluster (e.g. submitting jobs, listing all running jobs). And then there was the idea to introduce a JobClient, which you obtain from the ClusterClient to trigger job-specific operations (e.g. taking a savepoint, cancelling the job).
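The three-level layering described above (ClusterDescriptor deploys and yields a ClusterClient, which in turn yields a JobClient per job) might look like the following Java sketch. The interface names come from the message, but every method signature, the in-memory implementation and the job-id scheme are assumptions made for illustration and are not Flink's actual API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch only: method names and signatures are illustrative.
final class ClientLayeringSketch {

    enum JobStatus { RUNNING, CANCELED }

    /** Job-scoped operations. */
    interface JobClient {
        String jobId();
        JobStatus status();
        String triggerSavepoint(String targetDirectory);
        void cancel();
    }

    /** Cluster-scoped operations. */
    interface ClusterClient {
        JobClient submitJob(String jobName);
        List<String> listRunningJobs();
    }

    /** Knows how to talk to a cluster manager and deploy a cluster. */
    interface ClusterDescriptor {
        ClusterClient deploySessionCluster();
    }

    /** Pretends to deploy a cluster by creating an in-memory one. */
    static final class InMemoryDescriptor implements ClusterDescriptor {
        public ClusterClient deploySessionCluster() { return new InMemoryCluster(); }
    }

    static final class InMemoryCluster implements ClusterClient {
        private final Map<String, JobStatus> jobs = new LinkedHashMap<>();
        private int nextId = 0;

        public JobClient submitJob(String jobName) {
            final String id = jobName + "-" + (nextId++);
            jobs.put(id, JobStatus.RUNNING);
            // The returned handle is bound to exactly one job.
            return new JobClient() {
                public String jobId() { return id; }
                public JobStatus status() { return jobs.get(id); }
                public String triggerSavepoint(String targetDirectory) {
                    return targetDirectory + "/savepoint-" + id;
                }
                public void cancel() { jobs.put(id, JobStatus.CANCELED); }
            };
        }

        public List<String> listRunningJobs() {
            List<String> running = new ArrayList<>();
            for (Map.Entry<String, JobStatus> entry : jobs.entrySet()) {
                if (entry.getValue() == JobStatus.RUNNING) running.add(entry.getKey());
            }
            return running;
        }
    }

    public static void main(String[] args) {
        ClusterClient cluster = new InMemoryDescriptor().deploySessionCluster();
        JobClient job = cluster.submitJob("wordcount");
        System.out.println(cluster.listRunningJobs());
        job.cancel();
        System.out.println(job.status());
    }
}
```

Keeping job-specific calls on a separate handle is what lets the cluster-level interface stay small, which is the separation of concerns argued for above.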
[1] https://issues.apache.org/jira/browse/FLINK-4272

Cheers,
Till

On Tue, Dec 11, 2018 at 10:13 AM Jeff Zhang <zjf...@gmail.com> wrote:

Hi Folks,

I am trying to integrate Flink into Apache Zeppelin, which is an interactive notebook, and I hit several issues caused by the Flink client API. So I'd like to propose the following changes to the Flink client API.
1. Support non-blocking execution. Currently, ExecutionEnvironment#execute is a blocking method that does two things: it first submits the job and then waits until the job is finished. I'd like to introduce a non-blocking execution method such as ExecutionEnvironment#submit, which only submits the job and then returns the jobId to the client, and to allow the user to query the job status via the jobId.

2. Add a cancel API in ExecutionEnvironment/StreamExecutionEnvironment. Currently the only way to cancel a job is via the CLI (bin/flink), which is not convenient for downstream projects. So I'd like to add a cancel API in ExecutionEnvironment.

3. Add a savepoint API in ExecutionEnvironment/StreamExecutionEnvironment. This is similar to the cancel API: we should use ExecutionEnvironment as the unified API for third parties to integrate with Flink.

4. Add a listener for the job execution lifecycle, something like the following, so that downstream projects can run custom logic during the lifecycle of a job. E.g. Zeppelin would capture the jobId after the job is submitted and then use this jobId to cancel it later when necessary.

    public interface JobListener {

      void onJobSubmitted(JobID jobId);

      void onJobExecuted(JobExecutionResult jobResult);

      void onJobCanceled(JobID jobId);
    }

5. Enable session in ExecutionEnvironment. Currently it is disabled, but a session is very convenient for third parties that submit jobs continually. I hope Flink can enable it again.

6. Unify all Flink client APIs into ExecutionEnvironment/StreamExecutionEnvironment.
This is a long-term issue which needs more careful thinking and design. Currently some features of Flink are exposed in ExecutionEnvironment/StreamExecutionEnvironment, but some are exposed in the CLI instead of the API, like the cancel and savepoint operations I mentioned above. I think the root cause is that Flink didn't unify the way clients interact with it. Here I list 3 scenarios of Flink operation:

- Local job execution. Flink will create a LocalEnvironment and then use this LocalEnvironment to create a LocalExecutor for job execution.
- Remote job execution. Flink will create a ClusterClient first, then create a ContextEnvironment based on the ClusterClient, and then run the job.
- Job cancellation. Flink will create a ClusterClient first and then cancel the job via this ClusterClient.

As you can see in the above 3 scenarios, Flink didn't use the same approach (code path) to interact with Flink.

What I propose is the following: create the proper LocalEnvironment/RemoteEnvironment (based on user configuration) --> use this Environment to create the proper ClusterClient (LocalClusterClient or RestClusterClient) to interact with Flink (job execution or cancellation).

This way we can unify the process of local execution and remote execution. It is also much easier for third parties to integrate with Flink, because ExecutionEnvironment is the unified entry point for Flink. All a third party needs to do is pass the configuration to ExecutionEnvironment, and ExecutionEnvironment will do the right thing based on the configuration. The Flink CLI can also be considered a Flink API consumer: it just passes the configuration to ExecutionEnvironment and lets ExecutionEnvironment create the proper ClusterClient instead of creating the ClusterClient directly.

6 would involve large code refactoring, so I think we can defer it to a future release; 1, 2, 3, 4, 5 could be done at once, I believe.
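Items 1, 2 and 4 of the proposal (non-blocking submit, cancel by jobId, and lifecycle callbacks) can be illustrated together with a small in-memory Java sketch. This is a simplification under stated assumptions: job ids are plain strings rather than JobID, onJobExecuted receives the id rather than a JobExecutionResult, and the Environment class, its method names and markFinished (which simulates the cluster reporting completion) are hypothetical, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch only: a toy environment, not Flink's ExecutionEnvironment.
final class SubmitCancelListenerSketch {

    enum JobStatus { RUNNING, FINISHED, CANCELED }

    /** Simplified listener: string ids replace JobID and JobExecutionResult. */
    interface JobListener {
        void onJobSubmitted(String jobId);
        void onJobExecuted(String jobId);
        void onJobCanceled(String jobId);
    }

    static final class Environment {
        private final Map<String, JobStatus> jobs = new ConcurrentHashMap<>();
        private final List<JobListener> listeners = new ArrayList<>();
        private final AtomicLong counter = new AtomicLong();

        void addJobListener(JobListener listener) { listeners.add(listener); }

        /** Non-blocking: records the job and returns its id without waiting. */
        String submit(String jobName) {
            String jobId = jobName + "-" + counter.incrementAndGet();
            jobs.put(jobId, JobStatus.RUNNING);
            for (JobListener listener : listeners) listener.onJobSubmitted(jobId);
            return jobId;
        }

        JobStatus queryJobStatus(String jobId) {
            JobStatus status = jobs.get(jobId);
            if (status == null) throw new IllegalArgumentException("unknown job: " + jobId);
            return status;
        }

        /** Cancels only a job that is still running; otherwise does nothing. */
        void cancel(String jobId) {
            if (jobs.replace(jobId, JobStatus.RUNNING, JobStatus.CANCELED)) {
                for (JobListener listener : listeners) listener.onJobCanceled(jobId);
            }
        }

        /** Simulates the cluster reporting that a running job has completed. */
        void markFinished(String jobId) {
            if (jobs.replace(jobId, JobStatus.RUNNING, JobStatus.FINISHED)) {
                for (JobListener listener : listeners) listener.onJobExecuted(jobId);
            }
        }
    }

    public static void main(String[] args) {
        Environment env = new Environment();
        env.addJobListener(new JobListener() {
            public void onJobSubmitted(String jobId) { System.out.println("submitted " + jobId); }
            public void onJobExecuted(String jobId) { System.out.println("executed " + jobId); }
            public void onJobCanceled(String jobId) { System.out.println("canceled " + jobId); }
        });
        String jobId = env.submit("notebook-paragraph");
        env.cancel(jobId);
        System.out.println(env.queryJobStatus(jobId));
    }
}
```

This mirrors the Zeppelin use case from item 4: the listener captures the id at submission time, and the same id is later used to cancel the job.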
Let me know your comments and feedback, thanks.

--
Best Regards

Jeff Zhang