Some points to consider:

* Any API we expose should not have dependencies on the runtime (flink-runtime) 
package or other implementation details. To me, this means that the current 
ClusterClient cannot be exposed to users because it   uses quite some classes 
from the optimiser and runtime packages.

* What happens when a failure/restart in the client happens? There need to be a 
way of re-establishing the connection to the job, set up the listeners again, 
etc.

Aljoscha

> On 29. May 2019, at 10:17, Jeff Zhang <zjf...@gmail.com> wrote:
> 
> Sorry folks, the design doc is late as you expected. Here's the design doc
> I drafted, welcome any comments and feedback.
> 
> https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/edit?usp=sharing
> 
> 
> 
> Stephan Ewen <se...@apache.org> 于2019年2月14日周四 下午8:43写道:
> 
>> Nice that this discussion is happening.
>> 
>> In the FLIP, we could also revisit the entire role of the environments
>> again.
>> 
>> Initially, the idea was:
>>  - the environments take care of the specific setup for standalone (no
>> setup needed), yarn, mesos, etc.
>>  - the session ones have control over the session. The environment holds
>> the session client.
>>  - running a job gives a "control" object for that job. That behavior is
>> the same in all environments.
>> 
>> The actual implementation diverged quite a bit from that. Happy to see a
>> discussion about straitening this out a bit more.
>> 
>> 
>> On Tue, Feb 12, 2019 at 4:58 AM Jeff Zhang <zjf...@gmail.com> wrote:
>> 
>>> Hi folks,
>>> 
>>> Sorry for late response, It seems we reach consensus on this, I will
>> create
>>> FLIP for this with more detailed design
>>> 
>>> 
>>> Thomas Weise <t...@apache.org> 于2018年12月21日周五 上午11:43写道:
>>> 
>>>> Great to see this discussion seeded! The problems you face with the
>>>> Zeppelin integration are also affecting other downstream projects, like
>>>> Beam.
>>>> 
>>>> We just enabled the savepoint restore option in RemoteStreamEnvironment
>>> [1]
>>>> and that was more difficult than it should be. The main issue is that
>>>> environment and cluster client aren't decoupled. Ideally it should be
>>>> possible to just get the matching cluster client from the environment
>> and
>>>> then control the job through it (environment as factory for cluster
>>>> client). But note that the environment classes are part of the public
>>> API,
>>>> and it is not straightforward to make larger changes without breaking
>>>> backward compatibility.
>>>> 
>>>> ClusterClient currently exposes internal classes like JobGraph and
>>>> StreamGraph. But it should be possible to wrap this with a new public
>> API
>>>> that brings the required job control capabilities for downstream
>>> projects.
>>>> Perhaps it is helpful to look at some of the interfaces in Beam while
>>>> thinking about this: [2] for the portable job API and [3] for the old
>>>> asynchronous job control from the Beam Java SDK.
>>>> 
>>>> The backward compatibility discussion [4] is also relevant here. A new
>>> API
>>>> should shield downstream projects from internals and allow them to
>>>> interoperate with multiple future Flink versions in the same release
>> line
>>>> without forced upgrades.
>>>> 
>>>> Thanks,
>>>> Thomas
>>>> 
>>>> [1] https://github.com/apache/flink/pull/7249
>>>> [2]
>>>> 
>>>> 
>>> 
>> https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/beam_job_api.proto
>>>> [3]
>>>> 
>>>> 
>>> 
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
>>>> [4]
>>>> 
>>>> 
>>> 
>> https://lists.apache.org/thread.html/064c75c5d10f0806095b14f6d76942598917a14429c1acbddd151fe2@%3Cdev.flink.apache.org%3E
>>>> 
>>>> 
>>>> On Thu, Dec 20, 2018 at 6:15 PM Jeff Zhang <zjf...@gmail.com> wrote:
>>>> 
>>>>>>>> I'm not so sure whether the user should be able to define where
>> the
>>>> job
>>>>> runs (in your example Yarn). This is actually independent of the job
>>>>> development and is something which is decided at deployment time.
>>>>> 
>>>>> User don't need to specify execution mode programmatically. They can
>>> also
>>>>> pass the execution mode from the arguments in flink run command. e.g.
>>>>> 
>>>>> bin/flink run -m yarn-cluster ....
>>>>> bin/flink run -m local ...
>>>>> bin/flink run -m host:port ...
>>>>> 
>>>>> Does this make sense to you ?
>>>>> 
>>>>>>>> To me it makes sense that the ExecutionEnvironment is not
>> directly
>>>>> initialized by the user and instead context sensitive how you want to
>>>>> execute your job (Flink CLI vs. IDE, for example).
>>>>> 
>>>>> Right, currently I notice Flink would create different
>>>>> ContextExecutionEnvironment based on different submission scenarios
>>>> (Flink
>>>>> Cli vs IDE). To me this is kind of hack approach, not so
>>> straightforward.
>>>>> What I suggested above is that is that flink should always create the
>>>> same
>>>>> ExecutionEnvironment but with different configuration, and based on
>> the
>>>>> configuration it would create the proper ClusterClient for different
>>>>> behaviors.
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> Till Rohrmann <trohrm...@apache.org> 于2018年12月20日周四 下午11:18写道:
>>>>> 
>>>>>> You are probably right that we have code duplication when it comes
>> to
>>>> the
>>>>>> creation of the ClusterClient. This should be reduced in the
>> future.
>>>>>> 
>>>>>> I'm not so sure whether the user should be able to define where the
>>> job
>>>>>> runs (in your example Yarn). This is actually independent of the
>> job
>>>>>> development and is something which is decided at deployment time.
>> To
>>> me
>>>>> it
>>>>>> makes sense that the ExecutionEnvironment is not directly
>> initialized
>>>> by
>>>>>> the user and instead context sensitive how you want to execute your
>>> job
>>>>>> (Flink CLI vs. IDE, for example). However, I agree that the
>>>>>> ExecutionEnvironment should give you access to the ClusterClient
>> and
>>> to
>>>>> the
>>>>>> job (maybe in the form of the JobGraph or a job plan).
>>>>>> 
>>>>>> Cheers,
>>>>>> Till
>>>>>> 
>>>>>> On Thu, Dec 13, 2018 at 4:36 AM Jeff Zhang <zjf...@gmail.com>
>> wrote:
>>>>>> 
>>>>>>> Hi Till,
>>>>>>> Thanks for the feedback. You are right that I expect better
>>>>> programmatic
>>>>>>> job submission/control api which could be used by downstream
>>> project.
>>>>> And
>>>>>>> it would benefit for the flink ecosystem. When I look at the code
>>> of
>>>>>> flink
>>>>>>> scala-shell and sql-client (I believe they are not the core of
>>> flink,
>>>>> but
>>>>>>> belong to the ecosystem of flink), I find many duplicated code
>> for
>>>>>> creating
>>>>>>> ClusterClient from user provided configuration (configuration
>>> format
>>>>> may
>>>>>> be
>>>>>>> different from scala-shell and sql-client) and then use that
>>>>>> ClusterClient
>>>>>>> to manipulate jobs. I don't think this is convenient for
>> downstream
>>>>>>> projects. What I expect is that downstream project only needs to
>>>>> provide
>>>>>>> necessary configuration info (maybe introducing class FlinkConf),
>>> and
>>>>>> then
>>>>>>> build ExecutionEnvironment based on this FlinkConf, and
>>>>>>> ExecutionEnvironment will create the proper ClusterClient. It not
>>>> only
>>>>>>> benefit for the downstream project development but also be
>> helpful
>>>> for
>>>>>>> their integration test with flink. Here's one sample code snippet
>>>> that
>>>>> I
>>>>>>> expect.
>>>>>>> 
>>>>>>> val conf = new FlinkConf().mode("yarn")
>>>>>>> val env = new ExecutionEnvironment(conf)
>>>>>>> val jobId = env.submit(...)
>>>>>>> val jobStatus = env.getClusterClient().queryJobStatus(jobId)
>>>>>>> env.getClusterClient().cancelJob(jobId)
>>>>>>> 
>>>>>>> What do you think ?
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> Till Rohrmann <trohrm...@apache.org> 于2018年12月11日周二 下午6:28写道:
>>>>>>> 
>>>>>>>> Hi Jeff,
>>>>>>>> 
>>>>>>>> what you are proposing is to provide the user with better
>>>>> programmatic
>>>>>>> job
>>>>>>>> control. There was actually an effort to achieve this but it
>> has
>>>>> never
>>>>>>> been
>>>>>>>> completed [1]. However, there are some improvement in the code
>>> base
>>>>>> now.
>>>>>>>> Look for example at the NewClusterClient interface which
>> offers a
>>>>>>>> non-blocking job submission. But I agree that we need to
>> improve
>>>>> Flink
>>>>>> in
>>>>>>>> this regard.
>>>>>>>> 
>>>>>>>> I would not be in favour if exposing all ClusterClient calls
>> via
>>>> the
>>>>>>>> ExecutionEnvironment because it would clutter the class and
>> would
>>>> not
>>>>>> be
>>>>>>> a
>>>>>>>> good separation of concerns. Instead one idea could be to
>>> retrieve
>>>>> the
>>>>>>>> current ClusterClient from the ExecutionEnvironment which can
>>> then
>>>> be
>>>>>>> used
>>>>>>>> for cluster and job control. But before we start an effort
>> here,
>>> we
>>>>>> need
>>>>>>> to
>>>>>>>> agree and capture what functionality we want to provide.
>>>>>>>> 
>>>>>>>> Initially, the idea was that we have the ClusterDescriptor
>>>> describing
>>>>>> how
>>>>>>>> to talk to cluster manager like Yarn or Mesos. The
>>>> ClusterDescriptor
>>>>>> can
>>>>>>> be
>>>>>>>> used for deploying Flink clusters (job and session) and gives
>>> you a
>>>>>>>> ClusterClient. The ClusterClient controls the cluster (e.g.
>>>>> submitting
>>>>>>>> jobs, listing all running jobs). And then there was the idea to
>>>>>>> introduce a
>>>>>>>> JobClient which you obtain from the ClusterClient to trigger
>> job
>>>>>> specific
>>>>>>>> operations (e.g. taking a savepoint, cancelling the job).
>>>>>>>> 
>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-4272
>>>>>>>> 
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>> 
>>>>>>>> On Tue, Dec 11, 2018 at 10:13 AM Jeff Zhang <zjf...@gmail.com>
>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Hi Folks,
>>>>>>>>> 
>>>>>>>>> I am trying to integrate flink into apache zeppelin which is
>> an
>>>>>>>> interactive
>>>>>>>>> notebook. And I hit several issues that is caused by flink
>>> client
>>>>>> api.
>>>>>>> So
>>>>>>>>> I'd like to proposal the following changes for flink client
>>> api.
>>>>>>>>> 
>>>>>>>>> 1. Support nonblocking execution. Currently,
>>>>>>> ExecutionEnvironment#execute
>>>>>>>>> is a blocking method which would do 2 things, first submit
>> job
>>>> and
>>>>>> then
>>>>>>>>> wait for job until it is finished. I'd like introduce a
>>>> nonblocking
>>>>>>>>> execution method like ExecutionEnvironment#submit which only
>>>> submit
>>>>>> job
>>>>>>>> and
>>>>>>>>> then return jobId to client. And allow user to query the job
>>>> status
>>>>>> via
>>>>>>>> the
>>>>>>>>> jobId.
>>>>>>>>> 
>>>>>>>>> 2. Add cancel api in
>>>>> ExecutionEnvironment/StreamExecutionEnvironment,
>>>>>>>>> currently the only way to cancel job is via cli (bin/flink),
>>> this
>>>>> is
>>>>>>> not
>>>>>>>>> convenient for downstream project to use this feature. So I'd
>>>> like
>>>>> to
>>>>>>> add
>>>>>>>>> cancel api in ExecutionEnvironment
>>>>>>>>> 
>>>>>>>>> 3. Add savepoint api in
>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment.
>>>>>>>> It
>>>>>>>>> is similar as cancel api, we should use ExecutionEnvironment
>> as
>>>> the
>>>>>>>> unified
>>>>>>>>> api for third party to integrate with flink.
>>>>>>>>> 
>>>>>>>>> 4. Add listener for job execution lifecycle. Something like
>>>>>> following,
>>>>>>> so
>>>>>>>>> that downstream project can do custom logic in the lifecycle
>> of
>>>>> job.
>>>>>>> e.g.
>>>>>>>>> Zeppelin would capture the jobId after job is submitted and
>>> then
>>>>> use
>>>>>>> this
>>>>>>>>> jobId to cancel it later when necessary.
>>>>>>>>> 
>>>>>>>>> public interface JobListener {
>>>>>>>>> 
>>>>>>>>>   void onJobSubmitted(JobID jobId);
>>>>>>>>> 
>>>>>>>>>   void onJobExecuted(JobExecutionResult jobResult);
>>>>>>>>> 
>>>>>>>>>   void onJobCanceled(JobID jobId);
>>>>>>>>> }
>>>>>>>>> 
>>>>>>>>> 5. Enable session in ExecutionEnvironment. Currently it is
>>>>> disabled,
>>>>>>> but
>>>>>>>>> session is very convenient for third party to submitting jobs
>>>>>>>> continually.
>>>>>>>>> I hope flink can enable it again.
>>>>>>>>> 
>>>>>>>>> 6. Unify all flink client api into
>>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment.
>>>>>>>>> 
>>>>>>>>> This is a long term issue which needs more careful thinking
>> and
>>>>>> design.
>>>>>>>>> Currently some of features of flink is exposed in
>>>>>>>>> ExecutionEnvironment/StreamExecutionEnvironment, but some are
>>>>> exposed
>>>>>>> in
>>>>>>>>> cli instead of api, like the cancel and savepoint I mentioned
>>>>> above.
>>>>>> I
>>>>>>>>> think the root cause is due to that flink didn't unify the
>>>>>> interaction
>>>>>>>> with
>>>>>>>>> flink. Here I list 3 scenarios of flink operation
>>>>>>>>> 
>>>>>>>>>   - Local job execution.  Flink will create LocalEnvironment
>>> and
>>>>>> then
>>>>>>>> use
>>>>>>>>>   this LocalEnvironment to create LocalExecutor for job
>>>> execution.
>>>>>>>>>   - Remote job execution. Flink will create ClusterClient
>>> first
>>>>> and
>>>>>>> then
>>>>>>>>>   create ContextEnvironment based on the ClusterClient and
>>> then
>>>>> run
>>>>>>> the
>>>>>>>>> job.
>>>>>>>>>   - Job cancelation. Flink will create ClusterClient first
>> and
>>>>> then
>>>>>>>> cancel
>>>>>>>>>   this job via this ClusterClient.
>>>>>>>>> 
>>>>>>>>> As you can see in the above 3 scenarios. Flink didn't use the
>>>> same
>>>>>>>>> approach(code path) to interact with flink
>>>>>>>>> What I propose is following:
>>>>>>>>> Create the proper LocalEnvironment/RemoteEnvironment (based
>> on
>>>> user
>>>>>>>>> configuration) --> Use this Environment to create proper
>>>>>> ClusterClient
>>>>>>>>> (LocalClusterClient or RestClusterClient) to interactive with
>>>>> Flink (
>>>>>>> job
>>>>>>>>> execution or cancelation)
>>>>>>>>> 
>>>>>>>>> This way we can unify the process of local execution and
>> remote
>>>>>>>> execution.
>>>>>>>>> And it is much easier for third party to integrate with
>> flink,
>>>>>> because
>>>>>>>>> ExecutionEnvironment is the unified entry point for flink.
>> What
>>>>> third
>>>>>>>> party
>>>>>>>>> needs to do is just pass configuration to
>> ExecutionEnvironment
>>>> and
>>>>>>>>> ExecutionEnvironment will do the right thing based on the
>>>>>>> configuration.
>>>>>>>>> Flink cli can also be considered as flink api consumer. it
>> just
>>>>> pass
>>>>>>> the
>>>>>>>>> configuration to ExecutionEnvironment and let
>>>> ExecutionEnvironment
>>>>> to
>>>>>>>>> create the proper ClusterClient instead of letting cli to
>>> create
>>>>>>>>> ClusterClient directly.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 6 would involve large code refactoring, so I think we can
>> defer
>>>> it
>>>>>> for
>>>>>>>>> future release, 1,2,3,4,5 could be done at once I believe.
>> Let
>>> me
>>>>>> know
>>>>>>>> your
>>>>>>>>> comments and feedback, thanks
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> --
>>>>>>>>> Best Regards
>>>>>>>>> 
>>>>>>>>> Jeff Zhang
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> --
>>>>>>> Best Regards
>>>>>>> 
>>>>>>> Jeff Zhang
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> Best Regards
>>>>> 
>>>>> Jeff Zhang
>>>>> 
>>>> 
>>> 
>>> 
>>> --
>>> Best Regards
>>> 
>>> Jeff Zhang
>>> 
>> 
> 
> 
> -- 
> Best Regards
> 
> Jeff Zhang

Reply via email to