Hi Jeff and Flavio,

Thanks a lot, Jeff, for proposing the design document.

We are also working on refactoring ClusterClient to allow flexible and
efficient job management in our real-time platform.
We would like to draft a document to share our ideas with you.

I think it's a good idea to have something like Apache Livy for Flink, and
the efforts discussed here are a great step toward it.

Regards,
Xiaogang

On Mon, Jun 17, 2019 at 7:13 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:

> Is there any possibility to have something like Apache Livy [1] also for
> Flink in the future?
>
> [1] https://livy.apache.org/
>
> On Tue, Jun 11, 2019 at 5:23 PM Jeff Zhang <zjf...@gmail.com> wrote:
>
> > >>> Any API we expose should not have dependencies on the runtime
> > (flink-runtime) package or other implementation details. To me, this
> > means that the current ClusterClient cannot be exposed to users because
> > it uses quite some classes from the optimiser and runtime packages.
> >
> > We should change ClusterClient from a class to an interface.
> > ExecutionEnvironment would only use the ClusterClient interface, which
> > should live in flink-clients, while the concrete implementation class
> > could live in flink-runtime.
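> >
> > As a rough sketch (method set illustrative, not the actual Flink API,
> > and ignoring for now that JobGraph itself is a runtime class), the split
> > could look like this:
> >
> > // in flink-clients: only the interface is exposed to users
> > // (imports of JobID/JobStatus/JobGraph omitted for brevity)
> > public interface ClusterClient extends AutoCloseable {
> >     // submit a job and return its id without waiting for completion
> >     JobID submitJob(JobGraph jobGraph) throws Exception;
> >     // query the current status of a previously submitted job
> >     JobStatus getJobStatus(JobID jobId) throws Exception;
> >     // cancel a running job
> >     void cancelJob(JobID jobId) throws Exception;
> > }
> >
> > // in flink-runtime: the concrete implementations, e.g.
> > // class RestClusterClient implements ClusterClient { ... }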
> >
> > >>> What happens when a failure/restart in the client happens? There
> > needs to be a way of re-establishing the connection to the job, setting
> > up the listeners again, etc.
> >
> > Good point. First we need to define what a failure/restart in the client
> > means. IIUC, that usually means a network failure, which would surface in
> > the RestClient class. If my understanding is correct, the restart/retry
> > mechanism should be implemented in RestClient.
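> >
> > A minimal sketch of such a retry layer, assuming a helper inside
> > RestClient (hypothetical, not the actual RestClient internals):
> >
> > // retry a request with exponential backoff before giving up
> > static <T> T sendWithRetry(java.util.concurrent.Callable<T> request)
> >         throws Exception {
> >     final int maxRetries = 3;
> >     Exception lastFailure = null;
> >     for (int attempt = 0; attempt <= maxRetries; attempt++) {
> >         try {
> >             return request.call();
> >         } catch (java.io.IOException e) {
> >             lastFailure = e;  // network failure: back off, then retry
> >             Thread.sleep(1000L << attempt);
> >         }
> >     }
> >     throw lastFailure;
> > }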
> >
> >
> > On Tue, Jun 11, 2019 at 11:10 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> >
> > > Some points to consider:
> > >
> > > * Any API we expose should not have dependencies on the runtime
> > > (flink-runtime) package or other implementation details. To me, this
> > > means that the current ClusterClient cannot be exposed to users because
> > > it uses quite some classes from the optimiser and runtime packages.
> > >
> > > * What happens when a failure/restart in the client happens? There
> > > needs to be a way of re-establishing the connection to the job, setting
> > > up the listeners again, etc.
> > >
> > > Aljoscha
> > >
> > > > On 29. May 2019, at 10:17, Jeff Zhang <zjf...@gmail.com> wrote:
> > > >
> > > > Sorry folks, the design doc is later than you expected. Here's the
> > > > design doc I drafted; welcome any comments and feedback.
> > > >
> > > > https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/edit?usp=sharing
> > > >
> > > > On Thu, Feb 14, 2019 at 8:43 PM Stephan Ewen <se...@apache.org> wrote:
> > > >
> > > >> Nice that this discussion is happening.
> > > >>
> > > >> In the FLIP, we could also revisit the entire role of the
> > > >> environments again.
> > > >>
> > > >> Initially, the idea was:
> > > >>  - the environments take care of the specific setup for standalone
> > > >>    (no setup needed), yarn, mesos, etc.
> > > >>  - the session ones have control over the session. The environment
> > > >>    holds the session client.
> > > >>  - running a job gives a "control" object for that job. That
> > > >>    behavior is the same in all environments.
> > > >>
> > > >> The actual implementation diverged quite a bit from that. Happy to
> > > >> see a discussion about straightening this out a bit more.
> > > >>
> > > >>
> > > >> On Tue, Feb 12, 2019 at 4:58 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > > >>
> > > >>> Hi folks,
> > > >>>
> > > >>> Sorry for the late response. It seems we have reached consensus on
> > > >>> this; I will create a FLIP with a more detailed design.
> > > >>>
> > > >>>
> > > >>> On Fri, Dec 21, 2018 at 11:43 AM Thomas Weise <t...@apache.org> wrote:
> > > >>>
> > > >>>> Great to see this discussion seeded! The problems you face with
> > > >>>> the Zeppelin integration are also affecting other downstream
> > > >>>> projects, like Beam.
> > > >>>>
> > > >>>> We just enabled the savepoint restore option in
> > > >>>> RemoteStreamEnvironment [1] and that was more difficult than it
> > > >>>> should be. The main issue is that environment and cluster client
> > > >>>> aren't decoupled. Ideally it should be possible to just get the
> > > >>>> matching cluster client from the environment and then control the
> > > >>>> job through it (environment as factory for cluster client). But
> > > >>>> note that the environment classes are part of the public API, and
> > > >>>> it is not straightforward to make larger changes without breaking
> > > >>>> backward compatibility.
> > > >>>>
> > > >>>> ClusterClient currently exposes internal classes like JobGraph and
> > > >>>> StreamGraph. But it should be possible to wrap this with a new
> > > >>>> public API that brings the required job control capabilities for
> > > >>>> downstream projects. Perhaps it is helpful to look at some of the
> > > >>>> interfaces in Beam while thinking about this: [2] for the portable
> > > >>>> job API and [3] for the old asynchronous job control from the Beam
> > > >>>> Java SDK.
> > > >>>>
> > > >>>> The backward compatibility discussion [4] is also relevant here.
> > > >>>> A new API should shield downstream projects from internals and
> > > >>>> allow them to interoperate with multiple future Flink versions in
> > > >>>> the same release line without forced upgrades.
> > > >>>>
> > > >>>> Thanks,
> > > >>>> Thomas
> > > >>>>
> > > >>>> [1] https://github.com/apache/flink/pull/7249
> > > >>>> [2] https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/beam_job_api.proto
> > > >>>> [3] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
> > > >>>> [4] https://lists.apache.org/thread.html/064c75c5d10f0806095b14f6d76942598917a14429c1acbddd151fe2@%3Cdev.flink.apache.org%3E
> > > >>>>
> > > >>>>
> > > >>>> On Thu, Dec 20, 2018 at 6:15 PM Jeff Zhang <zjf...@gmail.com> wrote:
> > > >>>>
> > > >>>>>>>> I'm not so sure whether the user should be able to define where
> > > >>>>> the job runs (in your example Yarn). This is actually independent
> > > >>>>> of the job development and is something which is decided at
> > > >>>>> deployment time.
> > > >>>>>
> > > >>>>> Users don't need to specify the execution mode programmatically.
> > > >>>>> They can also pass the execution mode via the arguments of the
> > > >>>>> flink run command, e.g.:
> > > >>>>>
> > > >>>>> bin/flink run -m yarn-cluster ....
> > > >>>>> bin/flink run -m local ...
> > > >>>>> bin/flink run -m host:port ...
> > > >>>>>
> > > >>>>> Does this make sense to you?
> > > >>>>>
> > > >>>>>>>> To me it makes sense that the ExecutionEnvironment is not
> > > >>>>> directly initialized by the user and is instead context sensitive
> > > >>>>> to how you want to execute your job (Flink CLI vs. IDE, for
> > > >>>>> example).
> > > >>>>>
> > > >>>>> Right, currently I notice that Flink creates different
> > > >>>>> ContextEnvironments based on the submission scenario (Flink CLI
> > > >>>>> vs. IDE). To me this is kind of a hack, not so straightforward.
> > > >>>>> What I suggested above is that Flink should always create the same
> > > >>>>> ExecutionEnvironment, just with different configurations, and
> > > >>>>> based on the configuration it would create the proper
> > > >>>>> ClusterClient for the different behaviors.
> > > >>>>>
> > > >>>>>
> > > >>>>> On Thu, Dec 20, 2018 at 11:18 PM Till Rohrmann <trohrm...@apache.org> wrote:
> > > >>>>>
> > > >>>>>> You are probably right that we have code duplication when it
> > > >>>>>> comes to the creation of the ClusterClient. This should be
> > > >>>>>> reduced in the future.
> > > >>>>>>
> > > >>>>>> I'm not so sure whether the user should be able to define where
> > > >>>>>> the job runs (in your example Yarn). This is actually independent
> > > >>>>>> of the job development and is something which is decided at
> > > >>>>>> deployment time. To me it makes sense that the
> > > >>>>>> ExecutionEnvironment is not directly initialized by the user and
> > > >>>>>> is instead context sensitive to how you want to execute your job
> > > >>>>>> (Flink CLI vs. IDE, for example). However, I agree that the
> > > >>>>>> ExecutionEnvironment should give you access to the ClusterClient
> > > >>>>>> and to the job (maybe in the form of the JobGraph or a job plan).
> > > >>>>>>
> > > >>>>>> Cheers,
> > > >>>>>> Till
> > > >>>>>>
> > > >>>>>> On Thu, Dec 13, 2018 at 4:36 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > > >>>>>>
> > > >>>>>>> Hi Till,
> > > >>>>>>> Thanks for the feedback. You are right that I expect a better
> > > >>>>>>> programmatic job submission/control API that could be used by
> > > >>>>>>> downstream projects, and it would benefit the Flink ecosystem.
> > > >>>>>>> When I look at the code of the Flink scala-shell and sql-client
> > > >>>>>>> (I believe they are not the core of Flink, but belong to its
> > > >>>>>>> ecosystem), I find a lot of duplicated code for creating a
> > > >>>>>>> ClusterClient from user-provided configuration (the
> > > >>>>>>> configuration format may differ between scala-shell and
> > > >>>>>>> sql-client) and then using that ClusterClient to manipulate
> > > >>>>>>> jobs. I don't think this is convenient for downstream projects.
> > > >>>>>>> What I expect is that a downstream project only needs to provide
> > > >>>>>>> the necessary configuration info (maybe introducing a class
> > > >>>>>>> FlinkConf), then build an ExecutionEnvironment based on this
> > > >>>>>>> FlinkConf, and the ExecutionEnvironment will create the proper
> > > >>>>>>> ClusterClient. This would not only benefit downstream project
> > > >>>>>>> development but also help their integration tests with Flink.
> > > >>>>>>> Here's a sample code snippet of what I expect:
> > > >>>>>>>
> > > >>>>>>> val conf = new FlinkConf().mode("yarn")
> > > >>>>>>> val env = new ExecutionEnvironment(conf)
> > > >>>>>>> val jobId = env.submit(...)
> > > >>>>>>> val jobStatus = env.getClusterClient().queryJobStatus(jobId)
> > > >>>>>>> env.getClusterClient().cancelJob(jobId)
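> > > >>>>>>>
> > > >>>>>>> The same program could then target a local cluster just by
> > > >>>>>>> changing the configuration, e.g. (hypothetical API, mirroring
> > > >>>>>>> the snippet above):
> > > >>>>>>>
> > > >>>>>>> val localEnv = new ExecutionEnvironment(new FlinkConf().mode("local"))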
> > > >>>>>>>
> > > >>>>>>> What do you think?
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> On Tue, Dec 11, 2018 at 6:28 PM Till Rohrmann <trohrm...@apache.org> wrote:
> > > >>>>>>>
> > > >>>>>>>> Hi Jeff,
> > > >>>>>>>>
> > > >>>>>>>> what you are proposing is to provide the user with better
> > > >>>>>>>> programmatic job control. There was actually an effort to
> > > >>>>>>>> achieve this but it has never been completed [1]. However,
> > > >>>>>>>> there are some improvements in the code base now. Look for
> > > >>>>>>>> example at the NewClusterClient interface which offers a
> > > >>>>>>>> non-blocking job submission. But I agree that we need to
> > > >>>>>>>> improve Flink in this regard.
> > > >>>>>>>>
> > > >>>>>>>> I would not be in favour of exposing all ClusterClient calls
> > > >>>>>>>> via the ExecutionEnvironment because it would clutter the class
> > > >>>>>>>> and would not be a good separation of concerns. Instead one
> > > >>>>>>>> idea could be to retrieve the current ClusterClient from the
> > > >>>>>>>> ExecutionEnvironment, which can then be used for cluster and
> > > >>>>>>>> job control. But before we start an effort here, we need to
> > > >>>>>>>> agree on and capture what functionality we want to provide.
> > > >>>>>>>>
> > > >>>>>>>> Initially, the idea was that we have the ClusterDescriptor
> > > >>>>>>>> describing how to talk to a cluster manager like Yarn or Mesos.
> > > >>>>>>>> The ClusterDescriptor can be used for deploying Flink clusters
> > > >>>>>>>> (job and session) and gives you a ClusterClient. The
> > > >>>>>>>> ClusterClient controls the cluster (e.g. submitting jobs,
> > > >>>>>>>> listing all running jobs). And then there was the idea to
> > > >>>>>>>> introduce a JobClient which you obtain from the ClusterClient
> > > >>>>>>>> to trigger job-specific operations (e.g. taking a savepoint,
> > > >>>>>>>> cancelling the job).
> > > >>>>>>>>
> > > >>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-4272
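> > > >>>>>>>>
> > > >>>>>>>> In rough pseudo-code, that layering would be used like this
> > > >>>>>>>> (all names illustrative, not the current code):
> > > >>>>>>>>
> > > >>>>>>>> // describes how to talk to a cluster manager like Yarn
> > > >>>>>>>> ClusterDescriptor descriptor = new YarnClusterDescriptor(config);
> > > >>>>>>>> // deploy a session cluster and obtain a client for it
> > > >>>>>>>> ClusterClient cluster = descriptor.deploySessionCluster(spec);
> > > >>>>>>>> // submit a job and get a handle for job-specific operations
> > > >>>>>>>> JobClient job = cluster.submitJob(jobGraph);
> > > >>>>>>>> job.triggerSavepoint(savepointPath);
> > > >>>>>>>> job.cancel();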
> > > >>>>>>>>
> > > >>>>>>>> Cheers,
> > > >>>>>>>> Till
> > > >>>>>>>>
> > > >>>>>>>> On Tue, Dec 11, 2018 at 10:13 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> Hi Folks,
> > > >>>>>>>>>
> > > >>>>>>>>> I am trying to integrate Flink into Apache Zeppelin, which is
> > > >>>>>>>>> an interactive notebook, and I hit several issues caused by
> > > >>>>>>>>> the Flink client API. So I'd like to propose the following
> > > >>>>>>>>> changes to the Flink client API.
> > > >>>>>>>>>
> > > >>>>>>>>> 1. Support nonblocking execution. Currently,
> > > >>>>>>>>> ExecutionEnvironment#execute is a blocking method which does
> > > >>>>>>>>> two things: first it submits the job, then it waits for the
> > > >>>>>>>>> job to finish. I'd like to introduce a nonblocking execution
> > > >>>>>>>>> method like ExecutionEnvironment#submit which only submits the
> > > >>>>>>>>> job and then returns the jobId to the client, allowing the
> > > >>>>>>>>> user to query the job status via the jobId.
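> > > >>>>>>>>>
> > > >>>>>>>>> From the caller's side this could look like the following
> > > >>>>>>>>> sketch (method names illustrative, not an existing API):
> > > >>>>>>>>>
> > > >>>>>>>>> JobID jobId = env.submit();                 // returns immediately
> > > >>>>>>>>> JobStatus status = env.getJobStatus(jobId); // poll later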
> > > >>>>>>>>>
> > > >>>>>>>>> 2. Add a cancel API in
> > > >>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment. Currently the
> > > >>>>>>>>> only way to cancel a job is via the CLI (bin/flink), which is
> > > >>>>>>>>> not convenient for downstream projects. So I'd like to add a
> > > >>>>>>>>> cancel API in ExecutionEnvironment.
> > > >>>>>>>>>
> > > >>>>>>>>> 3. Add a savepoint API in
> > > >>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment. This is
> > > >>>>>>>>> similar to the cancel API; we should use ExecutionEnvironment
> > > >>>>>>>>> as the unified API for third parties to integrate with Flink.
> > > >>>>>>>>>
> > > >>>>>>>>> 4. Add a listener for the job execution lifecycle, something
> > > >>>>>>>>> like the following, so that downstream projects can run custom
> > > >>>>>>>>> logic during the job lifecycle. E.g. Zeppelin would capture
> > > >>>>>>>>> the jobId after the job is submitted and then use this jobId
> > > >>>>>>>>> to cancel the job later when necessary.
> > > >>>>>>>>>
> > > >>>>>>>>> public interface JobListener {
> > > >>>>>>>>>
> > > >>>>>>>>>   void onJobSubmitted(JobID jobId);
> > > >>>>>>>>>
> > > >>>>>>>>>   void onJobExecuted(JobExecutionResult jobResult);
> > > >>>>>>>>>
> > > >>>>>>>>>   void onJobCanceled(JobID jobId);
> > > >>>>>>>>> }
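> > > >>>>>>>>>
> > > >>>>>>>>> Zeppelin could then hook in roughly like this (the
> > > >>>>>>>>> registration method is hypothetical):
> > > >>>>>>>>>
> > > >>>>>>>>> env.registerJobListener(new JobListener() {
> > > >>>>>>>>>    public void onJobSubmitted(JobID jobId) {
> > > >>>>>>>>>        // e.g. store in a field for later cancellation
> > > >>>>>>>>>        capturedJobId = jobId;
> > > >>>>>>>>>    }
> > > >>>>>>>>>    public void onJobExecuted(JobExecutionResult jobResult) { }
> > > >>>>>>>>>    public void onJobCanceled(JobID jobId) { }
> > > >>>>>>>>> });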
> > > >>>>>>>>>
> > > >>>>>>>>> 5. Enable sessions in ExecutionEnvironment. Currently sessions
> > > >>>>>>>>> are disabled, but they are very convenient for third parties
> > > >>>>>>>>> that submit jobs continually. I hope Flink can enable them
> > > >>>>>>>>> again.
> > > >>>>>>>>>
> > > >>>>>>>>> 6. Unify all Flink client APIs into
> > > >>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment.
> > > >>>>>>>>>
> > > >>>>>>>>> This is a long-term issue which needs more careful thinking
> > > >>>>>>>>> and design. Currently some Flink features are exposed in
> > > >>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment, but some are
> > > >>>>>>>>> exposed in the CLI instead of the API, like the cancel and
> > > >>>>>>>>> savepoint operations I mentioned above. I think the root cause
> > > >>>>>>>>> is that Flink doesn't have a unified way of interacting with a
> > > >>>>>>>>> cluster. Here I list 3 scenarios of Flink operation:
> > > >>>>>>>>>
> > > >>>>>>>>>   - Local job execution. Flink will create a LocalEnvironment
> > > >>>>>>>>>   and then use this LocalEnvironment to create a LocalExecutor
> > > >>>>>>>>>   for job execution.
> > > >>>>>>>>>   - Remote job execution. Flink will create a ClusterClient
> > > >>>>>>>>>   first, then create a ContextEnvironment based on the
> > > >>>>>>>>>   ClusterClient, and then run the job.
> > > >>>>>>>>>   - Job cancelation. Flink will create a ClusterClient first
> > > >>>>>>>>>   and then cancel the job via this ClusterClient.
> > > >>>>>>>>>
> > > >>>>>>>>> As you can see in the above 3 scenarios, Flink doesn't use the
> > > >>>>>>>>> same approach (code path) to interact with the cluster.
> > > >>>>>>>>> What I propose is the following:
> > > >>>>>>>>> Create the proper LocalEnvironment/RemoteEnvironment (based on
> > > >>>>>>>>> user configuration) --> use this Environment to create the
> > > >>>>>>>>> proper ClusterClient (LocalClusterClient or RestClusterClient)
> > > >>>>>>>>> to interact with Flink (job execution or cancelation).
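> > > >>>>>>>>>
> > > >>>>>>>>> In pseudo-code, the unified path would be (all names
> > > >>>>>>>>> illustrative):
> > > >>>>>>>>>
> > > >>>>>>>>> // conf decides local vs. remote; same code path either way
> > > >>>>>>>>> ExecutionEnvironment env = ExecutionEnvironment.create(conf);
> > > >>>>>>>>> ClusterClient client = env.getClusterClient();
> > > >>>>>>>>> client.cancelJob(jobId);  // or submit a job, take a savepoint, ...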
> > > >>>>>>>>>
> > > >>>>>>>>> This way we can unify the process of local execution and
> > > >>>>>>>>> remote execution, and it becomes much easier for third parties
> > > >>>>>>>>> to integrate with Flink, because ExecutionEnvironment is the
> > > >>>>>>>>> unified entry point for Flink. All a third party needs to do
> > > >>>>>>>>> is pass the configuration to ExecutionEnvironment, and
> > > >>>>>>>>> ExecutionEnvironment will do the right thing based on the
> > > >>>>>>>>> configuration. The Flink CLI can also be considered a Flink
> > > >>>>>>>>> API consumer: it would just pass the configuration to
> > > >>>>>>>>> ExecutionEnvironment and let ExecutionEnvironment create the
> > > >>>>>>>>> proper ClusterClient instead of creating the ClusterClient
> > > >>>>>>>>> directly.
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> Item 6 would involve a large code refactoring, so I think we
> > > >>>>>>>>> can defer it to a future release; items 1, 2, 3, 4, and 5
> > > >>>>>>>>> could be done at once, I believe. Let me know your comments
> > > >>>>>>>>> and feedback, thanks.
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> --
> > > >>>>>>>>> Best Regards
> > > >>>>>>>>>
> > > >>>>>>>>> Jeff Zhang
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> --
> > > >>>>>>> Best Regards
> > > >>>>>>>
> > > >>>>>>> Jeff Zhang
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> --
> > > >>>>> Best Regards
> > > >>>>>
> > > >>>>> Jeff Zhang
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>>
> > > >>> --
> > > >>> Best Regards
> > > >>>
> > > >>> Jeff Zhang
> > > >>>
> > > >>
> > > >
> > > >
> > > > --
> > > > Best Regards
> > > >
> > > > Jeff Zhang
> > >
> > >
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
> >
>
