Hi Jeff and Flavio,

Thanks a lot, Jeff, for proposing the design document. We are also working on refactoring ClusterClient to allow flexible and efficient job management in our real-time platform, and we would like to draft a document to share our ideas with you.

I think it's a good idea to have something like Apache Livy for Flink, and the efforts discussed here are a great step towards it.

Regards,
Xiaogang

Flavio Pompermaier <pomperma...@okkam.it> wrote on Mon, Jun 17, 2019 at 7:13 PM:

> Is there any possibility to have something like Apache Livy [1] also for Flink in the future?
>
> [1] https://livy.apache.org/
>
> On Tue, Jun 11, 2019 at 5:23 PM Jeff Zhang <zjf...@gmail.com> wrote:
>
> > >>> Any API we expose should not have dependencies on the runtime (flink-runtime) package or other implementation details. To me, this means that the current ClusterClient cannot be exposed to users because it uses quite some classes from the optimiser and runtime packages.
> >
> > We should change ClusterClient from a class to an interface. ExecutionEnvironment would only use the ClusterClient interface, which should live in flink-clients, while the concrete implementation classes could stay in flink-runtime.
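> >
> > A rough sketch of what I mean (the method set here is illustrative, not a final proposal):
> >
> >     // flink-clients: the only type users and ExecutionEnvironment see
> >     public interface ClusterClient {
> >         // the job representation would itself need to be decoupled
> >         // from flink-runtime, as you point out
> >         JobID submitJob(JobGraph jobGraph) throws Exception;
> >         JobStatus getJobStatus(JobID jobId) throws Exception;
> >         void cancelJob(JobID jobId) throws Exception;
> >     }
> >
> >     // flink-runtime: concrete implementations (e.g. a REST-based
> >     // RestClusterClient) stay internal and just implement the interface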
> >
> > >>> What happens when a failure/restart in the client happens? There needs to be a way of re-establishing the connection to the job, setting up the listeners again, etc.
> >
> > Good point. First we need to define what a failure/restart in the client means. IIUC, that usually means a network failure, which would surface in the RestClient class. If my understanding is correct, the restart/retry mechanism should be implemented in RestClient.
> >
> > Aljoscha Krettek <aljos...@apache.org> wrote on Tue, Jun 11, 2019 at 11:10 PM:
> >
> > > Some points to consider:
> > >
> > > * Any API we expose should not have dependencies on the runtime (flink-runtime) package or other implementation details. To me, this means that the current ClusterClient cannot be exposed to users because it uses quite some classes from the optimiser and runtime packages.
> > >
> > > * What happens when a failure/restart in the client happens? There needs to be a way of re-establishing the connection to the job, setting up the listeners again, etc.
> > >
> > > Aljoscha
> > >
> > > > On 29. May 2019, at 10:17, Jeff Zhang <zjf...@gmail.com> wrote:
> > > >
> > > > Sorry folks, the design doc is later than expected. Here's the design doc I drafted; any comments and feedback are welcome.
> > > >
> > > > https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/edit?usp=sharing
> > > >
> > > > Stephan Ewen <se...@apache.org> wrote on Thu, Feb 14, 2019 at 8:43 PM:
> > > >
> > > >> Nice that this discussion is happening.
> > > >>
> > > >> In the FLIP, we could also revisit the entire role of the environments again.
> > > >>
> > > >> Initially, the idea was:
> > > >>   - the environments take care of the specific setup for standalone (no setup needed), yarn, mesos, etc.
> > > >>   - the session ones have control over the session. The environment holds the session client.
> > > >>   - running a job gives a "control" object for that job. That behavior is the same in all environments.
> > > >>
> > > >> The actual implementation diverged quite a bit from that. Happy to see a discussion about straightening this out a bit more.
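> > > >>
> > > >> Concretely, the original idea was that usage would look roughly like this (names are hypothetical; the actual code has diverged):
> > > >>
> > > >>     // the environment hides the standalone/yarn/mesos specific setup
> > > >>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> > > >>
> > > >>     // running a job hands back a control object for exactly that job,
> > > >>     // with the same behavior in all environments
> > > >>     JobControl control = env.executeWithControl("my-job");
> > > >>     control.getJobStatus();
> > > >>     control.cancel();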
> > > >>
> > > >> On Tue, Feb 12, 2019 at 4:58 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > > >>
> > > >>> Hi folks,
> > > >>>
> > > >>> Sorry for the late response. It seems we have reached consensus on this; I will create a FLIP with a more detailed design.
> > > >>>
> > > >>> Thomas Weise <t...@apache.org> wrote on Fri, Dec 21, 2018 at 11:43 AM:
> > > >>>
> > > >>>> Great to see this discussion seeded! The problems you face with the Zeppelin integration are also affecting other downstream projects, like Beam.
> > > >>>>
> > > >>>> We just enabled the savepoint restore option in RemoteStreamEnvironment [1], and that was more difficult than it should be. The main issue is that the environment and the cluster client aren't decoupled. Ideally it should be possible to just get the matching cluster client from the environment and then control the job through it (environment as factory for cluster client). But note that the environment classes are part of the public API, and it is not straightforward to make larger changes without breaking backward compatibility.
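> > > >>>>
> > > >>>> For reference, with [1] the restore can be set up roughly like this (sketch from memory; see the PR for the exact constructor signature):
> > > >>>>
> > > >>>>     SavepointRestoreSettings restore =
> > > >>>>         SavepointRestoreSettings.forPath("hdfs:///savepoints/sp-1");
> > > >>>>     StreamExecutionEnvironment env = new RemoteStreamEnvironment(
> > > >>>>         "jmHost", 8081, new Configuration(),
> > > >>>>         new String[] {"job.jar"}, null, restore);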
> > > >>>>
> > > >>>> ClusterClient currently exposes internal classes like JobGraph and StreamGraph. But it should be possible to wrap this with a new public API that brings the required job control capabilities for downstream projects. Perhaps it is helpful to look at some of the interfaces in Beam while thinking about this: [2] for the portable job API and [3] for the old asynchronous job control from the Beam Java SDK.
> > > >>>>
> > > >>>> The backward compatibility discussion [4] is also relevant here. A new API should shield downstream projects from internals and allow them to interoperate with multiple future Flink versions in the same release line without forced upgrades.
> > > >>>>
> > > >>>> Thanks,
> > > >>>> Thomas
> > > >>>>
> > > >>>> [1] https://github.com/apache/flink/pull/7249
> > > >>>> [2] https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/beam_job_api.proto
> > > >>>> [3] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
> > > >>>> [4] https://lists.apache.org/thread.html/064c75c5d10f0806095b14f6d76942598917a14429c1acbddd151fe2@%3Cdev.flink.apache.org%3E
> > > >>>>
> > > >>>> On Thu, Dec 20, 2018 at 6:15 PM Jeff Zhang <zjf...@gmail.com> wrote:
> > > >>>>
> > > >>>>> >>> I'm not so sure whether the user should be able to define where the job runs (in your example Yarn). This is actually independent of the job development and is something which is decided at deployment time.
> > > >>>>>
> > > >>>>> Users don't need to specify the execution mode programmatically. They can also pass the execution mode as an argument to the flink run command, e.g.:
> > > >>>>>
> > > >>>>>     bin/flink run -m yarn-cluster ...
> > > >>>>>     bin/flink run -m local ...
> > > >>>>>     bin/flink run -m host:port ...
> > > >>>>>
> > > >>>>> Does this make sense to you?
> > > >>>>>
> > > >>>>> >>> To me it makes sense that the ExecutionEnvironment is not directly initialized by the user and is instead context sensitive to how you want to execute your job (Flink CLI vs. IDE, for example).
> > > >>>>>
> > > >>>>> Right; currently I notice that Flink creates a different ContextExecutionEnvironment for each submission scenario (Flink CLI vs. IDE). To me this is kind of a hack, and not very straightforward. What I suggested above is that Flink should always create the same ExecutionEnvironment, just with a different configuration, and based on that configuration it would create the proper ClusterClient for the different behaviors.
> > > >>>>>
> > > >>>>> Till Rohrmann <trohrm...@apache.org> wrote on Thu, Dec 20, 2018 at 11:18 PM:
> > > >>>>>
> > > >>>>>> You are probably right that we have code duplication when it comes to the creation of the ClusterClient. This should be reduced in the future.
> > > >>>>>>
> > > >>>>>> I'm not so sure whether the user should be able to define where the job runs (in your example Yarn). This is actually independent of the job development and is something which is decided at deployment time. To me it makes sense that the ExecutionEnvironment is not directly initialized by the user and is instead context sensitive to how you want to execute your job (Flink CLI vs. IDE, for example). However, I agree that the ExecutionEnvironment should give you access to the ClusterClient and to the job (maybe in the form of the JobGraph or a job plan).
> > > >>>>>>
> > > >>>>>> Cheers,
> > > >>>>>> Till
> > > >>>>>>
> > > >>>>>> On Thu, Dec 13, 2018 at 4:36 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > > >>>>>>
> > > >>>>>>> Hi Till,
> > > >>>>>>>
> > > >>>>>>> Thanks for the feedback. You are right that I expect a better programmatic job submission/control API which could be used by downstream projects; it would benefit the whole Flink ecosystem. When I look at the code of the Flink scala-shell and sql-client (I believe they are not the core of Flink, but belong to its ecosystem), I find a lot of duplicated code for creating a ClusterClient from user-provided configuration (the configuration format may differ between scala-shell and sql-client) and then using that ClusterClient to manipulate jobs. I don't think this is convenient for downstream projects. What I expect is that a downstream project only needs to provide the necessary configuration info (maybe introducing a class FlinkConf), then builds an ExecutionEnvironment based on this FlinkConf, and the ExecutionEnvironment creates the proper ClusterClient. This would not only benefit downstream project development but also help their integration tests with Flink. Here's one sample code snippet of what I expect:
> > > >>>>>>>
> > > >>>>>>>     val conf = new FlinkConf().mode("yarn")
> > > >>>>>>>     val env = new ExecutionEnvironment(conf)
> > > >>>>>>>     val jobId = env.submit(...)
> > > >>>>>>>     val jobStatus = env.getClusterClient().queryJobStatus(jobId)
> > > >>>>>>>     env.getClusterClient().cancelJob(jobId)
> > > >>>>>>>
> > > >>>>>>> What do you think?
> > > >>>>>>>
> > > >>>>>>> Till Rohrmann <trohrm...@apache.org> wrote on Tue, Dec 11, 2018 at 6:28 PM:
> > > >>>>>>>
> > > >>>>>>>> Hi Jeff,
> > > >>>>>>>>
> > > >>>>>>>> What you are proposing is to provide the user with better programmatic job control. There was actually an effort to achieve this, but it was never completed [1]. However, there are some improvements in the code base now. Look for example at the NewClusterClient interface, which offers non-blocking job submission. But I agree that we need to improve Flink in this regard.
> > > >>>>>>>>
> > > >>>>>>>> I would not be in favour of exposing all ClusterClient calls via the ExecutionEnvironment because it would clutter the class and would not be a good separation of concerns. Instead, one idea could be to retrieve the current ClusterClient from the ExecutionEnvironment, which can then be used for cluster and job control. But before we start an effort here, we need to agree on and capture what functionality we want to provide.
> > > >>>>>>>>
> > > >>>>>>>> Initially, the idea was that we have the ClusterDescriptor describing how to talk to a cluster manager like Yarn or Mesos. The ClusterDescriptor can be used for deploying Flink clusters (job and session) and gives you a ClusterClient. The ClusterClient controls the cluster (e.g. submitting jobs, listing all running jobs). And then there was the idea to introduce a JobClient, which you obtain from the ClusterClient, to trigger job-specific operations (e.g. taking a savepoint, cancelling the job).
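> > > >>>>>>>>
> > > >>>>>>>> Sketched with illustrative signatures (the actual interfaces may differ), the layering would be:
> > > >>>>>>>>
> > > >>>>>>>>     // describes how to talk to a cluster manager (Yarn, Mesos, standalone)
> > > >>>>>>>>     ClusterDescriptor descriptor = new YarnClusterDescriptor(config);
> > > >>>>>>>>
> > > >>>>>>>>     // deploying a cluster (job or session) gives you a ClusterClient
> > > >>>>>>>>     ClusterClient clusterClient = descriptor.deploySessionCluster(clusterSpec);
> > > >>>>>>>>
> > > >>>>>>>>     // cluster-level operations
> > > >>>>>>>>     JobID jobId = clusterClient.submitJob(jobGraph);
> > > >>>>>>>>     clusterClient.listJobs();
> > > >>>>>>>>
> > > >>>>>>>>     // job-level operations via a JobClient obtained from the ClusterClient
> > > >>>>>>>>     JobClient jobClient = clusterClient.getJobClient(jobId);
> > > >>>>>>>>     jobClient.triggerSavepoint(savepointDir);
> > > >>>>>>>>     jobClient.cancel();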
It > not > > > >>>> only > > > >>>>>>> benefit for the downstream project development but also be > > > >> helpful > > > >>>> for > > > >>>>>>> their integration test with flink. Here's one sample code > snippet > > > >>>> that > > > >>>>> I > > > >>>>>>> expect. > > > >>>>>>> > > > >>>>>>> val conf = new FlinkConf().mode("yarn") > > > >>>>>>> val env = new ExecutionEnvironment(conf) > > > >>>>>>> val jobId = env.submit(...) > > > >>>>>>> val jobStatus = env.getClusterClient().queryJobStatus(jobId) > > > >>>>>>> env.getClusterClient().cancelJob(jobId) > > > >>>>>>> > > > >>>>>>> What do you think ? > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> Till Rohrmann <trohrm...@apache.org> 于2018年12月11日周二 下午6:28写道: > > > >>>>>>> > > > >>>>>>>> Hi Jeff, > > > >>>>>>>> > > > >>>>>>>> what you are proposing is to provide the user with better > > > >>>>> programmatic > > > >>>>>>> job > > > >>>>>>>> control. There was actually an effort to achieve this but it > > > >> has > > > >>>>> never > > > >>>>>>> been > > > >>>>>>>> completed [1]. However, there are some improvement in the code > > > >>> base > > > >>>>>> now. > > > >>>>>>>> Look for example at the NewClusterClient interface which > > > >> offers a > > > >>>>>>>> non-blocking job submission. But I agree that we need to > > > >> improve > > > >>>>> Flink > > > >>>>>> in > > > >>>>>>>> this regard. > > > >>>>>>>> > > > >>>>>>>> I would not be in favour if exposing all ClusterClient calls > > > >> via > > > >>>> the > > > >>>>>>>> ExecutionEnvironment because it would clutter the class and > > > >> would > > > >>>> not > > > >>>>>> be > > > >>>>>>> a > > > >>>>>>>> good separation of concerns. Instead one idea could be to > > > >>> retrieve > > > >>>>> the > > > >>>>>>>> current ClusterClient from the ExecutionEnvironment which can > > > >>> then > > > >>>> be > > > >>>>>>> used > > > >>>>>>>> for cluster and job control. But before we start an effort > > > >> here, > > > >>> we > > > >>>>>> need > > > >>>>>>> to > > > >>>>>>>> agree and capture what functionality we want to provide. > > > >>>>>>>> > > > >>>>>>>> Initially, the idea was that we have the ClusterDescriptor > > > >>>> describing > > > >>>>>> how > > > >>>>>>>> to talk to cluster manager like Yarn or Mesos. The > > > >>>> ClusterDescriptor > > > >>>>>> can > > > >>>>>>> be > > > >>>>>>>> used for deploying Flink clusters (job and session) and gives > > > >>> you a > > > >>>>>>>> ClusterClient. The ClusterClient controls the cluster (e.g. > > > >>>>> submitting > > > >>>>>>>> jobs, listing all running jobs). And then there was the idea > to > > > >>>>>>> introduce a > > > >>>>>>>> JobClient which you obtain from the ClusterClient to trigger > > > >> job > > > >>>>>> specific > > > >>>>>>>> operations (e.g. taking a savepoint, cancelling the job). > > > >>>>>>>> > > > >>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-4272 > > > >>>>>>>> > > > >>>>>>>> Cheers, > > > >>>>>>>> Till > > > >>>>>>>> > > > >>>>>>>> On Tue, Dec 11, 2018 at 10:13 AM Jeff Zhang <zjf...@gmail.com > > > > > >>>>> wrote: > > > >>>>>>>> > > > >>>>>>>>> Hi Folks, > > > >>>>>>>>> > > > >>>>>>>>> I am trying to integrate flink into apache zeppelin which is > > > >> an > > > >>>>>>>> interactive > > > >>>>>>>>> notebook. And I hit several issues that is caused by flink > > > >>> client > > > >>>>>> api. > > > >>>>>>> So > > > >>>>>>>>> I'd like to proposal the following changes for flink client > > > >>> api. > > > >>>>>>>>> > > > >>>>>>>>> 1. Support nonblocking execution. 
> > > >>>>>>>>>
> > > >>>>>>>>> 5. Enable sessions in ExecutionEnvironment. Currently they are disabled, but sessions are very convenient for third parties that submit jobs continually. I hope Flink can enable them again.
> > > >>>>>>>>>
> > > >>>>>>>>> 6. Unify all Flink client APIs into ExecutionEnvironment/StreamExecutionEnvironment.
> > > >>>>>>>>>
> > > >>>>>>>>> This is a long-term issue which needs more careful thinking and design. Currently some features of Flink are exposed in ExecutionEnvironment/StreamExecutionEnvironment, but some are exposed in the CLI instead of the API, like the cancel and savepoint operations I mentioned above. I think the root cause is that Flink never unified how clients interact with it. Here are 3 scenarios of Flink operation:
> > > >>>>>>>>>
> > > >>>>>>>>>   - Local job execution. Flink creates a LocalEnvironment and then uses this LocalEnvironment to create a LocalExecutor for job execution.
> > > >>>>>>>>>   - Remote job execution. Flink creates a ClusterClient first, then creates a ContextEnvironment based on the ClusterClient, and then runs the job.
> > > >>>>>>>>>   - Job cancelation. Flink creates a ClusterClient first and then cancels the job via this ClusterClient.
> > > >>>>>>>>>
> > > >>>>>>>>> As you can see from the above 3 scenarios, Flink doesn't use the same approach (code path) to interact with the cluster. What I propose is the following: create the proper LocalEnvironment/RemoteEnvironment (based on user configuration), then use this Environment to create the proper ClusterClient (LocalClusterClient or RestClusterClient) to interact with Flink (job execution or cancelation).
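> > > >>>>>>>>>
> > > >>>>>>>>> In pseudo code (the class names and config key are illustrative only):
> > > >>>>>>>>>
> > > >>>>>>>>>     // single entry point: the configuration decides everything else
> > > >>>>>>>>>     ExecutionEnvironment env = ExecutionEnvironment.create(configuration);
> > > >>>>>>>>>
> > > >>>>>>>>>     // inside the environment, the proper client is derived from the configuration
> > > >>>>>>>>>     ClusterClient createClusterClient(Configuration config) {
> > > >>>>>>>>>         switch (config.getString("execution.mode", "local")) {
> > > >>>>>>>>>             case "local":        return new LocalClusterClient(config);
> > > >>>>>>>>>             case "yarn-cluster": return new YarnClusterClient(config);
> > > >>>>>>>>>             default:             return new RestClusterClient(config); // host:port
> > > >>>>>>>>>         }
> > > >>>>>>>>>     }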
> > > >>>>>>>>>
> > > >>>>>>>>> This way we can unify the process of local execution and remote execution, and it becomes much easier for third parties to integrate with Flink, because ExecutionEnvironment is the unified entry point. All a third party needs to do is pass its configuration to the ExecutionEnvironment, and the ExecutionEnvironment will do the right thing based on that configuration. The Flink CLI can also be considered a consumer of the Flink API: it would just pass the configuration to the ExecutionEnvironment and let the ExecutionEnvironment create the proper ClusterClient, instead of creating the ClusterClient directly.
> > > >>>>>>>>>
> > > >>>>>>>>> Point 6 would involve large code refactoring, so I think we can defer it to a future release; 1, 2, 3, 4 and 5 could be done at once, I believe. Let me know your comments and feedback, thanks.
> > > >>>>>>>>>
> > > >>>>>>>>> --
> > > >>>>>>>>> Best Regards
> > > >>>>>>>>>
> > > >>>>>>>>> Jeff Zhang
> > > >>>>>>>
> > > >>>>>>> --
> > > >>>>>>> Best Regards
> > > >>>>>>>
> > > >>>>>>> Jeff Zhang
> > > >>>>>
> > > >>>>> --
> > > >>>>> Best Regards
> > > >>>>>
> > > >>>>> Jeff Zhang
> > > >>>
> > > >>> --
> > > >>> Best Regards
> > > >>>
> > > >>> Jeff Zhang
> > > >
> > > > --
> > > > Best Regards
> > > >
> > > > Jeff Zhang
> >
> > --
> > Best Regards
> >
> > Jeff Zhang