Is there any possibility to have something like Apache Livy [1] also for Flink in the future?
[1] https://livy.apache.org/

On Tue, Jun 11, 2019 at 5:23 PM Jeff Zhang <zjf...@gmail.com> wrote:

> >>> Any API we expose should not have dependencies on the runtime
> (flink-runtime) package or other implementation details. To me, this
> means that the current ClusterClient cannot be exposed to users because
> it uses quite a few classes from the optimiser and runtime packages.
>
> We should change ClusterClient from a class to an interface.
> ExecutionEnvironment would only use the ClusterClient interface, which
> should be in flink-clients, while the concrete implementation class
> could be in flink-runtime.
>
> >>> What happens when a failure/restart in the client happens? There
> needs to be a way of re-establishing the connection to the job, setting
> up the listeners again, etc.
>
> Good point. First we need to define what a failure/restart in the client
> means. IIUC, that usually means a network failure, which would show up
> in the RestClient class. If my understanding is correct, the
> restart/retry mechanism should be implemented in RestClient.
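For illustration, here is a minimal sketch of what that split could look like. Everything in it is hypothetical: the method set, the JobSubmission placeholder and the nested JobStatus enum are assumptions about a possible future API, not existing Flink classes.

    import java.util.concurrent.CompletableFuture;
    import org.apache.flink.api.common.JobID;

    // Hypothetical: an API-level client interface that could live in
    // flink-clients, free of optimiser/runtime classes. The concrete
    // implementations (e.g. a REST-based client) would stay in flink-runtime.
    public interface ClusterClient extends AutoCloseable {

        /** Submits a job without blocking; completes with the job's id. */
        CompletableFuture<JobID> submitJob(JobSubmission submission);

        /** Queries the current status of a submitted job. */
        CompletableFuture<JobStatus> getJobStatus(JobID jobId);

        /** Cancels a running job. */
        CompletableFuture<Void> cancelJob(JobID jobId);

        /** Triggers a savepoint and completes with the savepoint path. */
        CompletableFuture<String> triggerSavepoint(JobID jobId, String targetDirectory);

        /** Placeholder for an API-level job description, standing in for the
         *  runtime JobGraph that the current ClusterClient hands around. */
        interface JobSubmission {}

        /** Placeholder status enum, kept API-level for the same reason. */
        enum JobStatus { CREATED, RUNNING, FINISHED, CANCELED, FAILED }
    }

With an interface like this, the restart/retry concern above would live inside the REST-based implementation rather than in the API.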
>
> On Tue, Jun 11, 2019 at 11:10 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> > Some points to consider:
> >
> > * Any API we expose should not have dependencies on the runtime
> > (flink-runtime) package or other implementation details. To me, this
> > means that the current ClusterClient cannot be exposed to users
> > because it uses quite a few classes from the optimiser and runtime
> > packages.
> >
> > * What happens when a failure/restart in the client happens? There
> > needs to be a way of re-establishing the connection to the job,
> > setting up the listeners again, etc.
> >
> > Aljoscha
> >
> > > On 29. May 2019, at 10:17, Jeff Zhang <zjf...@gmail.com> wrote:
> > >
> > > Sorry folks, the design doc is later than you expected. Here's the
> > > design doc I drafted; any comments and feedback are welcome.
> > >
> > > https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/edit?usp=sharing
> > >
> > > On Thu, Feb 14, 2019 at 8:43 PM, Stephan Ewen <se...@apache.org> wrote:
> > >
> > >> Nice that this discussion is happening.
> > >>
> > >> In the FLIP, we could also revisit the entire role of the
> > >> environments again.
> > >>
> > >> Initially, the idea was:
> > >> - the environments take care of the specific setup for standalone
> > >> (no setup needed), yarn, mesos, etc.
> > >> - the session ones have control over the session. The environment
> > >> holds the session client.
> > >> - running a job gives a "control" object for that job. That
> > >> behavior is the same in all environments.
> > >>
> > >> The actual implementation diverged quite a bit from that. Happy to
> > >> see a discussion about straightening this out a bit more.
> > >>
> > >> On Tue, Feb 12, 2019 at 4:58 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > >>
> > >>> Hi folks,
> > >>>
> > >>> Sorry for the late response. It seems we have reached consensus on
> > >>> this; I will create a FLIP with a more detailed design.
> > >>>
> > >>> On Fri, Dec 21, 2018 at 11:43 AM, Thomas Weise <t...@apache.org> wrote:
> > >>>
> > >>>> Great to see this discussion seeded! The problems you face with
> > >>>> the Zeppelin integration also affect other downstream projects,
> > >>>> like Beam.
> > >>>>
> > >>>> We just enabled the savepoint restore option in
> > >>>> RemoteStreamEnvironment [1], and that was more difficult than it
> > >>>> should have been. The main issue is that the environment and the
> > >>>> cluster client aren't decoupled. Ideally it should be possible to
> > >>>> just get the matching cluster client from the environment and
> > >>>> then control the job through it (environment as factory for the
> > >>>> cluster client). But note that the environment classes are part
> > >>>> of the public API, and it is not straightforward to make larger
> > >>>> changes without breaking backward compatibility.
> > >>>>
> > >>>> ClusterClient currently exposes internal classes like JobGraph
> > >>>> and StreamGraph. But it should be possible to wrap this with a
> > >>>> new public API that brings the required job control capabilities
> > >>>> for downstream projects. Perhaps it is helpful to look at some of
> > >>>> the interfaces in Beam while thinking about this: [2] for the
> > >>>> portable job API and [3] for the old asynchronous job control
> > >>>> from the Beam Java SDK.
> > >>>>
> > >>>> The backward compatibility discussion [4] is also relevant here.
> > >>>> A new API should shield downstream projects from internals and
> > >>>> allow them to interoperate with multiple future Flink versions in
> > >>>> the same release line without forced upgrades.
> > >>>>
> > >>>> Thanks,
> > >>>> Thomas
> > >>>>
> > >>>> [1] https://github.com/apache/flink/pull/7249
> > >>>> [2] https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/beam_job_api.proto
> > >>>> [3] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
> > >>>> [4] https://lists.apache.org/thread.html/064c75c5d10f0806095b14f6d76942598917a14429c1acbddd151fe2@%3Cdev.flink.apache.org%3E
> > >>>>
> > >>>> On Thu, Dec 20, 2018 at 6:15 PM Jeff Zhang <zjf...@gmail.com> wrote:
> > >>>>
> > >>>>> >>> I'm not so sure whether the user should be able to define
> > >>>>> where the job runs (in your example Yarn). This is actually
> > >>>>> independent of the job development and is something which is
> > >>>>> decided at deployment time.
> > >>>>>
> > >>>>> Users don't need to specify the execution mode programmatically;
> > >>>>> they can also pass it via the arguments of the flink run
> > >>>>> command, e.g.
> > >>>>>
> > >>>>> bin/flink run -m yarn-cluster ....
> > >>>>> bin/flink run -m local ...
> > >>>>> bin/flink run -m host:port ...
> > >>>>>
> > >>>>> Does this make sense to you?
> > >>>>>
> > >>>>> >>> To me it makes sense that the ExecutionEnvironment is not
> > >>>>> directly initialized by the user and is instead sensitive to the
> > >>>>> context in which you want to execute your job (Flink CLI vs.
> > >>>>> IDE, for example).
> > >>>>>
> > >>>>> Right. Currently I notice that Flink creates a different
> > >>>>> ContextExecutionEnvironment for each submission scenario (Flink
> > >>>>> CLI vs. IDE). To me this is kind of a hack, and not very
> > >>>>> straightforward. What I suggested above is that Flink should
> > >>>>> always create the same ExecutionEnvironment but with different
> > >>>>> configurations, and based on the configuration it would create
> > >>>>> the proper ClusterClient for different behaviors.
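A rough sketch of the dispatch described here, with one environment type and the concrete client derived from configuration alone; the configuration key and the client class names are illustrative assumptions, not the actual wiring:

    import org.apache.flink.configuration.Configuration;

    // Hypothetical factory: one code path that turns user configuration into
    // the proper ClusterClient, mirroring what `bin/flink run -m ...` passes
    // on the command line.
    public final class ClusterClients {

        private ClusterClients() {}

        public static ClusterClient create(Configuration conf) {
            String mode = conf.getString("execution.mode", "local");
            switch (mode) {
                case "local":
                    return new LocalClusterClient(conf);  // in-JVM mini cluster
                case "yarn-cluster":
                    return new YarnClusterClient(conf);   // deploy to YARN
                default:
                    return new RestClusterClient(conf);   // "host:port" of a running cluster
            }
        }
    }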
> > >>>>> On Thu, Dec 20, 2018 at 11:18 PM, Till Rohrmann <trohrm...@apache.org> wrote:
> > >>>>>
> > >>>>>> You are probably right that we have code duplication when it
> > >>>>>> comes to the creation of the ClusterClient. This should be
> > >>>>>> reduced in the future.
> > >>>>>>
> > >>>>>> I'm not so sure whether the user should be able to define where
> > >>>>>> the job runs (in your example Yarn). This is actually
> > >>>>>> independent of the job development and is something which is
> > >>>>>> decided at deployment time. To me it makes sense that the
> > >>>>>> ExecutionEnvironment is not directly initialized by the user
> > >>>>>> and is instead sensitive to the context in which you want to
> > >>>>>> execute your job (Flink CLI vs. IDE, for example). However, I
> > >>>>>> agree that the ExecutionEnvironment should give you access to
> > >>>>>> the ClusterClient and to the job (maybe in the form of the
> > >>>>>> JobGraph or a job plan).
> > >>>>>>
> > >>>>>> Cheers,
> > >>>>>> Till
> > >>>>>>
> > >>>>>> On Thu, Dec 13, 2018 at 4:36 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > >>>>>>
> > >>>>>>> Hi Till,
> > >>>>>>>
> > >>>>>>> Thanks for the feedback. You are right that I expect a better
> > >>>>>>> programmatic job submission/control API which could be used by
> > >>>>>>> downstream projects, and that it would benefit the Flink
> > >>>>>>> ecosystem. When I look at the code of the Flink scala-shell
> > >>>>>>> and sql-client (I believe they are not the core of Flink, but
> > >>>>>>> belong to the ecosystem of Flink), I find a lot of duplicated
> > >>>>>>> code for creating a ClusterClient from user-provided
> > >>>>>>> configuration (the configuration format may differ between
> > >>>>>>> scala-shell and sql-client), which is then used to manipulate
> > >>>>>>> jobs. I don't think this is convenient for downstream
> > >>>>>>> projects. What I expect is that a downstream project only
> > >>>>>>> needs to provide the necessary configuration info (maybe
> > >>>>>>> introducing a class FlinkConf), then builds an
> > >>>>>>> ExecutionEnvironment based on this FlinkConf, and the
> > >>>>>>> ExecutionEnvironment creates the proper ClusterClient. This
> > >>>>>>> would not only benefit downstream project development but also
> > >>>>>>> help with their integration tests against Flink. Here's one
> > >>>>>>> sample code snippet that I expect:
> > >>>>>>>
> > >>>>>>> val conf = new FlinkConf().mode("yarn")
> > >>>>>>> val env = new ExecutionEnvironment(conf)
> > >>>>>>> val jobId = env.submit(...)
> > >>>>>>> val jobStatus = env.getClusterClient().queryJobStatus(jobId)
> > >>>>>>> env.getClusterClient().cancelJob(jobId)
> > >>>>>>>
> > >>>>>>> What do you think?
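As a usage example of the hypothetical API in the snippet above (FlinkConf, submit, getClusterClient and queryJobStatus are assumed names from that sketch, not existing Flink methods), a downstream project could then drive a job roughly like this:

    // Hypothetical glue code for a downstream project such as a notebook
    // backend; all names follow the FlinkConf sketch above.
    FlinkConf conf = new FlinkConf().mode("yarn");
    ExecutionEnvironment env = new ExecutionEnvironment(conf);

    JobID jobId = env.submit(job);                  // returns immediately
    ClusterClient client = env.getClusterClient();

    System.out.println("status: " + client.queryJobStatus(jobId));

    // ... later, e.g. when the user hits "cancel" in the notebook:
    client.cancelJob(jobId);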
> > >>>>>>> On Tue, Dec 11, 2018 at 6:28 PM, Till Rohrmann <trohrm...@apache.org> wrote:
> > >>>>>>>
> > >>>>>>>> Hi Jeff,
> > >>>>>>>>
> > >>>>>>>> What you are proposing is to provide the user with better
> > >>>>>>>> programmatic job control. There was actually an effort to
> > >>>>>>>> achieve this, but it has never been completed [1]. However,
> > >>>>>>>> there are some improvements in the code base now. Look, for
> > >>>>>>>> example, at the NewClusterClient interface, which offers
> > >>>>>>>> non-blocking job submission. But I agree that we need to
> > >>>>>>>> improve Flink in this regard.
> > >>>>>>>>
> > >>>>>>>> I would not be in favour of exposing all ClusterClient calls
> > >>>>>>>> via the ExecutionEnvironment because it would clutter the
> > >>>>>>>> class and would not be a good separation of concerns.
> > >>>>>>>> Instead, one idea could be to retrieve the current
> > >>>>>>>> ClusterClient from the ExecutionEnvironment, which can then
> > >>>>>>>> be used for cluster and job control. But before we start an
> > >>>>>>>> effort here, we need to agree on and capture what
> > >>>>>>>> functionality we want to provide.
> > >>>>>>>>
> > >>>>>>>> Initially, the idea was that we have the ClusterDescriptor
> > >>>>>>>> describing how to talk to a cluster manager like Yarn or
> > >>>>>>>> Mesos. The ClusterDescriptor can be used for deploying Flink
> > >>>>>>>> clusters (job and session) and gives you a ClusterClient. The
> > >>>>>>>> ClusterClient controls the cluster (e.g. submitting jobs,
> > >>>>>>>> listing all running jobs). And then there was the idea to
> > >>>>>>>> introduce a JobClient, which you obtain from the
> > >>>>>>>> ClusterClient, to trigger job-specific operations (e.g.
> > >>>>>>>> taking a savepoint, cancelling the job).
> > >>>>>>>>
> > >>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-4272
> > >>>>>>>>
> > >>>>>>>> Cheers,
> > >>>>>>>> Till
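A condensed sketch of that layering, with illustrative names (none of these interfaces exist in this exact form; ClusterClient here is a variant of the earlier sketch, and JobSubmission is the same placeholder):

    import java.util.concurrent.CompletableFuture;
    import org.apache.flink.api.common.JobID;

    // Hypothetical: the descriptor deploys clusters, the cluster client
    // controls one cluster, and the job client controls one job.
    interface ClusterDescriptor {
        /** Deploys a session cluster and hands back its client. */
        ClusterClient deploySessionCluster() throws Exception;
    }

    interface ClusterClient extends AutoCloseable {
        /** Cluster-wide control: submit a job and obtain its JobClient. */
        JobClient submitJob(JobSubmission job) throws Exception;

        /** Obtain a client for a job that is already running. */
        JobClient getJobClient(JobID jobId);
    }

    interface JobClient {
        JobID getJobID();

        /** Job-specific operation: trigger a savepoint. */
        CompletableFuture<String> triggerSavepoint(String targetDirectory);

        /** Job-specific operation: cancel this job. */
        CompletableFuture<Void> cancel();
    }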
> > >>>>>>>> On Tue, Dec 11, 2018 at 10:13 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi Folks,
> > >>>>>>>>>
> > >>>>>>>>> I am trying to integrate Flink into Apache Zeppelin, which
> > >>>>>>>>> is an interactive notebook, and I hit several issues caused
> > >>>>>>>>> by the Flink client API. So I'd like to propose the
> > >>>>>>>>> following changes to the Flink client API.
> > >>>>>>>>>
> > >>>>>>>>> 1. Support nonblocking execution. Currently,
> > >>>>>>>>> ExecutionEnvironment#execute is a blocking method which does
> > >>>>>>>>> two things: first submit the job, then wait for the job
> > >>>>>>>>> until it is finished. I'd like to introduce a nonblocking
> > >>>>>>>>> execution method like ExecutionEnvironment#submit which only
> > >>>>>>>>> submits the job and then returns the jobId to the client,
> > >>>>>>>>> and allow users to query the job status via that jobId.
> > >>>>>>>>>
> > >>>>>>>>> 2. Add a cancel API in
> > >>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment. Currently
> > >>>>>>>>> the only way to cancel a job is via the cli (bin/flink),
> > >>>>>>>>> which is not convenient for downstream projects. So I'd like
> > >>>>>>>>> to add a cancel API in ExecutionEnvironment.
> > >>>>>>>>>
> > >>>>>>>>> 3. Add a savepoint API in
> > >>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment. It is
> > >>>>>>>>> similar to the cancel API; we should use
> > >>>>>>>>> ExecutionEnvironment as the unified API for third parties to
> > >>>>>>>>> integrate with Flink.
> > >>>>>>>>>
> > >>>>>>>>> 4. Add a listener for the job execution lifecycle, something
> > >>>>>>>>> like the following, so that downstream projects can run
> > >>>>>>>>> custom logic during the lifecycle of a job. E.g. Zeppelin
> > >>>>>>>>> would capture the jobId after the job is submitted and then
> > >>>>>>>>> use this jobId to cancel the job later when necessary (a
> > >>>>>>>>> usage sketch follows below).
> > >>>>>>>>>
> > >>>>>>>>> public interface JobListener {
> > >>>>>>>>>
> > >>>>>>>>>     void onJobSubmitted(JobID jobId);
> > >>>>>>>>>
> > >>>>>>>>>     void onJobExecuted(JobExecutionResult jobResult);
> > >>>>>>>>>
> > >>>>>>>>>     void onJobCanceled(JobID jobId);
> > >>>>>>>>> }
> > >>>>>>>>>
> > >>>>>>>>> 5. Enable sessions in ExecutionEnvironment. Currently this
> > >>>>>>>>> is disabled, but sessions are very convenient for third
> > >>>>>>>>> parties that submit jobs continually. I hope Flink can
> > >>>>>>>>> enable it again.
> > >>>>>>>>>
> > >>>>>>>>> 6. Unify all Flink client APIs into
> > >>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment.
> > >>>>>>>>>
> > >>>>>>>>> This is a long-term issue which needs more careful thinking
> > >>>>>>>>> and design. Currently some features of Flink are exposed in
> > >>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment, but some
> > >>>>>>>>> are exposed in the cli instead of the API, like the cancel
> > >>>>>>>>> and savepoint operations I mentioned above. I think the root
> > >>>>>>>>> cause is that Flink never unified the way clients interact
> > >>>>>>>>> with it. Here I list 3 scenarios of Flink operation:
> > >>>>>>>>>
> > >>>>>>>>> - Local job execution. Flink creates a LocalEnvironment and
> > >>>>>>>>> then uses this LocalEnvironment to create a LocalExecutor
> > >>>>>>>>> for job execution.
> > >>>>>>>>> - Remote job execution. Flink creates a ClusterClient first,
> > >>>>>>>>> then creates a ContextEnvironment based on the
> > >>>>>>>>> ClusterClient, and then runs the job.
> > >>>>>>>>> - Job cancellation. Flink creates a ClusterClient first and
> > >>>>>>>>> then cancels the job via this ClusterClient.
> > >>>>>>>>>
> > >>>>>>>>> As you can see in the above 3 scenarios, Flink doesn't use
> > >>>>>>>>> the same approach (code path) to interact with itself. What
> > >>>>>>>>> I propose is the following: create the proper
> > >>>>>>>>> LocalEnvironment/RemoteEnvironment (based on user
> > >>>>>>>>> configuration) --> use this environment to create the proper
> > >>>>>>>>> ClusterClient (LocalClusterClient or RestClusterClient) to
> > >>>>>>>>> interact with Flink (job execution or cancellation).
> > >>>>>>>>>
> > >>>>>>>>> This way we can unify the process of local execution and
> > >>>>>>>>> remote execution, and it becomes much easier for third
> > >>>>>>>>> parties to integrate with Flink, because
> > >>>>>>>>> ExecutionEnvironment is the unified entry point for Flink. A
> > >>>>>>>>> third party just needs to pass the configuration to
> > >>>>>>>>> ExecutionEnvironment, and ExecutionEnvironment will do the
> > >>>>>>>>> right thing based on that configuration. The Flink cli can
> > >>>>>>>>> also be considered a Flink API consumer: it would just pass
> > >>>>>>>>> the configuration to ExecutionEnvironment and let
> > >>>>>>>>> ExecutionEnvironment create the proper ClusterClient,
> > >>>>>>>>> instead of letting the cli create a ClusterClient directly.
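To make proposals 1, 2 and 4 above concrete, here is a hypothetical usage sketch; registerJobListener and the submit/cancel methods on the environment are assumed names, and paragraphJobs/paragraphId stand in for a notebook's own bookkeeping:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.common.JobID;

    // Hypothetical: how a downstream project such as Zeppelin might combine
    // the proposed listener (4) with nonblocking submit (1) and cancel (2).
    Map<String, JobID> paragraphJobs = new ConcurrentHashMap<>();

    env.registerJobListener(new JobListener() {

        @Override
        public void onJobSubmitted(JobID jobId) {
            // Capture the id so this paragraph's job can be cancelled later.
            paragraphJobs.put(paragraphId, jobId);
        }

        @Override
        public void onJobExecuted(JobExecutionResult jobResult) {
            System.out.println("Finished in " + jobResult.getNetRuntime() + " ms");
        }

        @Override
        public void onJobCanceled(JobID jobId) {
            paragraphJobs.remove(paragraphId);
        }
    });

    JobID jobId = env.submit();     // proposal 1: returns without waiting
    // ... later, on user request (proposal 2):
    env.cancel(jobId);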
> > >>>>>>>>> 6 would involve large code refactoring, so I think we can
> > >>>>>>>>> defer it to a future release; 1, 2, 3, 4 and 5 could be done
> > >>>>>>>>> at once, I believe. Let me know your comments and feedback,
> > >>>>>>>>> thanks.
> > >>>>>>>>>
> > >>>>>>>>> --
> > >>>>>>>>> Best Regards
> > >>>>>>>>>
> > >>>>>>>>> Jeff Zhang
>
> --
> Best Regards
>
> Jeff Zhang