Some points to consider:

* Any API we expose should not have dependencies on the runtime (flink-runtime) package or other implementation details. To me, this means that the current ClusterClient cannot be exposed to users, because it uses quite a few classes from the optimizer and runtime packages.
* What happens when a failure/restart in the client happens? There needs to be a way of re-establishing the connection to the job, setting up the listeners again, etc.

Aljoscha

> On 29. May 2019, at 10:17, Jeff Zhang <zjf...@gmail.com> wrote:
>
> Sorry folks, the design doc is later than you expected. Here's the design doc I drafted; any comments and feedback are welcome.
>
> https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/edit?usp=sharing
>
> On Thu, Feb 14, 2019 at 8:43 PM Stephan Ewen <se...@apache.org> wrote:
>
>> Nice that this discussion is happening.
>>
>> In the FLIP, we could also revisit the entire role of the environments again.
>>
>> Initially, the idea was:
>> - the environments take care of the specific setup for standalone (no setup needed), Yarn, Mesos, etc.
>> - the session ones have control over the session. The environment holds the session client.
>> - running a job gives a "control" object for that job. That behavior is the same in all environments.
>>
>> The actual implementation diverged quite a bit from that. Happy to see a discussion about straightening this out a bit more.
>>
>> On Tue, Feb 12, 2019 at 4:58 AM Jeff Zhang <zjf...@gmail.com> wrote:
>>
>>> Hi folks,
>>>
>>> Sorry for the late response. It seems we have reached consensus on this, so I will create a FLIP for it with a more detailed design.
>>>
>>> On Fri, Dec 21, 2018 at 11:43 AM Thomas Weise <t...@apache.org> wrote:
>>>
>>>> Great to see this discussion seeded! The problems you face with the Zeppelin integration also affect other downstream projects, like Beam.
>>>>
>>>> We just enabled the savepoint restore option in RemoteStreamEnvironment [1], and that was more difficult than it should have been. The main issue is that environment and cluster client aren't decoupled. Ideally it should be possible to just get the matching cluster client from the environment and then control the job through it (environment as factory for cluster client). But note that the environment classes are part of the public API, and it is not straightforward to make larger changes without breaking backward compatibility.
>>>>
>>>> ClusterClient currently exposes internal classes like JobGraph and StreamGraph. But it should be possible to wrap this with a new public API that brings the required job control capabilities to downstream projects. Perhaps it is helpful to look at some of the interfaces in Beam while thinking about this: [2] for the portable job API and [3] for the old asynchronous job control from the Beam Java SDK.
>>>>
>>>> The backward compatibility discussion [4] is also relevant here. A new API should shield downstream projects from internals and allow them to interoperate with multiple future Flink versions in the same release line without forced upgrades.
>>>>
>>>> Thanks,
>>>> Thomas
>>>>
>>>> [1] https://github.com/apache/flink/pull/7249
>>>> [2] https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/beam_job_api.proto
>>>> [3] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
>>>> [4] https://lists.apache.org/thread.html/064c75c5d10f0806095b14f6d76942598917a14429c1acbddd151fe2@%3Cdev.flink.apache.org%3E
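For illustration, a minimal sketch of the "environment as factory for cluster client" idea above, assuming a hypothetical getClusterClient() on the environment; getJobGraph(), submitJob(), and cancelJob() as used here are likewise illustrative, not existing Flink API:

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical sketch: obtain the matching cluster client from the
// environment and control the job through it, including savepoint restore,
// instead of threading such settings through environment constructors.
public class EnvironmentAsFactorySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).print(); // build the pipeline as usual

        ClusterClient client = env.getClusterClient(); // hypothetical factory method
        JobID jobId = client.submitJob(
                env.getJobGraph(),                     // hypothetical accessor
                SavepointRestoreSettings.forPath("/savepoints/sp-42"));
        client.cancelJob(jobId);                       // job control via the client
    }
}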
>>>> On Thu, Dec 20, 2018 at 6:15 PM Jeff Zhang <zjf...@gmail.com> wrote:
>>>>
>>>>>>>> I'm not so sure whether the user should be able to define where the job runs (in your example Yarn). This is actually independent of the job development and is something which is decided at deployment time.
>>>>>
>>>>> Users don't need to specify the execution mode programmatically. They can also pass the execution mode via the arguments of the flink run command, e.g.
>>>>>
>>>>> bin/flink run -m yarn-cluster ....
>>>>> bin/flink run -m local ...
>>>>> bin/flink run -m host:port ...
>>>>>
>>>>> Does this make sense to you?
>>>>>
>>>>>>>> To me it makes sense that the ExecutionEnvironment is not directly initialized by the user and instead context sensitive how you want to execute your job (Flink CLI vs. IDE, for example).
>>>>>
>>>>> Right, currently I notice that Flink creates a different ContextExecutionEnvironment for each submission scenario (Flink CLI vs. IDE). To me this is kind of a hacky approach, not so straightforward. What I suggested above is that Flink should always create the same ExecutionEnvironment, just with different configuration, and based on that configuration it would create the proper ClusterClient for the different behaviors.
>>>>>
>>>>> On Thu, Dec 20, 2018 at 11:18 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>
>>>>>> You are probably right that we have code duplication when it comes to the creation of the ClusterClient. This should be reduced in the future.
>>>>>>
>>>>>> I'm not so sure whether the user should be able to define where the job runs (in your example Yarn). This is actually independent of the job development and is something which is decided at deployment time. To me it makes sense that the ExecutionEnvironment is not directly initialized by the user and is instead context sensitive to how you want to execute your job (Flink CLI vs. IDE, for example). However, I agree that the ExecutionEnvironment should give you access to the ClusterClient and to the job (maybe in the form of the JobGraph or a job plan).
>>>>>>
>>>>>> Cheers,
>>>>>> Till
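For illustration, a minimal sketch of the configuration-driven dispatch discussed above, assuming a single execution-mode setting; LocalClusterClient and RestClusterClient are the client names used later in this thread, but the "execution.mode" key, the constructors, and the dispatch itself are hypothetical:

import org.apache.flink.configuration.Configuration;

// Hypothetical sketch: one environment type whose behavior is driven purely
// by configuration; based on that configuration it creates the proper client.
public class ClientFactorySketch {

    static ClusterClient createClusterClient(Configuration conf) {
        String mode = conf.getString("execution.mode", "local");
        switch (mode) {
            case "local":        return new LocalClusterClient(conf); // in-JVM mini cluster
            case "yarn-cluster": return new YarnClusterClient(conf);  // deploy on YARN
            default:             return new RestClusterClient(conf);  // remote host:port
        }
    }
}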
>>>>>> On Thu, Dec 13, 2018 at 4:36 AM Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Till,
>>>>>>>
>>>>>>> Thanks for the feedback. You are right that I expect a better programmatic job submission/control API which could be used by downstream projects. It would benefit the whole Flink ecosystem. When I look at the code of the flink scala-shell and sql-client (I believe they are not the core of Flink, but belong to its ecosystem), I find a lot of duplicated code for creating a ClusterClient from user-provided configuration (the configuration format may differ between scala-shell and sql-client) and then using that ClusterClient to manipulate jobs. I don't think this is convenient for downstream projects. What I expect is that a downstream project only needs to provide the necessary configuration info (maybe introducing a class FlinkConf), then build an ExecutionEnvironment based on this FlinkConf, and the ExecutionEnvironment will create the proper ClusterClient. This would not only benefit downstream project development but also help with their integration tests against Flink. Here's a sample code snippet of what I expect:
>>>>>>>
>>>>>>> val conf = new FlinkConf().mode("yarn")
>>>>>>> val env = new ExecutionEnvironment(conf)
>>>>>>> val jobId = env.submit(...)
>>>>>>> val jobStatus = env.getClusterClient().queryJobStatus(jobId)
>>>>>>> env.getClusterClient().cancelJob(jobId)
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>> On Tue, Dec 11, 2018 at 6:28 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi Jeff,
>>>>>>>>
>>>>>>>> What you are proposing is to provide the user with better programmatic job control. There was actually an effort to achieve this, but it was never completed [1]. However, there are some improvements in the code base now. Look for example at the NewClusterClient interface, which offers non-blocking job submission. But I agree that we need to improve Flink in this regard.
>>>>>>>>
>>>>>>>> I would not be in favour of exposing all ClusterClient calls via the ExecutionEnvironment, because it would clutter the class and would not be a good separation of concerns. Instead, one idea could be to retrieve the current ClusterClient from the ExecutionEnvironment, which can then be used for cluster and job control. But before we start an effort here, we need to agree on and capture what functionality we want to provide.
>>>>>>>>
>>>>>>>> Initially, the idea was that we have the ClusterDescriptor describing how to talk to a cluster manager like Yarn or Mesos. The ClusterDescriptor can be used for deploying Flink clusters (job and session) and gives you a ClusterClient. The ClusterClient controls the cluster (e.g. submitting jobs, listing all running jobs). And then there was the idea to introduce a JobClient, which you obtain from the ClusterClient, to trigger job-specific operations (e.g. taking a savepoint, cancelling the job).
>>>>>>>>
>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-4272
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
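For illustration, the layering Till describes could be sketched roughly as below; ClusterDescriptor and ClusterClient exist in Flink (with different signatures than shown), while JobClient is the unfinished piece from FLINK-4272, and all method names here are illustrative:

import java.util.List;
import org.apache.flink.api.common.JobID;

// Rough sketch of the three-level layering described above; these are not
// the actual Flink interfaces.
interface ClusterDescriptor {
    ClusterClient deploySessionCluster();    // talk to Yarn/Mesos/... and deploy
}

interface ClusterClient {
    JobClient submitJob(JobGraphSketch job); // cluster-level: submit a job
    List<JobID> listJobs();                  // cluster-level: list running jobs
}

interface JobClient {
    String triggerSavepoint(String targetDirectory); // job-level operation
    void cancel();                                   // job-level operation
}

// Placeholder standing in for Flink's internal JobGraph in this sketch.
class JobGraphSketch {}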
>>>>>>>> On Tue, Dec 11, 2018 at 10:13 AM Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Folks,
>>>>>>>>>
>>>>>>>>> I am trying to integrate Flink into Apache Zeppelin, which is an interactive notebook, and I have hit several issues caused by the Flink client API. So I'd like to propose the following changes to the Flink client API.
>>>>>>>>>
>>>>>>>>> 1. Support nonblocking execution. Currently, ExecutionEnvironment#execute is a blocking method which does two things: first it submits the job, and then it waits for the job until it is finished. I'd like to introduce a nonblocking execution method like ExecutionEnvironment#submit, which only submits the job and returns the jobId to the client, allowing the user to query the job status via the jobId.
>>>>>>>>>
>>>>>>>>> 2. Add a cancel API to ExecutionEnvironment/StreamExecutionEnvironment. Currently the only way to cancel a job is via the CLI (bin/flink), which is not convenient for downstream projects. So I'd like to add a cancel API to ExecutionEnvironment.
>>>>>>>>>
>>>>>>>>> 3. Add a savepoint API to ExecutionEnvironment/StreamExecutionEnvironment. It is similar to the cancel API; we should use ExecutionEnvironment as the unified API for third parties to integrate with Flink.
>>>>>>>>>
>>>>>>>>> 4. Add a listener for the job execution lifecycle, something like the following, so that downstream projects can run custom logic during the lifecycle of a job. E.g. Zeppelin would capture the jobId after the job is submitted and then use this jobId to cancel the job later when necessary (see the usage sketch after this list).
>>>>>>>>>
>>>>>>>>> public interface JobListener {
>>>>>>>>>
>>>>>>>>>     void onJobSubmitted(JobID jobId);
>>>>>>>>>
>>>>>>>>>     void onJobExecuted(JobExecutionResult jobResult);
>>>>>>>>>
>>>>>>>>>     void onJobCanceled(JobID jobId);
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> 5. Enable sessions in ExecutionEnvironment. Currently this is disabled, but sessions are very convenient for third parties that submit jobs continually. I hope Flink can enable it again.
>>>>>>>>>
>>>>>>>>> 6. Unify all Flink client APIs into ExecutionEnvironment/StreamExecutionEnvironment.
>>>>>>>>>
>>>>>>>>> This is a long-term issue which needs more careful thinking and design. Currently some features of Flink are exposed in ExecutionEnvironment/StreamExecutionEnvironment, but others are exposed in the CLI instead of the API, like the cancel and savepoint operations I mentioned above. I think the root cause is that Flink never unified how one interacts with it. Here I list 3 scenarios of Flink operation:
>>>>>>>>>
>>>>>>>>> - Local job execution. Flink creates a LocalEnvironment and then uses this LocalEnvironment to create a LocalExecutor for job execution.
>>>>>>>>> - Remote job execution. Flink creates a ClusterClient first, then creates a ContextEnvironment based on the ClusterClient, and then runs the job.
>>>>>>>>> - Job cancelation. Flink creates a ClusterClient first and then cancels the job via this ClusterClient.
>>>>>>>>>
>>>>>>>>> As you can see, in the above 3 scenarios Flink doesn't use the same approach (code path) to interact with itself. What I propose is the following: create the proper LocalEnvironment/RemoteEnvironment (based on user configuration) --> use this Environment to create the proper ClusterClient (LocalClusterClient or RestClusterClient) to interact with Flink (job execution or cancelation).
>>>>>>>>>
>>>>>>>>> This way we can unify the process of local execution and remote execution. And it becomes much easier for third parties to integrate with Flink, because ExecutionEnvironment is the unified entry point for Flink. What a third party needs to do is just pass configuration to the ExecutionEnvironment, and the ExecutionEnvironment will do the right thing based on that configuration. The Flink CLI can also be considered a Flink API consumer: it just passes the configuration to the ExecutionEnvironment and lets the ExecutionEnvironment create the proper ClusterClient, instead of the CLI creating the ClusterClient directly.
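For illustration, a minimal sketch of how a downstream project like Zeppelin might implement the JobListener proposed in point 4 above; the interface is the one proposed in this mail, not existing Flink API, and registering the listener on the environment would be a new call as well:

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;

// Hypothetical usage of the proposed listener: capture the jobId on
// submission so the job can be cancelled later from the notebook.
public class ZeppelinJobListenerSketch implements JobListener {

    private volatile JobID submittedJobId;

    @Override
    public void onJobSubmitted(JobID jobId) {
        this.submittedJobId = jobId; // remember the id for later cancellation
    }

    @Override
    public void onJobExecuted(JobExecutionResult jobResult) {
        System.out.println("Job finished in " + jobResult.getNetRuntime() + " ms");
    }

    @Override
    public void onJobCanceled(JobID jobId) {
        System.out.println("Job " + jobId + " was canceled");
    }

    public JobID getSubmittedJobId() {
        return submittedJobId;
    }
}

Registration could then be a single call on the environment, e.g. env.addJobListener(new ZeppelinJobListenerSketch()), which is likewise hypothetical.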
>>>>>>>>> Point 6 would involve a large code refactoring, so I think we can defer it to a future release; 1, 2, 3, 4 and 5 could be done at once, I believe. Let me know your comments and feedback, thanks.
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best Regards
>>>>>>>>>
>>>>>>>>> Jeff Zhang
>>>>>>>
>>>>>>> --
>>>>>>> Best Regards
>>>>>>>
>>>>>>> Jeff Zhang
>>>>>
>>>>> --
>>>>> Best Regards
>>>>>
>>>>> Jeff Zhang
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>
> --
> Best Regards
>
> Jeff Zhang
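For illustration, proposal 6's idea of the CLI as just another API consumer, as a minimal sketch; FlinkConf, the ExecutionEnvironment constructor, and submit are the proposed (hypothetical) API from this thread, not existing Flink API:

import org.apache.flink.api.common.JobID;

// Hypothetical sketch: the CLI translates its arguments into configuration
// and goes through the unified entry point; it never builds a ClusterClient
// itself.
public final class FlinkCliSketch {
    public static void main(String[] args) throws Exception {
        // "-m yarn-cluster", "-m local", "-m host:port" all become configuration.
        String mode = args.length > 1 && "-m".equals(args[0]) ? args[1] : "local";
        FlinkConf conf = new FlinkConf().mode(mode);
        ExecutionEnvironment env = new ExecutionEnvironment(conf); // proposed constructor
        JobID jobId = env.submit(/* job arguments left open in the proposal */);
        System.out.println("Submitted job " + jobId);
    }
}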