You are probably right that we have code duplication when it comes to the creation of the ClusterClient. This should be reduced in the future.
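(Purely as an illustration of the kind of shared helper that could remove
this duplication, here is a minimal sketch; all names below are
hypothetical, not existing Flink API.)

    import java.util.Map;

    // Hypothetical shared factory that scala-shell, sql-client and other
    // downstream tools could call instead of duplicating this logic.
    final class ClusterClientFactory {

        /** Minimal stand-in for Flink's ClusterClient. */
        interface ClusterClient extends AutoCloseable {
            @Override
            void close();
        }

        static ClusterClient create(Map<String, String> conf) {
            // Decide once, centrally, which client to build from the
            // configuration, instead of every tool repeating the choice.
            String target = conf.getOrDefault("execution.target", "local");
            switch (target) {
                case "local":
                case "remote":
                    // A real implementation would construct a local or REST
                    // client here from the parsed configuration.
                    return () -> { /* release cluster resources */ };
                default:
                    throw new IllegalArgumentException(
                        "Unknown execution target: " + target);
            }
        }
    }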
I'm not so sure whether the user should be able to define where the job
runs (in your example Yarn). This is actually independent of the job
development and is something which is decided at deployment time. To me it
makes sense that the ExecutionEnvironment is not directly initialized by
the user but is instead context sensitive to how you want to execute your
job (Flink CLI vs. IDE, for example). However, I agree that the
ExecutionEnvironment should give you access to the ClusterClient and to
the job (maybe in the form of the JobGraph or a job plan).

Cheers,
Till

On Thu, Dec 13, 2018 at 4:36 AM Jeff Zhang <zjf...@gmail.com> wrote:

> Hi Till,
>
> Thanks for the feedback. You are right that I expect a better
> programmatic job submission/control API which could be used by
> downstream projects; it would benefit the whole Flink ecosystem. When I
> look at the code of the Flink scala-shell and sql-client (I believe they
> are not the core of Flink, but belong to its ecosystem), I find a lot of
> duplicated code for creating a ClusterClient from user-provided
> configuration (the configuration format may differ between scala-shell
> and sql-client) and then using that ClusterClient to manipulate jobs. I
> don't think this is convenient for downstream projects. What I expect is
> that a downstream project only needs to provide the necessary
> configuration info (maybe by introducing a class FlinkConf), then build
> an ExecutionEnvironment based on this FlinkConf, and the
> ExecutionEnvironment will create the proper ClusterClient. This would
> not only benefit downstream project development but also help their
> integration tests with Flink. Here's a sample code snippet of what I
> expect:
>
> val conf = new FlinkConf().mode("yarn")
> val env = new ExecutionEnvironment(conf)
> val jobId = env.submit(...)
> val jobStatus = env.getClusterClient().queryJobStatus(jobId)
> env.getClusterClient().cancelJob(jobId)
>
> What do you think?
>
> On Tue, Dec 11, 2018 at 6:28 PM Till Rohrmann <trohrm...@apache.org>
> wrote:
>
> > Hi Jeff,
> >
> > What you are proposing is to provide the user with better programmatic
> > job control. There was actually an effort to achieve this, but it has
> > never been completed [1]. However, there are some improvements in the
> > code base now. Look, for example, at the NewClusterClient interface,
> > which offers non-blocking job submission. But I agree that we need to
> > improve Flink in this regard.
> >
> > I would not be in favour of exposing all ClusterClient calls via the
> > ExecutionEnvironment, because it would clutter the class and would not
> > be a good separation of concerns. Instead, one idea could be to
> > retrieve the current ClusterClient from the ExecutionEnvironment,
> > which can then be used for cluster and job control. But before we
> > start an effort here, we need to agree on and capture what
> > functionality we want to provide.
> >
> > Initially, the idea was that we have the ClusterDescriptor describing
> > how to talk to a cluster manager like Yarn or Mesos. The
> > ClusterDescriptor can be used for deploying Flink clusters (job and
> > session) and gives you a ClusterClient. The ClusterClient controls the
> > cluster (e.g. submitting jobs, listing all running jobs). And then
> > there was the idea to introduce a JobClient, which you obtain from the
> > ClusterClient, to trigger job-specific operations (e.g. taking a
> > savepoint, cancelling the job).
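For concreteness, the layering described above could be sketched with
stand-in interfaces like the following (simplified illustrations, not the
actual Flink types or signatures):

    import java.util.List;

    // Simplified stand-ins for the described layering; illustrative only.
    interface JobClient {                      // job-specific operations
        void cancel();
        String takeSavepoint(String targetDirectory);
    }

    interface ClusterClient {                  // cluster-level operations
        String submitJob(Object jobGraph);     // returns a job id
        List<String> listRunningJobs();
        JobClient getJobClient(String jobId);  // hand out job-scoped control
    }

    interface ClusterDescriptor {              // talks to Yarn, Mesos, ...
        ClusterClient deploySessionCluster();
        ClusterClient deployJobCluster(Object jobGraph);
    }

Under this split, cluster deployment, cluster control and job control each
live behind their own interface, preserving the separation of concerns
mentioned above.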
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-4272
> >
> > Cheers,
> > Till
> >
> > On Tue, Dec 11, 2018 at 10:13 AM Jeff Zhang <zjf...@gmail.com> wrote:
> >
> > > Hi Folks,
> > >
> > > I am trying to integrate Flink into Apache Zeppelin, which is an
> > > interactive notebook, and I hit several issues caused by the Flink
> > > client API. So I'd like to propose the following changes to the
> > > Flink client API.
> > >
> > > 1. Support non-blocking execution. Currently,
> > > ExecutionEnvironment#execute is a blocking method which does two
> > > things: it first submits the job and then waits until the job is
> > > finished. I'd like to introduce a non-blocking execution method,
> > > e.g. ExecutionEnvironment#submit, which only submits the job and
> > > then returns the jobId to the client, and to allow the user to
> > > query the job status via that jobId.
> > >
> > > 2. Add a cancel API to
> > > ExecutionEnvironment/StreamExecutionEnvironment. Currently the only
> > > way to cancel a job is via the CLI (bin/flink), which is not
> > > convenient for downstream projects. So I'd like to add a cancel API
> > > to ExecutionEnvironment.
> > >
> > > 3. Add a savepoint API to
> > > ExecutionEnvironment/StreamExecutionEnvironment. This is similar to
> > > the cancel API; we should use ExecutionEnvironment as the unified
> > > API for third parties to integrate with Flink.
> > >
> > > 4. Add a listener for the job execution lifecycle, something like
> > > the following, so that downstream projects can run custom logic
> > > during the lifecycle of a job. E.g. Zeppelin would capture the
> > > jobId after the job is submitted and then use this jobId to cancel
> > > the job later when necessary (see the sketch after the interface).
> > >
> > > public interface JobListener {
> > >
> > >     void onJobSubmitted(JobID jobId);
> > >
> > >     void onJobExecuted(JobExecutionResult jobResult);
> > >
> > >     void onJobCanceled(JobID jobId);
> > > }
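To illustrate how a downstream project such as Zeppelin might use the
proposed listener, here is a minimal sketch. JobListener is the interface
proposed above; the registerJobListener call and all other names are
hypothetical, not existing Flink API:

    import java.util.concurrent.atomic.AtomicReference;

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.common.JobID;

    // Captures the id of the most recently submitted job so that the
    // notebook can cancel it later. JobListener is the proposed interface.
    class CapturingJobListener implements JobListener {

        final AtomicReference<JobID> lastSubmitted = new AtomicReference<>();

        @Override
        public void onJobSubmitted(JobID jobId) {
            lastSubmitted.set(jobId);  // remember the id for a later cancel
        }

        @Override
        public void onJobExecuted(JobExecutionResult jobResult) { }

        @Override
        public void onJobCanceled(JobID jobId) { }
    }

    // Hypothetical wiring (registerJobListener does not exist today):
    //   env.registerJobListener(listener);
    //   ...
    //   env.getClusterClient().cancelJob(listener.lastSubmitted.get());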
> > > 5. Enable sessions in ExecutionEnvironment. Currently this is
> > > disabled, but sessions are very convenient for third parties that
> > > submit jobs continually. I hope Flink can enable it again.
> > >
> > > 6. Unify all Flink client APIs into
> > > ExecutionEnvironment/StreamExecutionEnvironment.
> > >
> > > This is a long-term issue which needs more careful thinking and
> > > design. Currently some features of Flink are exposed in
> > > ExecutionEnvironment/StreamExecutionEnvironment, while others are
> > > exposed in the CLI instead of the API, like the cancel and savepoint
> > > operations I mentioned above. I think the root cause is that Flink
> > > doesn't have a unified way of interacting with a Flink cluster. Here
> > > I list three scenarios of Flink operations:
> > >
> > > - Local job execution. Flink creates a LocalEnvironment and then
> > >   uses this LocalEnvironment to create a LocalExecutor for job
> > >   execution.
> > > - Remote job execution. Flink creates a ClusterClient first, then
> > >   creates a ContextEnvironment based on the ClusterClient, and then
> > >   runs the job.
> > > - Job cancellation. Flink creates a ClusterClient first and then
> > >   cancels the job via this ClusterClient.
> > >
> > > As you can see from these three scenarios, Flink doesn't use the
> > > same approach (code path) for each interaction. What I propose is
> > > the following: create the proper LocalEnvironment/RemoteEnvironment
> > > (based on user configuration) --> use this Environment to create the
> > > proper ClusterClient (LocalClusterClient or RestClusterClient) to
> > > interact with Flink (job execution or cancellation).
> > >
> > > This way we can unify the process of local execution and remote
> > > execution, and it becomes much easier for third parties to integrate
> > > with Flink, because ExecutionEnvironment is the unified entry point.
> > > All a third party needs to do is pass its configuration to the
> > > ExecutionEnvironment, and the ExecutionEnvironment will do the right
> > > thing based on that configuration. The Flink CLI can also be treated
> > > as just another Flink API consumer: it would pass the configuration
> > > to the ExecutionEnvironment and let the ExecutionEnvironment create
> > > the proper ClusterClient, instead of creating the ClusterClient
> > > directly itself.
> > >
> > > Point 6 would involve large code refactoring, so I think we can
> > > defer it to a future release; points 1-5 could be done at once, I
> > > believe. Let me know your comments and feedback, thanks.
> > >
> > > --
> > > Best Regards
> > >
> > > Jeff Zhang
> >
>
>
> --
> Best Regards
>
> Jeff Zhang
>