Sorry folks, the design doc is later than expected. Here is the draft; any comments and feedback are welcome:
https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/edit?usp=sharing

Stephan Ewen <se...@apache.org> wrote on Thu, Feb 14, 2019 at 8:43 PM:

> Nice that this discussion is happening.
>
> In the FLIP, we could also revisit the entire role of the environments again.
>
> Initially, the idea was:
> - the environments take care of the specific setup for standalone (no setup needed), yarn, mesos, etc.
> - the session ones have control over the session. The environment holds the session client.
> - running a job gives a "control" object for that job. That behavior is the same in all environments.
>
> The actual implementation diverged quite a bit from that. Happy to see a discussion about straightening this out a bit more.
>
> On Tue, Feb 12, 2019 at 4:58 AM Jeff Zhang <zjf...@gmail.com> wrote:
>
> > Hi folks,
> >
> > Sorry for the late response. It seems we have reached consensus on this, so I will create a FLIP with a more detailed design.
> >
> > Thomas Weise <t...@apache.org> wrote on Fri, Dec 21, 2018 at 11:43 AM:
> >
> > > Great to see this discussion seeded! The problems you face with the Zeppelin integration are also affecting other downstream projects, like Beam.
> > >
> > > We just enabled the savepoint restore option in RemoteStreamEnvironment [1], and that was more difficult than it should be. The main issue is that the environment and the cluster client aren't decoupled. Ideally it should be possible to just get the matching cluster client from the environment and then control the job through it (environment as factory for cluster client). But note that the environment classes are part of the public API, and it is not straightforward to make larger changes without breaking backward compatibility.
> > >
> > > ClusterClient currently exposes internal classes like JobGraph and StreamGraph. But it should be possible to wrap this with a new public API that brings the required job control capabilities for downstream projects. Perhaps it is helpful to look at some of the interfaces in Beam while thinking about this: [2] for the portable job API and [3] for the old asynchronous job control from the Beam Java SDK.
> > >
> > > The backward compatibility discussion [4] is also relevant here. A new API should shield downstream projects from internals and allow them to interoperate with multiple future Flink versions in the same release line without forced upgrades.
> > >
> > > Thanks,
> > > Thomas
> > >
> > > [1] https://github.com/apache/flink/pull/7249
> > > [2] https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/beam_job_api.proto
> > > [3] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
> > > [4] https://lists.apache.org/thread.html/064c75c5d10f0806095b14f6d76942598917a14429c1acbddd151fe2@%3Cdev.flink.apache.org%3E
> > >
> > > On Thu, Dec 20, 2018 at 6:15 PM Jeff Zhang <zjf...@gmail.com> wrote:
> > >
> > > > >>> I'm not so sure whether the user should be able to define where the job runs (in your example Yarn). This is actually independent of the job development and is something which is decided at deployment time.
> > > >
> > > > Users don't need to specify the execution mode programmatically. They can also pass the execution mode as an argument to the flink run command, e.g.
> > > >
> > > > bin/flink run -m yarn-cluster ....
> > > > bin/flink run -m local ...
> > > > bin/flink run -m host:port ...
> > > >
> > > > Does this make sense to you?
> > > >
> > > > >>> To me it makes sense that the ExecutionEnvironment is not directly initialized by the user and is instead context sensitive to how you want to execute your job (Flink CLI vs. IDE, for example).
> > > >
> > > > Right, I notice that Flink currently creates a different ContextExecutionEnvironment for different submission scenarios (Flink CLI vs. IDE). To me this is a bit of a hack and not very straightforward. What I suggested above is that Flink should always create the same ExecutionEnvironment, just with different configuration, and based on that configuration it would create the proper ClusterClient for the different behaviors.
> > > >
> > > > Till Rohrmann <trohrm...@apache.org> wrote on Thu, Dec 20, 2018 at 11:18 PM:
> > > >
> > > > > You are probably right that we have code duplication when it comes to the creation of the ClusterClient. This should be reduced in the future.
> > > > >
> > > > > I'm not so sure whether the user should be able to define where the job runs (in your example Yarn). This is actually independent of the job development and is something which is decided at deployment time. To me it makes sense that the ExecutionEnvironment is not directly initialized by the user and is instead context sensitive to how you want to execute your job (Flink CLI vs. IDE, for example). However, I agree that the ExecutionEnvironment should give you access to the ClusterClient and to the job (maybe in the form of the JobGraph or a job plan).
> > > > >
> > > > > Cheers,
> > > > > Till
> > > > >
> > > > > On Thu, Dec 13, 2018 at 4:36 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > > > >
> > > > > > Hi Till,
> > > > > >
> > > > > > Thanks for the feedback. You are right that I expect a better programmatic job submission/control API that could be used by downstream projects, and it would benefit the whole Flink ecosystem. When I look at the code of the Flink scala-shell and sql-client (I believe they are not the core of Flink, but belong to its ecosystem), I find a lot of duplicated code for creating a ClusterClient from user-provided configuration (the configuration format may differ between scala-shell and sql-client) and then using that ClusterClient to manipulate jobs. I don't think this is convenient for downstream projects. What I expect is that a downstream project only needs to provide the necessary configuration (maybe by introducing a class FlinkConf), build an ExecutionEnvironment from this FlinkConf, and the ExecutionEnvironment will create the proper ClusterClient. This would not only benefit downstream project development but also help their integration tests with Flink. Here's one sample code snippet of what I expect:
> > > > > >
> > > > > > val conf = new FlinkConf().mode("yarn")
> > > > > > val env = new ExecutionEnvironment(conf)
> > > > > > val jobId = env.submit(...)
> > > > > > val jobStatus = env.getClusterClient().queryJobStatus(jobId)
> > > > > > env.getClusterClient().cancelJob(jobId)
> > > > > >
> > > > > > What do you think?
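
To make the FlinkConf idea quoted above a bit more concrete, here is a rough, self-contained Scala sketch of the intended behaviour: the environment, not the caller, builds the matching ClusterClient from the configuration. Everything below is an illustrative stand-in, not existing Flink API.

    // Sketch only: FlinkConf and the mode-based dispatch describe the *proposed*
    // behaviour; the client classes are simplified stand-ins, not existing Flink API.
    trait ClusterClient {
      def submitJob(jobPlan: String): String            // returns a job id
      def queryJobStatus(jobId: String): String
      def cancelJob(jobId: String): Unit
    }

    // Stand-in clients; real ones would wrap a mini cluster, a YARN session, or a REST endpoint.
    class LocalClusterClient extends ClusterClient {
      def submitJob(jobPlan: String): String = "local-job-1"
      def queryJobStatus(jobId: String): String = "RUNNING"
      def cancelJob(jobId: String): Unit = ()
    }

    class RestClusterClient(target: String) extends ClusterClient {
      def submitJob(jobPlan: String): String = s"remote-job@$target"
      def queryJobStatus(jobId: String): String = "RUNNING"
      def cancelJob(jobId: String): Unit = ()
    }

    class FlinkConf {
      private var execMode: String = "local"
      def mode(m: String): FlinkConf = { execMode = m; this }
      def getMode: String = execMode
    }

    // The environment owns the client; callers only hand over configuration.
    class ExecutionEnvironment(conf: FlinkConf) {
      private val client: ClusterClient = conf.getMode match {
        case "local"  => new LocalClusterClient()
        case "yarn"   => new RestClusterClient("yarn-session")   // placeholder for a YARN-backed client
        case hostPort => new RestClusterClient(hostPort)
      }

      def submit(jobPlan: String): String = client.submitJob(jobPlan)
      def getClusterClient(): ClusterClient = client
    }

With something along these lines, the quoted sample (FlinkConf -> ExecutionEnvironment -> submit / queryJobStatus / cancelJob) would look the same whether the job runs locally, on YARN, or against a remote cluster.
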
> > > > > >
> > > > > > Till Rohrmann <trohrm...@apache.org> wrote on Tue, Dec 11, 2018 at 6:28 PM:
> > > > > >
> > > > > > > Hi Jeff,
> > > > > > >
> > > > > > > what you are proposing is to provide the user with better programmatic job control. There was actually an effort to achieve this but it has never been completed [1]. However, there are some improvements in the code base now. Look for example at the NewClusterClient interface which offers a non-blocking job submission. But I agree that we need to improve Flink in this regard.
> > > > > > >
> > > > > > > I would not be in favour of exposing all ClusterClient calls via the ExecutionEnvironment because it would clutter the class and would not be a good separation of concerns. Instead, one idea could be to retrieve the current ClusterClient from the ExecutionEnvironment, which can then be used for cluster and job control. But before we start an effort here, we need to agree on and capture what functionality we want to provide.
> > > > > > >
> > > > > > > Initially, the idea was that we have the ClusterDescriptor describing how to talk to a cluster manager like Yarn or Mesos. The ClusterDescriptor can be used for deploying Flink clusters (job and session) and gives you a ClusterClient. The ClusterClient controls the cluster (e.g. submitting jobs, listing all running jobs). And then there was the idea to introduce a JobClient which you obtain from the ClusterClient to trigger job specific operations (e.g. taking a savepoint, cancelling the job).
> > > > > > >
> > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-4272
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Till
> > > > > > >
> > > > > > > On Tue, Dec 11, 2018 at 10:13 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Folks,
> > > > > > > >
> > > > > > > > I am trying to integrate Flink into Apache Zeppelin, which is an interactive notebook, and I hit several issues caused by the Flink client API. So I'd like to propose the following changes to the client API.
> > > > > > > >
> > > > > > > > 1. Support non-blocking execution. Currently, ExecutionEnvironment#execute is a blocking method which does two things: it first submits the job and then waits until the job is finished. I'd like to introduce a non-blocking execution method like ExecutionEnvironment#submit which only submits the job and then returns the jobId to the client, allowing the user to query the job status via that jobId.
> > > > > > > >
> > > > > > > > 2. Add a cancel API in ExecutionEnvironment/StreamExecutionEnvironment. Currently the only way to cancel a job is via the CLI (bin/flink), which is not convenient for downstream projects. So I'd like to add a cancel API in ExecutionEnvironment.
> > > > > > > >
> > > > > > > > 3. Add a savepoint API in ExecutionEnvironment/StreamExecutionEnvironment. This is similar to the cancel API; we should use ExecutionEnvironment as the unified API for third parties to integrate with Flink.
> > > > > > > >
> > > > > > > > 4. Add a listener for the job execution lifecycle, something like the following, so that downstream projects can run custom logic at points in the job lifecycle (a small usage sketch is appended at the end of this mail). E.g. Zeppelin would capture the jobId after the job is submitted and later use that jobId to cancel the job when necessary.
> > > > > > > >
> > > > > > > > public interface JobListener {
> > > > > > > >   void onJobSubmitted(JobID jobId);
> > > > > > > >   void onJobExecuted(JobExecutionResult jobResult);
> > > > > > > >   void onJobCanceled(JobID jobId);
> > > > > > > > }
> > > > > > > >
> > > > > > > > 5. Enable sessions in ExecutionEnvironment. Currently this is disabled, but a session is very convenient for third parties that submit jobs continually. I hope Flink can enable it again.
> > > > > > > >
> > > > > > > > 6. Unify all Flink client APIs into ExecutionEnvironment/StreamExecutionEnvironment.
> > > > > > > >
> > > > > > > > This is a long-term issue which needs more careful thinking and design. Currently some features of Flink are exposed in ExecutionEnvironment/StreamExecutionEnvironment, but some are exposed in the CLI instead of the API, like the cancel and savepoint operations I mentioned above. I think the root cause is that Flink doesn't have one unified way of interacting with a cluster. Here are 3 scenarios of Flink operation:
> > > > > > > >
> > > > > > > > - Local job execution. Flink creates a LocalEnvironment and then uses this LocalEnvironment to create a LocalExecutor for job execution.
> > > > > > > > - Remote job execution. Flink creates a ClusterClient first, then creates a ContextEnvironment based on the ClusterClient, and then runs the job.
> > > > > > > > - Job cancellation. Flink creates a ClusterClient first and then cancels the job via this ClusterClient.
> > > > > > > >
> > > > > > > > As you can see, in the above 3 scenarios Flink does not use the same approach (code path) to interact with the cluster. What I propose is the following: create the proper LocalEnvironment/RemoteEnvironment (based on the user configuration) --> use this Environment to create the proper ClusterClient (LocalClusterClient or RestClusterClient) to interact with Flink (job execution or cancellation).
> > > > > > > >
> > > > > > > > This way we can unify local execution and remote execution, and it becomes much easier for third parties to integrate with Flink, because ExecutionEnvironment is the unified entry point. All a third party needs to do is pass its configuration to ExecutionEnvironment, and ExecutionEnvironment will do the right thing based on that configuration. The Flink CLI can also be considered a consumer of this API: it would just pass the configuration to ExecutionEnvironment and let ExecutionEnvironment create the proper ClusterClient, instead of creating the ClusterClient directly itself.
> > > > > > > >
> > > > > > > > Item 6 would involve a large refactoring, so I think we can defer it to a future release; items 1-5 could be done at once, I believe. Let me know your comments and feedback, thanks.

--
Best Regards

Jeff Zhang
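
P.S. To illustrate how a downstream project such as Zeppelin could use the JobListener proposed in item 4 of the quoted thread, here is a minimal Scala sketch. Only the shape of the listener comes from the proposal; the JobID/JobExecutionResult stand-ins and the registerJobListener call are hypothetical.

    // Hypothetical usage sketch: JobID and JobExecutionResult are simplified stand-ins,
    // and registerJobListener does not exist yet -- only the JobListener shape is proposed.
    case class JobID(id: String)
    case class JobExecutionResult(jobId: JobID, netRuntimeMs: Long)

    trait JobListener {
      def onJobSubmitted(jobId: JobID): Unit
      def onJobExecuted(jobResult: JobExecutionResult): Unit
      def onJobCanceled(jobId: JobID): Unit
    }

    // A notebook front end could remember the id of the last submitted job
    // so that a "cancel" button in the UI can act on it later.
    class NotebookJobTracker extends JobListener {
      @volatile private var lastSubmitted: Option[JobID] = None

      def onJobSubmitted(jobId: JobID): Unit = { lastSubmitted = Some(jobId) }
      def onJobExecuted(jobResult: JobExecutionResult): Unit =
        println(s"job ${jobResult.jobId.id} finished in ${jobResult.netRuntimeMs} ms")
      def onJobCanceled(jobId: JobID): Unit = { lastSubmitted = None }

      def currentJob: Option[JobID] = lastSubmitted
    }

    // env.registerJobListener(new NotebookJobTracker())   // hypothetical registration call
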