Hi Till,

Thanks for the feedback. You are right that what I expect is a better
programmatic job submission/control API that could be used by downstream
projects, and it would benefit the whole Flink ecosystem. When I look at
the code of the Flink scala-shell and sql-client (I believe they are not
the core of Flink, but belong to the Flink ecosystem), I find a lot of
duplicated code for creating a ClusterClient from user-provided
configuration (the configuration format may differ between scala-shell
and sql-client) and then using that ClusterClient to manipulate jobs. I
don't think this is convenient for downstream projects. What I expect is
that a downstream project only needs to provide the necessary
configuration (maybe by introducing a class FlinkConf), build an
ExecutionEnvironment from this FlinkConf, and the ExecutionEnvironment
will create the proper ClusterClient. This would not only benefit
downstream project development but also help their integration tests
with Flink. Here's a sample code snippet of what I expect:

val conf = new FlinkConf().mode("yarn")
val env = new ExecutionEnvironment(conf)
val jobId = env.submit(...)
val jobStatus = env.getClusterClient().queryJobStatus(jobId)
env.getClusterClient().cancelJob(jobId)
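To make this more concrete, here is a minimal sketch of what such a
FlinkConf could look like. All names here are hypothetical (nothing that
exists in Flink today); the point is only that downstream projects hand
over plain configuration and never build a ClusterClient themselves:

class FlinkConf {
  // generic key/value settings, e.g. yarn queue, savepoint dir, ...
  private val settings = scala.collection.mutable.Map[String, String]()

  // execution mode, e.g. "local", "remote" or "yarn"
  def mode(m: String): FlinkConf = set("execution.mode", m)

  def set(key: String, value: String): FlinkConf = {
    settings += (key -> value)
    this
  }

  def get(key: String): Option[String] = settings.get(key)
}

The ExecutionEnvironment would then pick the matching ClusterClient
internally (e.g. a local one for "local" mode, a REST-based one for
"remote" or "yarn") based on this configuration.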
What do you think?

Till Rohrmann <trohrm...@apache.org> wrote on Tue, Dec 11, 2018 at 6:28 PM:

> Hi Jeff,
>
> what you are proposing is to provide the user with better programmatic
> job control. There was actually an effort to achieve this but it has
> never been completed [1]. However, there are some improvements in the
> code base now. Look for example at the NewClusterClient interface which
> offers a non-blocking job submission. But I agree that we need to
> improve Flink in this regard.
>
> I would not be in favour of exposing all ClusterClient calls via the
> ExecutionEnvironment because it would clutter the class and would not
> be a good separation of concerns. Instead, one idea could be to
> retrieve the current ClusterClient from the ExecutionEnvironment, which
> can then be used for cluster and job control. But before we start an
> effort here, we need to agree on and capture what functionality we want
> to provide.
>
> Initially, the idea was that we have the ClusterDescriptor describing
> how to talk to a cluster manager like Yarn or Mesos. The
> ClusterDescriptor can be used for deploying Flink clusters (job and
> session) and gives you a ClusterClient. The ClusterClient controls the
> cluster (e.g. submitting jobs, listing all running jobs). And then
> there was the idea to introduce a JobClient which you obtain from the
> ClusterClient to trigger job-specific operations (e.g. taking a
> savepoint, cancelling the job).
>
> [1] https://issues.apache.org/jira/browse/FLINK-4272
>
> Cheers,
> Till
>
> On Tue, Dec 11, 2018 at 10:13 AM Jeff Zhang <zjf...@gmail.com> wrote:
>
> > Hi Folks,
> >
> > I am trying to integrate Flink into Apache Zeppelin, which is an
> > interactive notebook, and I hit several issues caused by the Flink
> > client API. So I'd like to propose the following changes to the
> > Flink client API.
> >
> > 1. Support non-blocking execution. Currently,
> > ExecutionEnvironment#execute is a blocking method which does two
> > things: it first submits the job and then waits until the job is
> > finished. I'd like to introduce a non-blocking execution method like
> > ExecutionEnvironment#submit which only submits the job and then
> > returns the jobId to the client, and to allow the user to query the
> > job status via that jobId.
> >
> > 2. Add a cancel API in ExecutionEnvironment/StreamExecutionEnvironment.
> > Currently the only way to cancel a job is via the CLI (bin/flink),
> > which is not convenient for downstream projects. So I'd like to add
> > a cancel API in ExecutionEnvironment.
> >
> > 3. Add a savepoint API in ExecutionEnvironment/StreamExecutionEnvironment.
> > It is similar to the cancel API; we should use ExecutionEnvironment
> > as the unified API for third parties to integrate with Flink.
> >
> > 4. Add a listener for the job execution lifecycle, something like the
> > following, so that downstream projects can run custom logic during
> > the lifecycle of a job. E.g. Zeppelin would capture the jobId after
> > the job is submitted and then use this jobId to cancel the job later
> > when necessary (a usage sketch follows the interface below).
> >
> > public interface JobListener {
> >
> >     void onJobSubmitted(JobID jobId);
> >
> >     void onJobExecuted(JobExecutionResult jobResult);
> >
> >     void onJobCanceled(JobID jobId);
> > }
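> > For illustration, here is a rough sketch of how Zeppelin could use
> > such a listener. Note that addJobListener and the listener wiring
> > are part of this proposal, not an existing Flink API:
> >
> > import org.apache.flink.api.common.{JobExecutionResult, JobID}
> >
> > var capturedJobId: JobID = _
> >
> > // hypothetical registration method proposed above
> > env.addJobListener(new JobListener {
> >   // Zeppelin keeps the id so the user can cancel the job later
> >   override def onJobSubmitted(jobId: JobID): Unit =
> >     capturedJobId = jobId
> >   override def onJobExecuted(jobResult: JobExecutionResult): Unit =
> >     println(s"job finished in ${jobResult.getNetRuntime} ms")
> >   override def onJobCanceled(jobId: JobID): Unit =
> >     println(s"job $jobId was canceled")
> > })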
> > 5. Enable sessions in ExecutionEnvironment. Currently they are
> > disabled, but a session is very convenient for third parties that
> > submit jobs continually. I hope Flink can enable it again.
> >
> > 6. Unify all Flink client APIs into
> > ExecutionEnvironment/StreamExecutionEnvironment.
> >
> > This is a long-term issue which needs more careful thinking and
> > design. Currently some features of Flink are exposed in
> > ExecutionEnvironment/StreamExecutionEnvironment, but some are exposed
> > in the CLI instead of the API, like the cancel and savepoint
> > operations I mentioned above. I think the root cause is that Flink
> > doesn't have one unified way of interacting with a cluster. Here I
> > list 3 scenarios of Flink operation:
> >
> >    - Local job execution. Flink will create a LocalEnvironment and
> >    then use this LocalEnvironment to create a LocalExecutor for job
> >    execution.
> >    - Remote job execution. Flink will create a ClusterClient first,
> >    then create a ContextEnvironment based on the ClusterClient, and
> >    then run the job.
> >    - Job cancellation. Flink will create a ClusterClient first and
> >    then cancel the job via this ClusterClient.
> >
> > As you can see in the above 3 scenarios, Flink doesn't use the same
> > approach (code path) to interact with the cluster. What I propose is
> > the following:
> >
> > Create the proper LocalEnvironment/RemoteEnvironment (based on user
> > configuration) --> use this Environment to create the proper
> > ClusterClient (LocalClusterClient or RestClusterClient) to interact
> > with Flink (job execution or cancellation).
> >
> > This way we can unify the process of local and remote execution, and
> > it becomes much easier for third parties to integrate with Flink,
> > because ExecutionEnvironment is the unified entry point. All a third
> > party needs to do is pass its configuration to ExecutionEnvironment,
> > and ExecutionEnvironment will do the right thing based on that
> > configuration. The Flink CLI can also be considered a consumer of the
> > Flink API: it would just pass the configuration to
> > ExecutionEnvironment and let ExecutionEnvironment create the proper
> > ClusterClient instead of creating the ClusterClient directly.
> >
> > 6 would involve large code refactoring, so I think we can defer it to
> > a future release; 1, 2, 3, 4 and 5 could be done at once, I believe.
> > Let me know your comments and feedback, thanks.
> >
> > --
> > Best Regards
> >
> > Jeff Zhang


--
Best Regards

Jeff Zhang