Hi Folks,

I am trying to integrate Flink into Apache Zeppelin, which is an
interactive notebook, and I have hit several issues caused by the Flink
client API. So I'd like to propose the following changes to the Flink
client API.

1. Support nonblocking execution. Currently, ExecutionEnvironment#execute
is a blocking method which does 2 things: it first submits the job and
then waits for the job until it is finished. I'd like to introduce a
nonblocking execution method like ExecutionEnvironment#submit, which only
submits the job and then returns the JobID to the client, and allow the
user to query the job status via that JobID.
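A rough sketch of the shape I have in mind (method names like submit and
getJobStatus are just placeholders, not existing Flink API, and the mock
environment below only exists to make the snippet self-contained):

```java
import java.util.UUID;

// Sketch of the proposed nonblocking API. submit() and getJobStatus()
// are assumed names for illustration only.
public class NonBlockingSketch {
    enum JobStatus { RUNNING, FINISHED }

    static class MockEnvironment {
        // Proposed: submit the job and return immediately with a job id.
        String submit() {
            return UUID.randomUUID().toString(); // stands in for a real JobID
        }
        // Proposed: poll the status later via the returned job id.
        JobStatus getJobStatus(String jobId) {
            return JobStatus.RUNNING;
        }
    }

    public static void main(String[] args) {
        MockEnvironment env = new MockEnvironment();
        String jobId = env.submit();          // returns without blocking
        System.out.println(env.getJobStatus(jobId));
    }
}
```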

2. Add a cancel API in ExecutionEnvironment/StreamExecutionEnvironment.
Currently the only way to cancel a job is via the CLI (bin/flink), which
is not convenient for downstream projects that want to use this feature.
So I'd like to add a cancel API in ExecutionEnvironment.

3. Add a savepoint API in ExecutionEnvironment/StreamExecutionEnvironment.
This is similar to the cancel API: we should use ExecutionEnvironment as
the unified API for third parties to integrate with Flink.
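To make 2 and 3 concrete, something like the following (cancel and
triggerSavepoint are proposed names, not existing environment methods;
the mock class is only there so the example stands alone):

```java
// Sketch of the proposed cancel/savepoint API on the environment.
public class CancelSavepointSketch {
    static class MockEnvironment {
        // Proposed: cancel a running job directly from the API.
        boolean cancel(String jobId) {
            return true; // a real implementation would talk to the cluster
        }
        // Proposed: trigger a savepoint and return its storage path.
        String triggerSavepoint(String jobId, String targetDir) {
            return targetDir + "/savepoint-" + jobId;
        }
    }

    public static void main(String[] args) {
        MockEnvironment env = new MockEnvironment();
        System.out.println(env.triggerSavepoint("job-1", "hdfs:///savepoints"));
        System.out.println(env.cancel("job-1"));
    }
}
```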

4. Add a listener for the job execution lifecycle, something like the
following, so that downstream projects can run custom logic at each stage
of a job's lifecycle. E.g. Zeppelin would capture the JobID after the job
is submitted and then use this JobID to cancel the job later when
necessary.

public interface JobListener {

   void onJobSubmitted(JobID jobId);

   void onJobExecuted(JobExecutionResult jobResult);

   void onJobCanceled(JobID jobId);
}
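How a downstream project might hook into this (registerJobListener is an
assumed registration method, and the mock environment just simulates the
lifecycle callbacks so the snippet is runnable on its own):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of how a downstream project such as Zeppelin could use the
// proposed JobListener. registerJobListener() is an assumed name.
public class ListenerSketch {
    interface JobListener {
        void onJobSubmitted(String jobId);
        void onJobExecuted(String jobResult);
        void onJobCanceled(String jobId);
    }

    static class MockEnvironment {
        private final List<JobListener> listeners = new ArrayList<>();
        void registerJobListener(JobListener l) { listeners.add(l); }
        void execute() {
            // A real environment would fire these at the real lifecycle points.
            listeners.forEach(l -> l.onJobSubmitted("job-1"));
            listeners.forEach(l -> l.onJobExecuted("result-of-job-1"));
        }
    }

    public static void main(String[] args) {
        MockEnvironment env = new MockEnvironment();
        List<String> captured = new ArrayList<>();
        env.registerJobListener(new JobListener() {
            public void onJobSubmitted(String jobId) { captured.add("submitted:" + jobId); }
            public void onJobExecuted(String result) { captured.add("executed:" + result); }
            public void onJobCanceled(String jobId)  { captured.add("canceled:" + jobId); }
        });
        env.execute();
        System.out.println(captured);
    }
}
```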

5. Enable sessions in ExecutionEnvironment. Currently this is disabled,
but sessions are very convenient for third parties that submit jobs
continually. I hope Flink can enable them again.

6. Unify all flink client api into
ExecutionEnvironment/StreamExecutionEnvironment.

This is a long-term issue which needs more careful thinking and design.
Currently some features of Flink are exposed in
ExecutionEnvironment/StreamExecutionEnvironment, but some are exposed in
the CLI instead of the API, like the cancel and savepoint operations I
mentioned above. I think the root cause is that Flink never unified the
way clients interact with it. Here I list 3 scenarios of Flink operation:

   - Local job execution. Flink creates a LocalEnvironment and then uses
   this LocalEnvironment to create a LocalExecutor for job execution.
   - Remote job execution. Flink creates a ClusterClient first, then
   creates a ContextEnvironment based on the ClusterClient, and then runs
   the job.
   - Job cancellation. Flink creates a ClusterClient first and then
   cancels the job via this ClusterClient.

As you can see in the above 3 scenarios, Flink doesn't use the same
approach (code path) to interact with itself.
What I propose is the following:
Create the proper LocalEnvironment/RemoteEnvironment (based on user
configuration) --> use this environment to create the proper ClusterClient
(LocalClusterClient or RestClusterClient) to interact with Flink (job
execution or cancellation).

This way we can unify the process of local execution and remote execution,
and it becomes much easier for third parties to integrate with Flink,
because ExecutionEnvironment is the unified entry point. A third party
just needs to pass configuration to ExecutionEnvironment, and
ExecutionEnvironment will do the right thing based on that configuration.
The Flink CLI can also be considered a consumer of this API: it would just
pass the configuration to ExecutionEnvironment and let
ExecutionEnvironment create the proper ClusterClient, instead of the CLI
creating the ClusterClient directly.
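The proposed flow could look roughly like this (the class names mirror
Flink's LocalClusterClient/RestClusterClient, but the factory logic and
the "execution.target" configuration key are purely illustrative
assumptions):

```java
import java.util.Map;

// Sketch of the proposed unified flow: the environment inspects the
// configuration and creates the matching client, instead of the CLI
// constructing a ClusterClient directly.
public class UnifiedEntrySketch {
    interface ClusterClient { String name(); }
    static class LocalClusterClient implements ClusterClient {
        public String name() { return "local"; }
    }
    static class RestClusterClient implements ClusterClient {
        public String name() { return "rest"; }
    }

    // Proposed: the environment decides which client to create from
    // configuration. "execution.target" is an assumed key for this sketch.
    static ClusterClient createClient(Map<String, String> config) {
        String target = config.getOrDefault("execution.target", "local");
        return "local".equals(target) ? new LocalClusterClient()
                                      : new RestClusterClient();
    }

    public static void main(String[] args) {
        System.out.println(createClient(Map.of("execution.target", "remote")).name());
        System.out.println(createClient(Map.of()).name());
    }
}
```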


Item 6 would involve a large code refactoring, so I think we can defer it
to a future release; items 1-5 could be done at once, I believe. Let me
know your comments and feedback, thanks.



-- 
Best Regards

Jeff Zhang
