Hi Folks,

I am trying to integrate Flink into Apache Zeppelin, which is an interactive notebook, and I have hit several issues caused by the flink client api. So I'd like to propose the following changes to the flink client api.
1. Support non-blocking execution. Currently, ExecutionEnvironment#execute is a blocking method that does 2 things: first it submits the job, then it waits for the job until it is finished. I'd like to introduce a non-blocking method, e.g. ExecutionEnvironment#submit, which only submits the job and then returns the jobId to the client, and allows the user to query the job status via that jobId. (A rough sketch of items 1-4 is in the P.S. below.)

2. Add a cancel api in ExecutionEnvironment/StreamExecutionEnvironment. Currently the only way to cancel a job is via the cli (bin/flink), which is not convenient for downstream projects to use. So I'd like to add a cancel api in ExecutionEnvironment.

3. Add a savepoint api in ExecutionEnvironment/StreamExecutionEnvironment. This is similar to the cancel api; we should use ExecutionEnvironment as the unified api for third parties to integrate with flink.

4. Add a listener for the job execution lifecycle, something like the following, so that downstream projects can run custom logic at each stage of a job. E.g. Zeppelin would capture the jobId after the job is submitted and then use this jobId to cancel the job later when necessary.

   public interface JobListener {
     void onJobSubmitted(JobID jobId);
     void onJobExecuted(JobExecutionResult jobResult);
     void onJobCanceled(JobID jobId);
   }

5. Enable sessions in ExecutionEnvironment. Currently this is disabled, but sessions are very convenient for third parties that submit jobs continually. I hope flink can enable it again.

6. Unify all flink client api into ExecutionEnvironment/StreamExecutionEnvironment. This is a long-term issue which needs more careful thinking and design. Currently some features of flink are exposed in ExecutionEnvironment/StreamExecutionEnvironment, but others are exposed in the cli instead of the api, like the cancel and savepoint operations I mentioned above. I think the root cause is that flink doesn't have one unified way to interact with a cluster. Here I list 3 scenarios of flink operation:

   - Local job execution. Flink creates a LocalEnvironment and then uses this LocalEnvironment to create a LocalExecutor for job execution.
   - Remote job execution. Flink creates a ClusterClient first, then creates a ContextEnvironment based on the ClusterClient, and then runs the job.
   - Job cancellation. Flink creates a ClusterClient first and then cancels the job via this ClusterClient.

   As you can see, in these 3 scenarios flink does not use the same approach (code path) to interact with the cluster. What I propose is the following (also sketched in the P.S. below):

   Create the proper LocalEnvironment/RemoteEnvironment (based on user configuration) --> use this Environment to create the proper ClusterClient (LocalClusterClient or RestClusterClient) to interact with flink (job execution or cancellation).

   This way we can unify the code paths of local and remote execution, and it becomes much easier for third parties to integrate with flink, because ExecutionEnvironment is the single entry point for flink. A third party just passes configuration to ExecutionEnvironment, and ExecutionEnvironment does the right thing based on that configuration. The flink cli can also be considered a consumer of the flink api: it would just pass the configuration to ExecutionEnvironment and let ExecutionEnvironment create the proper ClusterClient, instead of the cli creating the ClusterClient directly.

Item 6 would involve a large code refactoring, so I think we can defer it to a future release; 1, 2, 3, 4 and 5 could be done at once, I believe.

Let me know your comments and feedback, thanks.

--
Best Regards

Jeff Zhang
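P.S. To make items 1-4 more concrete, here is a rough sketch of how a downstream project like Zeppelin could use the proposed api. Note that submit, cancel, triggerSavepoint and registerJobListener are only illustrative names for the proposed additions, not existing flink api; only getExecutionEnvironment and JobListener (as defined above) are assumed here.

   import java.util.concurrent.atomic.AtomicReference;
   import org.apache.flink.api.common.JobExecutionResult;
   import org.apache.flink.api.common.JobID;
   import org.apache.flink.api.java.ExecutionEnvironment;

   public class ZeppelinFlinkJobRunner {
     public static void main(String[] args) throws Exception {
       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

       // Item 4: capture the jobId as soon as the job is submitted,
       // which is exactly what Zeppelin needs for later cancellation.
       AtomicReference<JobID> submittedJob = new AtomicReference<>();
       env.registerJobListener(new JobListener() {
         @Override
         public void onJobSubmitted(JobID jobId) { submittedJob.set(jobId); }
         @Override
         public void onJobExecuted(JobExecutionResult jobResult) { /* show the result in the notebook */ }
         @Override
         public void onJobCanceled(JobID jobId) { /* update the notebook UI */ }
       });

       // Item 1: returns immediately with the jobId instead of blocking
       // until the job finishes, so the notebook stays responsive.
       JobID jobId = env.submit("zeppelin-paragraph-1");

       // Items 2 and 3: later, e.g. when the user clicks cancel in the
       // notebook, take a savepoint and cancel via the same environment.
       env.triggerSavepoint(jobId, "hdfs:///savepoints");
       env.cancel(jobId);
     }
   }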
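And for item 6, the unified code path could look roughly like this. Again this is only a sketch of the idea: the createExecutionEnvironment factory and the isLocalMode helper are made up for illustration, and the concrete client classes would be whatever flink settles on (a local client vs. RestClusterClient).

   // Inside ExecutionEnvironment (sketch):
   public static ExecutionEnvironment createExecutionEnvironment(Configuration conf) {
     // The environment, not the caller, decides which client to build.
     return isLocalMode(conf)
         ? new LocalEnvironment(conf)    // internally builds a local ClusterClient
         : new RemoteEnvironment(conf);  // internally builds a RestClusterClient
   }

   // Every client-facing operation then funnels through the same ClusterClient:
   public void cancel(JobID jobId) throws Exception {
     getClusterClient().cancel(jobId);   // same call for local and remote
   }

   // The cli becomes just another api consumer, e.g.:
   //   bin/flink cancel <jobId>  ->  createExecutionEnvironment(conf).cancel(jobId)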