Just one note on my side: it is not clear to me whether the client needs to
be able to generate a job graph or not.
In my opinion, the job jar must resides only on the server/jobManager side
and the client requires a way to get the job graph.
If you really want to access to the job graph, I'd add a dedicated method
on the ClusterClient. like:

   - getJobGraph(jarId, mainClass): JobGraph
   - listMainClasses(jarId): List<String>

These would require some addition also on the job manager endpoint as
well..what do you think?

On Wed, Jul 31, 2019 at 12:42 PM Zili Chen <wander4...@gmail.com> wrote:

> Hi all,
>
> Here is a document[1] on client api enhancement from our perspective.
> We have investigated current implementations. And we propose
>
> 1. Unify the implementation of cluster deployment and job submission in
> Flink.
> 2. Provide programmatic interfaces to allow flexible job and cluster
> management.
>
> The first proposal is aimed at reducing code paths of cluster deployment
> and
> job submission so that one can adopt Flink in his usage easily. The second
> proposal is aimed at providing rich interfaces for advanced users
> who want to make accurate control of these stages.
>
> Quick reference on open questions:
>
> 1. Exclude job cluster deployment from client side or redefine the semantic
> of job cluster? Since it fits in a process quite different from session
> cluster deployment and job submission.
>
> 2. Maintain the codepaths handling class o.a.f.api.common.Program or
> implement customized program handling logic by customized CliFrontend?
> See also this thread[2] and the document[1].
>
> 3. Expose ClusterClient as public api or just expose api in
> ExecutionEnvironment
> and delegate them to ClusterClient? Further, in either way is it worth to
> introduce a JobClient which is an encapsulation of ClusterClient that
> associated to specific job?
>
> Best,
> tison.
>
> [1]
>
> https://docs.google.com/document/d/1UWJE7eYWiMuZewBKS0YmdVO2LUTqXPd6-pbOCof9ddY/edit?usp=sharing
> [2]
>
> https://lists.apache.org/thread.html/7ffc9936a384b891dbcf0a481d26c6d13b2125607c200577780d1e18@%3Cdev.flink.apache.org%3E
>
> Jeff Zhang <zjf...@gmail.com> 于2019年7月24日周三 上午9:19写道:
>
> > Thanks Stephan, I will follow up this issue in next few weeks, and will
> > refine the design doc. We could discuss more details after 1.9 release.
> >
> > Stephan Ewen <se...@apache.org> 于2019年7月24日周三 上午12:58写道:
> >
> > > Hi all!
> > >
> > > This thread has stalled for a bit, which I assume ist mostly due to the
> > > Flink 1.9 feature freeze and release testing effort.
> > >
> > > I personally still recognize this issue as one important to be solved.
> > I'd
> > > be happy to help resume this discussion soon (after the 1.9 release)
> and
> > > see if we can do some step towards this in Flink 1.10.
> > >
> > > Best,
> > > Stephan
> > >
> > >
> > >
> > > On Mon, Jun 24, 2019 at 10:41 AM Flavio Pompermaier <
> > pomperma...@okkam.it>
> > > wrote:
> > >
> > > > That's exactly what I suggested a long time ago: the Flink REST
> client
> > > > should not require any Flink dependency, only http library to call
> the
> > > REST
> > > > services to submit and monitor a job.
> > > > What I suggested also in [1] was to have a way to automatically
> suggest
> > > the
> > > > user (via a UI) the available main classes and their required
> > > > parameters[2].
> > > > Another problem we have with Flink is that the Rest client and the
> CLI
> > > one
> > > > behaves differently and we use the CLI client (via ssh) because it
> > allows
> > > > to call some other method after env.execute() [3] (we have to call
> > > another
> > > > REST service to signal the end of the job).
> > > > Int his regard, a dedicated interface, like the JobListener suggested
> > in
> > > > the previous emails, would be very helpful (IMHO).
> > > >
> > > > [1] https://issues.apache.org/jira/browse/FLINK-10864
> > > > [2] https://issues.apache.org/jira/browse/FLINK-10862
> > > > [3] https://issues.apache.org/jira/browse/FLINK-10879
> > > >
> > > > Best,
> > > > Flavio
> > > >
> > > > On Mon, Jun 24, 2019 at 9:54 AM Jeff Zhang <zjf...@gmail.com> wrote:
> > > >
> > > > > Hi, Tison,
> > > > >
> > > > > Thanks for your comments. Overall I agree with you that it is
> > difficult
> > > > for
> > > > > down stream project to integrate with flink and we need to refactor
> > the
> > > > > current flink client api.
> > > > > And I agree that CliFrontend should only parsing command line
> > arguments
> > > > and
> > > > > then pass them to ExecutionEnvironment. It is
> ExecutionEnvironment's
> > > > > responsibility to compile job, create cluster, and submit job.
> > Besides
> > > > > that, Currently flink has many ExecutionEnvironment
> implementations,
> > > and
> > > > > flink will use the specific one based on the context. IMHO, it is
> not
> > > > > necessary, ExecutionEnvironment should be able to do the right
> thing
> > > > based
> > > > > on the FlinkConf it is received. Too many ExecutionEnvironment
> > > > > implementation is another burden for downstream project
> integration.
> > > > >
> > > > > One thing I'd like to mention is flink's scala shell and sql
> client,
> > > > > although they are sub-modules of flink, they could be treated as
> > > > downstream
> > > > > project which use flink's client api. Currently you will find it is
> > not
> > > > > easy for them to integrate with flink, they share many duplicated
> > code.
> > > > It
> > > > > is another sign that we should refactor flink client api.
> > > > >
> > > > > I believe it is a large and hard change, and I am afraid we can not
> > > keep
> > > > > compatibility since many of changes are user facing.
> > > > >
> > > > >
> > > > >
> > > > > Zili Chen <wander4...@gmail.com> 于2019年6月24日周一 下午2:53写道:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > After a closer look on our client apis, I can see there are two
> > major
> > > > > > issues to consistency and integration, namely different
> deployment
> > of
> > > > > > job cluster which couples job graph creation and cluster
> > deployment,
> > > > > > and submission via CliFrontend confusing control flow of job
> graph
> > > > > > compilation and job submission. I'd like to follow the discuss
> > above,
> > > > > > mainly the process described by Jeff and Stephan, and share my
> > > > > > ideas on these issues.
> > > > > >
> > > > > > 1) CliFrontend confuses the control flow of job compilation and
> > > > > submission.
> > > > > > Following the process of job submission Stephan and Jeff
> described,
> > > > > > execution environment knows all configs of the cluster and
> > > > topos/settings
> > > > > > of the job. Ideally, in the main method of user program, it calls
> > > > > #execute
> > > > > > (or named #submit) and Flink deploys the cluster, compile the job
> > > graph
> > > > > > and submit it to the cluster. However, current CliFrontend does
> all
> > > > these
> > > > > > things inside its #runProgram method, which introduces a lot of
> > > > > subclasses
> > > > > > of (stream) execution environment.
> > > > > >
> > > > > > Actually, it sets up an exec env that hijacks the
> > > #execute/executePlan
> > > > > > method, initializes the job graph and abort execution. And then
> > > > > > control flow back to CliFrontend, it deploys the cluster(or
> > retrieve
> > > > > > the client) and submits the job graph. This is quite a specific
> > > > internal
> > > > > > process inside Flink and none of consistency to anything.
> > > > > >
> > > > > > 2) Deployment of job cluster couples job graph creation and
> cluster
> > > > > > deployment. Abstractly, from user job to a concrete submission,
> it
> > > > > requires
> > > > > >
> > > > > >      create JobGraph --\
> > > > > >
> > > > > > create ClusterClient -->  submit JobGraph
> > > > > >
> > > > > > such a dependency. ClusterClient was created by deploying or
> > > > retrieving.
> > > > > > JobGraph submission requires a compiled JobGraph and valid
> > > > ClusterClient,
> > > > > > but the creation of ClusterClient is abstractly independent of
> that
> > > of
> > > > > > JobGraph. However, in job cluster mode, we deploy job cluster
> with
> > a
> > > > job
> > > > > > graph, which means we use another process:
> > > > > >
> > > > > > create JobGraph --> deploy cluster with the JobGraph
> > > > > >
> > > > > > Here is another inconsistency and downstream projects/client apis
> > are
> > > > > > forced to handle different cases with rare supports from Flink.
> > > > > >
> > > > > > Since we likely reached a consensus on
> > > > > >
> > > > > > 1. all configs gathered by Flink configuration and passed
> > > > > > 2. execution environment knows all configs and handles
> > execution(both
> > > > > > deployment and submission)
> > > > > >
> > > > > > to the issues above I propose eliminating inconsistencies by
> > > following
> > > > > > approach:
> > > > > >
> > > > > > 1) CliFrontend should exactly be a front end, at least for "run"
> > > > command.
> > > > > > That means it just gathered and passed all config from command
> line
> > > to
> > > > > > the main method of user program. Execution environment knows all
> > the
> > > > info
> > > > > > and with an addition to utils for ClusterClient, we gracefully
> get
> > a
> > > > > > ClusterClient by deploying or retrieving. In this way, we don't
> > need
> > > to
> > > > > > hijack #execute/executePlan methods and can remove various
> hacking
> > > > > > subclasses of exec env, as well as #run methods in
> > ClusterClient(for
> > > an
> > > > > > interface-ized ClusterClient). Now the control flow flows from
> > > > > CliFrontend
> > > > > > to the main method and never returns.
> > > > > >
> > > > > > 2) Job cluster means a cluster for the specific job. From another
> > > > > > perspective, it is an ephemeral session. We may decouple the
> > > deployment
> > > > > > with a compiled job graph, but start a session with idle timeout
> > > > > > and submit the job following.
> > > > > >
> > > > > > These topics, before we go into more details on design or
> > > > implementation,
> > > > > > are better to be aware and discussed for a consensus.
> > > > > >
> > > > > > Best,
> > > > > > tison.
> > > > > >
> > > > > >
> > > > > > Zili Chen <wander4...@gmail.com> 于2019年6月20日周四 上午3:21写道:
> > > > > >
> > > > > >> Hi Jeff,
> > > > > >>
> > > > > >> Thanks for raising this thread and the design document!
> > > > > >>
> > > > > >> As @Thomas Weise mentioned above, extending config to flink
> > > > > >> requires far more effort than it should be. Another example
> > > > > >> is we achieve detach mode by introduce another execution
> > > > > >> environment which also hijack #execute method.
> > > > > >>
> > > > > >> I agree with your idea that user would configure all things
> > > > > >> and flink "just" respect it. On this topic I think the unusual
> > > > > >> control flow when CliFrontend handle "run" command is the
> problem.
> > > > > >> It handles several configs, mainly about cluster settings, and
> > > > > >> thus main method of user program is unaware of them. Also it
> > > compiles
> > > > > >> app to job graph by run the main method with a hijacked exec
> env,
> > > > > >> which constrain the main method further.
> > > > > >>
> > > > > >> I'd like to write down a few of notes on configs/args pass and
> > > > respect,
> > > > > >> as well as decoupling job compilation and submission. Share on
> > this
> > > > > >> thread later.
> > > > > >>
> > > > > >> Best,
> > > > > >> tison.
> > > > > >>
> > > > > >>
> > > > > >> SHI Xiaogang <shixiaoga...@gmail.com> 于2019年6月17日周一 下午7:29写道:
> > > > > >>
> > > > > >>> Hi Jeff and Flavio,
> > > > > >>>
> > > > > >>> Thanks Jeff a lot for proposing the design document.
> > > > > >>>
> > > > > >>> We are also working on refactoring ClusterClient to allow
> > flexible
> > > > and
> > > > > >>> efficient job management in our real-time platform.
> > > > > >>> We would like to draft a document to share our ideas with you.
> > > > > >>>
> > > > > >>> I think it's a good idea to have something like Apache Livy for
> > > > Flink,
> > > > > >>> and
> > > > > >>> the efforts discussed here will take a great step forward to
> it.
> > > > > >>>
> > > > > >>> Regards,
> > > > > >>> Xiaogang
> > > > > >>>
> > > > > >>> Flavio Pompermaier <pomperma...@okkam.it> 于2019年6月17日周一
> > 下午7:13写道:
> > > > > >>>
> > > > > >>> > Is there any possibility to have something like Apache Livy
> [1]
> > > > also
> > > > > >>> for
> > > > > >>> > Flink in the future?
> > > > > >>> >
> > > > > >>> > [1] https://livy.apache.org/
> > > > > >>> >
> > > > > >>> > On Tue, Jun 11, 2019 at 5:23 PM Jeff Zhang <zjf...@gmail.com
> >
> > > > wrote:
> > > > > >>> >
> > > > > >>> > > >>>  Any API we expose should not have dependencies on the
> > > > runtime
> > > > > >>> > > (flink-runtime) package or other implementation details. To
> > me,
> > > > > this
> > > > > >>> > means
> > > > > >>> > > that the current ClusterClient cannot be exposed to users
> > > because
> > > > > it
> > > > > >>> >  uses
> > > > > >>> > > quite some classes from the optimiser and runtime packages.
> > > > > >>> > >
> > > > > >>> > > We should change ClusterClient from class to interface.
> > > > > >>> > > ExecutionEnvironment only use the interface ClusterClient
> > which
> > > > > >>> should be
> > > > > >>> > > in flink-clients while the concrete implementation class
> > could
> > > be
> > > > > in
> > > > > >>> > > flink-runtime.
> > > > > >>> > >
> > > > > >>> > > >>> What happens when a failure/restart in the client
> > happens?
> > > > > There
> > > > > >>> need
> > > > > >>> > > to be a way of re-establishing the connection to the job,
> set
> > > up
> > > > > the
> > > > > >>> > > listeners again, etc.
> > > > > >>> > >
> > > > > >>> > > Good point.  First we need to define what does
> > failure/restart
> > > in
> > > > > the
> > > > > >>> > > client mean. IIUC, that usually mean network failure which
> > will
> > > > > >>> happen in
> > > > > >>> > > class RestClient. If my understanding is correct,
> > restart/retry
> > > > > >>> mechanism
> > > > > >>> > > should be done in RestClient.
> > > > > >>> > >
> > > > > >>> > >
> > > > > >>> > >
> > > > > >>> > >
> > > > > >>> > >
> > > > > >>> > > Aljoscha Krettek <aljos...@apache.org> 于2019年6月11日周二
> > > 下午11:10写道:
> > > > > >>> > >
> > > > > >>> > > > Some points to consider:
> > > > > >>> > > >
> > > > > >>> > > > * Any API we expose should not have dependencies on the
> > > runtime
> > > > > >>> > > > (flink-runtime) package or other implementation details.
> To
> > > me,
> > > > > >>> this
> > > > > >>> > > means
> > > > > >>> > > > that the current ClusterClient cannot be exposed to users
> > > > because
> > > > > >>> it
> > > > > >>> > >  uses
> > > > > >>> > > > quite some classes from the optimiser and runtime
> packages.
> > > > > >>> > > >
> > > > > >>> > > > * What happens when a failure/restart in the client
> > happens?
> > > > > There
> > > > > >>> need
> > > > > >>> > > to
> > > > > >>> > > > be a way of re-establishing the connection to the job,
> set
> > up
> > > > the
> > > > > >>> > > listeners
> > > > > >>> > > > again, etc.
> > > > > >>> > > >
> > > > > >>> > > > Aljoscha
> > > > > >>> > > >
> > > > > >>> > > > > On 29. May 2019, at 10:17, Jeff Zhang <
> zjf...@gmail.com>
> > > > > wrote:
> > > > > >>> > > > >
> > > > > >>> > > > > Sorry folks, the design doc is late as you expected.
> > Here's
> > > > the
> > > > > >>> > design
> > > > > >>> > > > doc
> > > > > >>> > > > > I drafted, welcome any comments and feedback.
> > > > > >>> > > > >
> > > > > >>> > > > >
> > > > > >>> > > >
> > > > > >>> > >
> > > > > >>> >
> > > > > >>>
> > > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/edit?usp=sharing
> > > > > >>> > > > >
> > > > > >>> > > > >
> > > > > >>> > > > >
> > > > > >>> > > > > Stephan Ewen <se...@apache.org> 于2019年2月14日周四
> 下午8:43写道:
> > > > > >>> > > > >
> > > > > >>> > > > >> Nice that this discussion is happening.
> > > > > >>> > > > >>
> > > > > >>> > > > >> In the FLIP, we could also revisit the entire role of
> > the
> > > > > >>> > environments
> > > > > >>> > > > >> again.
> > > > > >>> > > > >>
> > > > > >>> > > > >> Initially, the idea was:
> > > > > >>> > > > >>  - the environments take care of the specific setup
> for
> > > > > >>> standalone
> > > > > >>> > (no
> > > > > >>> > > > >> setup needed), yarn, mesos, etc.
> > > > > >>> > > > >>  - the session ones have control over the session. The
> > > > > >>> environment
> > > > > >>> > > holds
> > > > > >>> > > > >> the session client.
> > > > > >>> > > > >>  - running a job gives a "control" object for that
> job.
> > > That
> > > > > >>> > behavior
> > > > > >>> > > is
> > > > > >>> > > > >> the same in all environments.
> > > > > >>> > > > >>
> > > > > >>> > > > >> The actual implementation diverged quite a bit from
> > that.
> > > > > Happy
> > > > > >>> to
> > > > > >>> > > see a
> > > > > >>> > > > >> discussion about straitening this out a bit more.
> > > > > >>> > > > >>
> > > > > >>> > > > >>
> > > > > >>> > > > >> On Tue, Feb 12, 2019 at 4:58 AM Jeff Zhang <
> > > > zjf...@gmail.com>
> > > > > >>> > wrote:
> > > > > >>> > > > >>
> > > > > >>> > > > >>> Hi folks,
> > > > > >>> > > > >>>
> > > > > >>> > > > >>> Sorry for late response, It seems we reach consensus
> on
> > > > > this, I
> > > > > >>> > will
> > > > > >>> > > > >> create
> > > > > >>> > > > >>> FLIP for this with more detailed design
> > > > > >>> > > > >>>
> > > > > >>> > > > >>>
> > > > > >>> > > > >>> Thomas Weise <t...@apache.org> 于2018年12月21日周五
> > 上午11:43写道:
> > > > > >>> > > > >>>
> > > > > >>> > > > >>>> Great to see this discussion seeded! The problems
> you
> > > face
> > > > > >>> with
> > > > > >>> > the
> > > > > >>> > > > >>>> Zeppelin integration are also affecting other
> > downstream
> > > > > >>> projects,
> > > > > >>> > > > like
> > > > > >>> > > > >>>> Beam.
> > > > > >>> > > > >>>>
> > > > > >>> > > > >>>> We just enabled the savepoint restore option in
> > > > > >>> > > > RemoteStreamEnvironment
> > > > > >>> > > > >>> [1]
> > > > > >>> > > > >>>> and that was more difficult than it should be. The
> > main
> > > > > issue
> > > > > >>> is
> > > > > >>> > > that
> > > > > >>> > > > >>>> environment and cluster client aren't decoupled.
> > Ideally
> > > > it
> > > > > >>> should
> > > > > >>> > > be
> > > > > >>> > > > >>>> possible to just get the matching cluster client
> from
> > > the
> > > > > >>> > > environment
> > > > > >>> > > > >> and
> > > > > >>> > > > >>>> then control the job through it (environment as
> > factory
> > > > for
> > > > > >>> > cluster
> > > > > >>> > > > >>>> client). But note that the environment classes are
> > part
> > > of
> > > > > the
> > > > > >>> > > public
> > > > > >>> > > > >>> API,
> > > > > >>> > > > >>>> and it is not straightforward to make larger changes
> > > > without
> > > > > >>> > > breaking
> > > > > >>> > > > >>>> backward compatibility.
> > > > > >>> > > > >>>>
> > > > > >>> > > > >>>> ClusterClient currently exposes internal classes
> like
> > > > > >>> JobGraph and
> > > > > >>> > > > >>>> StreamGraph. But it should be possible to wrap this
> > > with a
> > > > > new
> > > > > >>> > > public
> > > > > >>> > > > >> API
> > > > > >>> > > > >>>> that brings the required job control capabilities
> for
> > > > > >>> downstream
> > > > > >>> > > > >>> projects.
> > > > > >>> > > > >>>> Perhaps it is helpful to look at some of the
> > interfaces
> > > in
> > > > > >>> Beam
> > > > > >>> > > while
> > > > > >>> > > > >>>> thinking about this: [2] for the portable job API
> and
> > > [3]
> > > > > for
> > > > > >>> the
> > > > > >>> > > old
> > > > > >>> > > > >>>> asynchronous job control from the Beam Java SDK.
> > > > > >>> > > > >>>>
> > > > > >>> > > > >>>> The backward compatibility discussion [4] is also
> > > relevant
> > > > > >>> here. A
> > > > > >>> > > new
> > > > > >>> > > > >>> API
> > > > > >>> > > > >>>> should shield downstream projects from internals and
> > > allow
> > > > > >>> them to
> > > > > >>> > > > >>>> interoperate with multiple future Flink versions in
> > the
> > > > same
> > > > > >>> > release
> > > > > >>> > > > >> line
> > > > > >>> > > > >>>> without forced upgrades.
> > > > > >>> > > > >>>>
> > > > > >>> > > > >>>> Thanks,
> > > > > >>> > > > >>>> Thomas
> > > > > >>> > > > >>>>
> > > > > >>> > > > >>>> [1] https://github.com/apache/flink/pull/7249
> > > > > >>> > > > >>>> [2]
> > > > > >>> > > > >>>>
> > > > > >>> > > > >>>>
> > > > > >>> > > > >>>
> > > > > >>> > > > >>
> > > > > >>> > > >
> > > > > >>> > >
> > > > > >>> >
> > > > > >>>
> > > > >
> > > >
> > >
> >
> https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/beam_job_api.proto
> > > > > >>> > > > >>>> [3]
> > > > > >>> > > > >>>>
> > > > > >>> > > > >>>>
> > > > > >>> > > > >>>
> > > > > >>> > > > >>
> > > > > >>> > > >
> > > > > >>> > >
> > > > > >>> >
> > > > > >>>
> > > > >
> > > >
> > >
> >
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java
> > > > > >>> > > > >>>> [4]
> > > > > >>> > > > >>>>
> > > > > >>> > > > >>>>
> > > > > >>> > > > >>>
> > > > > >>> > > > >>
> > > > > >>> > > >
> > > > > >>> > >
> > > > > >>> >
> > > > > >>>
> > > > >
> > > >
> > >
> >
> https://lists.apache.org/thread.html/064c75c5d10f0806095b14f6d76942598917a14429c1acbddd151fe2@%3Cdev.flink.apache.org%3E
> > > > > >>> > > > >>>>
> > > > > >>> > > > >>>>
> > > > > >>> > > > >>>> On Thu, Dec 20, 2018 at 6:15 PM Jeff Zhang <
> > > > > zjf...@gmail.com>
> > > > > >>> > > wrote:
> > > > > >>> > > > >>>>
> > > > > >>> > > > >>>>>>>> I'm not so sure whether the user should be able
> to
> > > > > define
> > > > > >>> > where
> > > > > >>> > > > >> the
> > > > > >>> > > > >>>> job
> > > > > >>> > > > >>>>> runs (in your example Yarn). This is actually
> > > independent
> > > > > of
> > > > > >>> the
> > > > > >>> > > job
> > > > > >>> > > > >>>>> development and is something which is decided at
> > > > deployment
> > > > > >>> time.
> > > > > >>> > > > >>>>>
> > > > > >>> > > > >>>>> User don't need to specify execution mode
> > > > programmatically.
> > > > > >>> They
> > > > > >>> > > can
> > > > > >>> > > > >>> also
> > > > > >>> > > > >>>>> pass the execution mode from the arguments in flink
> > run
> > > > > >>> command.
> > > > > >>> > > e.g.
> > > > > >>> > > > >>>>>
> > > > > >>> > > > >>>>> bin/flink run -m yarn-cluster ....
> > > > > >>> > > > >>>>> bin/flink run -m local ...
> > > > > >>> > > > >>>>> bin/flink run -m host:port ...
> > > > > >>> > > > >>>>>
> > > > > >>> > > > >>>>> Does this make sense to you ?
> > > > > >>> > > > >>>>>
> > > > > >>> > > > >>>>>>>> To me it makes sense that the
> ExecutionEnvironment
> > > is
> > > > > not
> > > > > >>> > > > >> directly
> > > > > >>> > > > >>>>> initialized by the user and instead context
> sensitive
> > > how
> > > > > you
> > > > > >>> > want
> > > > > >>> > > to
> > > > > >>> > > > >>>>> execute your job (Flink CLI vs. IDE, for example).
> > > > > >>> > > > >>>>>
> > > > > >>> > > > >>>>> Right, currently I notice Flink would create
> > different
> > > > > >>> > > > >>>>> ContextExecutionEnvironment based on different
> > > submission
> > > > > >>> > scenarios
> > > > > >>> > > > >>>> (Flink
> > > > > >>> > > > >>>>> Cli vs IDE). To me this is kind of hack approach,
> not
> > > so
> > > > > >>> > > > >>> straightforward.
> > > > > >>> > > > >>>>> What I suggested above is that is that flink should
> > > > always
> > > > > >>> create
> > > > > >>> > > the
> > > > > >>> > > > >>>> same
> > > > > >>> > > > >>>>> ExecutionEnvironment but with different
> > configuration,
> > > > and
> > > > > >>> based
> > > > > >>> > on
> > > > > >>> > > > >> the
> > > > > >>> > > > >>>>> configuration it would create the proper
> > ClusterClient
> > > > for
> > > > > >>> > > different
> > > > > >>> > > > >>>>> behaviors.
> > > > > >>> > > > >>>>>
> > > > > >>> > > > >>>>>
> > > > > >>> > > > >>>>>
> > > > > >>> > > > >>>>>
> > > > > >>> > > > >>>>>
> > > > > >>> > > > >>>>>
> > > > > >>> > > > >>>>>
> > > > > >>> > > > >>>>> Till Rohrmann <trohrm...@apache.org>
> 于2018年12月20日周四
> > > > > >>> 下午11:18写道:
> > > > > >>> > > > >>>>>
> > > > > >>> > > > >>>>>> You are probably right that we have code
> duplication
> > > > when
> > > > > it
> > > > > >>> > comes
> > > > > >>> > > > >> to
> > > > > >>> > > > >>>> the
> > > > > >>> > > > >>>>>> creation of the ClusterClient. This should be
> > reduced
> > > in
> > > > > the
> > > > > >>> > > > >> future.
> > > > > >>> > > > >>>>>>
> > > > > >>> > > > >>>>>> I'm not so sure whether the user should be able to
> > > > define
> > > > > >>> where
> > > > > >>> > > the
> > > > > >>> > > > >>> job
> > > > > >>> > > > >>>>>> runs (in your example Yarn). This is actually
> > > > independent
> > > > > >>> of the
> > > > > >>> > > > >> job
> > > > > >>> > > > >>>>>> development and is something which is decided at
> > > > > deployment
> > > > > >>> > time.
> > > > > >>> > > > >> To
> > > > > >>> > > > >>> me
> > > > > >>> > > > >>>>> it
> > > > > >>> > > > >>>>>> makes sense that the ExecutionEnvironment is not
> > > > directly
> > > > > >>> > > > >> initialized
> > > > > >>> > > > >>>> by
> > > > > >>> > > > >>>>>> the user and instead context sensitive how you
> want
> > to
> > > > > >>> execute
> > > > > >>> > > your
> > > > > >>> > > > >>> job
> > > > > >>> > > > >>>>>> (Flink CLI vs. IDE, for example). However, I agree
> > > that
> > > > > the
> > > > > >>> > > > >>>>>> ExecutionEnvironment should give you access to the
> > > > > >>> ClusterClient
> > > > > >>> > > > >> and
> > > > > >>> > > > >>> to
> > > > > >>> > > > >>>>> the
> > > > > >>> > > > >>>>>> job (maybe in the form of the JobGraph or a job
> > plan).
> > > > > >>> > > > >>>>>>
> > > > > >>> > > > >>>>>> Cheers,
> > > > > >>> > > > >>>>>> Till
> > > > > >>> > > > >>>>>>
> > > > > >>> > > > >>>>>> On Thu, Dec 13, 2018 at 4:36 AM Jeff Zhang <
> > > > > >>> zjf...@gmail.com>
> > > > > >>> > > > >> wrote:
> > > > > >>> > > > >>>>>>
> > > > > >>> > > > >>>>>>> Hi Till,
> > > > > >>> > > > >>>>>>> Thanks for the feedback. You are right that I
> > expect
> > > > > better
> > > > > >>> > > > >>>>> programmatic
> > > > > >>> > > > >>>>>>> job submission/control api which could be used by
> > > > > >>> downstream
> > > > > >>> > > > >>> project.
> > > > > >>> > > > >>>>> And
> > > > > >>> > > > >>>>>>> it would benefit for the flink ecosystem. When I
> > look
> > > > at
> > > > > >>> the
> > > > > >>> > code
> > > > > >>> > > > >>> of
> > > > > >>> > > > >>>>>> flink
> > > > > >>> > > > >>>>>>> scala-shell and sql-client (I believe they are
> not
> > > the
> > > > > >>> core of
> > > > > >>> > > > >>> flink,
> > > > > >>> > > > >>>>> but
> > > > > >>> > > > >>>>>>> belong to the ecosystem of flink), I find many
> > > > duplicated
> > > > > >>> code
> > > > > >>> > > > >> for
> > > > > >>> > > > >>>>>> creating
> > > > > >>> > > > >>>>>>> ClusterClient from user provided configuration
> > > > > >>> (configuration
> > > > > >>> > > > >>> format
> > > > > >>> > > > >>>>> may
> > > > > >>> > > > >>>>>> be
> > > > > >>> > > > >>>>>>> different from scala-shell and sql-client) and
> then
> > > use
> > > > > >>> that
> > > > > >>> > > > >>>>>> ClusterClient
> > > > > >>> > > > >>>>>>> to manipulate jobs. I don't think this is
> > convenient
> > > > for
> > > > > >>> > > > >> downstream
> > > > > >>> > > > >>>>>>> projects. What I expect is that downstream
> project
> > > only
> > > > > >>> needs
> > > > > >>> > to
> > > > > >>> > > > >>>>> provide
> > > > > >>> > > > >>>>>>> necessary configuration info (maybe introducing
> > class
> > > > > >>> > FlinkConf),
> > > > > >>> > > > >>> and
> > > > > >>> > > > >>>>>> then
> > > > > >>> > > > >>>>>>> build ExecutionEnvironment based on this
> FlinkConf,
> > > and
> > > > > >>> > > > >>>>>>> ExecutionEnvironment will create the proper
> > > > > ClusterClient.
> > > > > >>> It
> > > > > >>> > not
> > > > > >>> > > > >>>> only
> > > > > >>> > > > >>>>>>> benefit for the downstream project development
> but
> > > also
> > > > > be
> > > > > >>> > > > >> helpful
> > > > > >>> > > > >>>> for
> > > > > >>> > > > >>>>>>> their integration test with flink. Here's one
> > sample
> > > > code
> > > > > >>> > snippet
> > > > > >>> > > > >>>> that
> > > > > >>> > > > >>>>> I
> > > > > >>> > > > >>>>>>> expect.
> > > > > >>> > > > >>>>>>>
> > > > > >>> > > > >>>>>>> val conf = new FlinkConf().mode("yarn")
> > > > > >>> > > > >>>>>>> val env = new ExecutionEnvironment(conf)
> > > > > >>> > > > >>>>>>> val jobId = env.submit(...)
> > > > > >>> > > > >>>>>>> val jobStatus =
> > > > > >>> env.getClusterClient().queryJobStatus(jobId)
> > > > > >>> > > > >>>>>>> env.getClusterClient().cancelJob(jobId)
> > > > > >>> > > > >>>>>>>
> > > > > >>> > > > >>>>>>> What do you think ?
> > > > > >>> > > > >>>>>>>
> > > > > >>> > > > >>>>>>>
> > > > > >>> > > > >>>>>>>
> > > > > >>> > > > >>>>>>>
> > > > > >>> > > > >>>>>>> Till Rohrmann <trohrm...@apache.org>
> > 于2018年12月11日周二
> > > > > >>> 下午6:28写道:
> > > > > >>> > > > >>>>>>>
> > > > > >>> > > > >>>>>>>> Hi Jeff,
> > > > > >>> > > > >>>>>>>>
> > > > > >>> > > > >>>>>>>> what you are proposing is to provide the user
> with
> > > > > better
> > > > > >>> > > > >>>>> programmatic
> > > > > >>> > > > >>>>>>> job
> > > > > >>> > > > >>>>>>>> control. There was actually an effort to achieve
> > > this
> > > > > but
> > > > > >>> it
> > > > > >>> > > > >> has
> > > > > >>> > > > >>>>> never
> > > > > >>> > > > >>>>>>> been
> > > > > >>> > > > >>>>>>>> completed [1]. However, there are some
> improvement
> > > in
> > > > > the
> > > > > >>> code
> > > > > >>> > > > >>> base
> > > > > >>> > > > >>>>>> now.
> > > > > >>> > > > >>>>>>>> Look for example at the NewClusterClient
> interface
> > > > which
> > > > > >>> > > > >> offers a
> > > > > >>> > > > >>>>>>>> non-blocking job submission. But I agree that we
> > > need
> > > > to
> > > > > >>> > > > >> improve
> > > > > >>> > > > >>>>> Flink
> > > > > >>> > > > >>>>>> in
> > > > > >>> > > > >>>>>>>> this regard.
> > > > > >>> > > > >>>>>>>>
> > > > > >>> > > > >>>>>>>> I would not be in favour if exposing all
> > > ClusterClient
> > > > > >>> calls
> > > > > >>> > > > >> via
> > > > > >>> > > > >>>> the
> > > > > >>> > > > >>>>>>>> ExecutionEnvironment because it would clutter
> the
> > > > class
> > > > > >>> and
> > > > > >>> > > > >> would
> > > > > >>> > > > >>>> not
> > > > > >>> > > > >>>>>> be
> > > > > >>> > > > >>>>>>> a
> > > > > >>> > > > >>>>>>>> good separation of concerns. Instead one idea
> > could
> > > be
> > > > > to
> > > > > >>> > > > >>> retrieve
> > > > > >>> > > > >>>>> the
> > > > > >>> > > > >>>>>>>> current ClusterClient from the
> > ExecutionEnvironment
> > > > > which
> > > > > >>> can
> > > > > >>> > > > >>> then
> > > > > >>> > > > >>>> be
> > > > > >>> > > > >>>>>>> used
> > > > > >>> > > > >>>>>>>> for cluster and job control. But before we start
> > an
> > > > > effort
> > > > > >>> > > > >> here,
> > > > > >>> > > > >>> we
> > > > > >>> > > > >>>>>> need
> > > > > >>> > > > >>>>>>> to
> > > > > >>> > > > >>>>>>>> agree and capture what functionality we want to
> > > > provide.
> > > > > >>> > > > >>>>>>>>
> > > > > >>> > > > >>>>>>>> Initially, the idea was that we have the
> > > > > ClusterDescriptor
> > > > > >>> > > > >>>> describing
> > > > > >>> > > > >>>>>> how
> > > > > >>> > > > >>>>>>>> to talk to cluster manager like Yarn or Mesos.
> The
> > > > > >>> > > > >>>> ClusterDescriptor
> > > > > >>> > > > >>>>>> can
> > > > > >>> > > > >>>>>>> be
> > > > > >>> > > > >>>>>>>> used for deploying Flink clusters (job and
> > session)
> > > > and
> > > > > >>> gives
> > > > > >>> > > > >>> you a
> > > > > >>> > > > >>>>>>>> ClusterClient. The ClusterClient controls the
> > > cluster
> > > > > >>> (e.g.
> > > > > >>> > > > >>>>> submitting
> > > > > >>> > > > >>>>>>>> jobs, listing all running jobs). And then there
> > was
> > > > the
> > > > > >>> idea
> > > > > >>> > to
> > > > > >>> > > > >>>>>>> introduce a
> > > > > >>> > > > >>>>>>>> JobClient which you obtain from the
> ClusterClient
> > to
> > > > > >>> trigger
> > > > > >>> > > > >> job
> > > > > >>> > > > >>>>>> specific
> > > > > >>> > > > >>>>>>>> operations (e.g. taking a savepoint, cancelling
> > the
> > > > > job).
> > > > > >>> > > > >>>>>>>>
> > > > > >>> > > > >>>>>>>> [1]
> > > https://issues.apache.org/jira/browse/FLINK-4272
> > > > > >>> > > > >>>>>>>>
> > > > > >>> > > > >>>>>>>> Cheers,
> > > > > >>> > > > >>>>>>>> Till
> > > > > >>> > > > >>>>>>>>
> > > > > >>> > > > >>>>>>>> On Tue, Dec 11, 2018 at 10:13 AM Jeff Zhang <
> > > > > >>> zjf...@gmail.com
> > > > > >>> > >
> > > > > >>> > > > >>>>> wrote:
> > > > > >>> > > > >>>>>>>>
> > > > > >>> > > > >>>>>>>>> Hi Folks,
> > > > > >>> > > > >>>>>>>>>
> > > > > >>> > > > >>>>>>>>> I am trying to integrate flink into apache
> > zeppelin
> > > > > >>> which is
> > > > > >>> > > > >> an
> > > > > >>> > > > >>>>>>>> interactive
> > > > > >>> > > > >>>>>>>>> notebook. And I hit several issues that is
> caused
> > > by
> > > > > >>> flink
> > > > > >>> > > > >>> client
> > > > > >>> > > > >>>>>> api.
> > > > > >>> > > > >>>>>>> So
> > > > > >>> > > > >>>>>>>>> I'd like to proposal the following changes for
> > > flink
> > > > > >>> client
> > > > > >>> > > > >>> api.
> > > > > >>> > > > >>>>>>>>>
> > > > > >>> > > > >>>>>>>>> 1. Support nonblocking execution. Currently,
> > > > > >>> > > > >>>>>>> ExecutionEnvironment#execute
> > > > > >>> > > > >>>>>>>>> is a blocking method which would do 2 things,
> > first
> > > > > >>> submit
> > > > > >>> > > > >> job
> > > > > >>> > > > >>>> and
> > > > > >>> > > > >>>>>> then
> > > > > >>> > > > >>>>>>>>> wait for job until it is finished. I'd like
> > > > introduce a
> > > > > >>> > > > >>>> nonblocking
> > > > > >>> > > > >>>>>>>>> execution method like
> ExecutionEnvironment#submit
> > > > which
> > > > > >>> only
> > > > > >>> > > > >>>> submit
> > > > > >>> > > > >>>>>> job
> > > > > >>> > > > >>>>>>>> and
> > > > > >>> > > > >>>>>>>>> then return jobId to client. And allow user to
> > > query
> > > > > the
> > > > > >>> job
> > > > > >>> > > > >>>> status
> > > > > >>> > > > >>>>>> via
> > > > > >>> > > > >>>>>>>> the
> > > > > >>> > > > >>>>>>>>> jobId.
> > > > > >>> > > > >>>>>>>>>
> > > > > >>> > > > >>>>>>>>> 2. Add cancel api in
> > > > > >>> > > > >>>>> ExecutionEnvironment/StreamExecutionEnvironment,
> > > > > >>> > > > >>>>>>>>> currently the only way to cancel job is via cli
> > > > > >>> (bin/flink),
> > > > > >>> > > > >>> this
> > > > > >>> > > > >>>>> is
> > > > > >>> > > > >>>>>>> not
> > > > > >>> > > > >>>>>>>>> convenient for downstream project to use this
> > > > feature.
> > > > > >>> So I'd
> > > > > >>> > > > >>>> like
> > > > > >>> > > > >>>>> to
> > > > > >>> > > > >>>>>>> add
> > > > > >>> > > > >>>>>>>>> cancel api in ExecutionEnvironment
> > > > > >>> > > > >>>>>>>>>
> > > > > >>> > > > >>>>>>>>> 3. Add savepoint api in
> > > > > >>> > > > >>>>>>> ExecutionEnvironment/StreamExecutionEnvironment.
> > > > > >>> > > > >>>>>>>> It
> > > > > >>> > > > >>>>>>>>> is similar as cancel api, we should use
> > > > > >>> ExecutionEnvironment
> > > > > >>> > > > >> as
> > > > > >>> > > > >>>> the
> > > > > >>> > > > >>>>>>>> unified
> > > > > >>> > > > >>>>>>>>> api for third party to integrate with flink.
> > > > > >>> > > > >>>>>>>>>
> > > > > >>> > > > >>>>>>>>> 4. Add listener for job execution lifecycle.
> > > > Something
> > > > > >>> like
> > > > > >>> > > > >>>>>> following,
> > > > > >>> > > > >>>>>>> so
> > > > > >>> > > > >>>>>>>>> that downstream project can do custom logic in
> > the
> > > > > >>> lifecycle
> > > > > >>> > > > >> of
> > > > > >>> > > > >>>>> job.
> > > > > >>> > > > >>>>>>> e.g.
> > > > > >>> > > > >>>>>>>>> Zeppelin would capture the jobId after job is
> > > > submitted
> > > > > >>> and
> > > > > >>> > > > >>> then
> > > > > >>> > > > >>>>> use
> > > > > >>> > > > >>>>>>> this
> > > > > >>> > > > >>>>>>>>> jobId to cancel it later when necessary.
> > > > > >>> > > > >>>>>>>>>
> > > > > >>> > > > >>>>>>>>> public interface JobListener {
> > > > > >>> > > > >>>>>>>>>
> > > > > >>> > > > >>>>>>>>>   void onJobSubmitted(JobID jobId);
> > > > > >>> > > > >>>>>>>>>
> > > > > >>> > > > >>>>>>>>>   void onJobExecuted(JobExecutionResult
> > jobResult);
> > > > > >>> > > > >>>>>>>>>
> > > > > >>> > > > >>>>>>>>>   void onJobCanceled(JobID jobId);
> > > > > >>> > > > >>>>>>>>> }
> > > > > >>> > > > >>>>>>>>>
> > > > > >>> > > > >>>>>>>>> 5. Enable session in ExecutionEnvironment.
> > > Currently
> > > > it
> > > > > >>> is
> > > > > >>> > > > >>>>> disabled,
> > > > > >>> > > > >>>>>>> but
> > > > > >>> > > > >>>>>>>>> session is very convenient for third party to
> > > > > submitting
> > > > > >>> jobs
> > > > > >>> > > > >>>>>>>> continually.
> > > > > >>> > > > >>>>>>>>> I hope flink can enable it again.
> > > > > >>> > > > >>>>>>>>>
> > > > > >>> > > > >>>>>>>>> 6. Unify all flink client api into
> > > > > >>> > > > >>>>>>>>>
> ExecutionEnvironment/StreamExecutionEnvironment.
> > > > > >>> > > > >>>>>>>>>
> > > > > >>> > > > >>>>>>>>> This is a long term issue which needs more
> > careful
> > > > > >>> thinking
> > > > > >>> > > > >> and
> > > > > >>> > > > >>>>>> design.
> > > > > >>> > > > >>>>>>>>> Currently some of features of flink is exposed
> in
> > > > > >>> > > > >>>>>>>>>
> ExecutionEnvironment/StreamExecutionEnvironment,
> > > but
> > > > > >>> some are
> > > > > >>> > > > >>>>> exposed
> > > > > >>> > > > >>>>>>> in
> > > > > >>> > > > >>>>>>>>> cli instead of api, like the cancel and
> > savepoint I
> > > > > >>> mentioned
> > > > > >>> > > > >>>>> above.
> > > > > >>> > > > >>>>>> I
> > > > > >>> > > > >>>>>>>>> think the root cause is due to that flink
> didn't
> > > > unify
> > > > > >>> the
> > > > > >>> > > > >>>>>> interaction
> > > > > >>> > > > >>>>>>>> with
> > > > > >>> > > > >>>>>>>>> flink. Here I list 3 scenarios of flink
> operation
> > > > > >>> > > > >>>>>>>>>
> > > > > >>> > > > >>>>>>>>>   - Local job execution.  Flink will create
> > > > > >>> LocalEnvironment
> > > > > >>> > > > >>> and
> > > > > >>> > > > >>>>>> then
> > > > > >>> > > > >>>>>>>> use
> > > > > >>> > > > >>>>>>>>>   this LocalEnvironment to create LocalExecutor
> > for
> > > > job
> > > > > >>> > > > >>>> execution.
> > > > > >>> > > > >>>>>>>>>   - Remote job execution. Flink will create
> > > > > ClusterClient
> > > > > >>> > > > >>> first
> > > > > >>> > > > >>>>> and
> > > > > >>> > > > >>>>>>> then
> > > > > >>> > > > >>>>>>>>>   create ContextEnvironment based on the
> > > > ClusterClient
> > > > > >>> and
> > > > > >>> > > > >>> then
> > > > > >>> > > > >>>>> run
> > > > > >>> > > > >>>>>>> the
> > > > > >>> > > > >>>>>>>>> job.
> > > > > >>> > > > >>>>>>>>>   - Job cancelation. Flink will create
> > > ClusterClient
> > > > > >>> first
> > > > > >>> > > > >> and
> > > > > >>> > > > >>>>> then
> > > > > >>> > > > >>>>>>>> cancel
> > > > > >>> > > > >>>>>>>>>   this job via this ClusterClient.
> > > > > >>> > > > >>>>>>>>>
> > > > > >>> > > > >>>>>>>>> As you can see in the above 3 scenarios. Flink
> > > didn't
> > > > > >>> use the
> > > > > >>> > > > >>>> same
> > > > > >>> > > > >>>>>>>>> approach(code path) to interact with flink
> > > > > >>> > > > >>>>>>>>> What I propose is following:
> > > > > >>> > > > >>>>>>>>> Create the proper
> > > LocalEnvironment/RemoteEnvironment
> > > > > >>> (based
> > > > > >>> > > > >> on
> > > > > >>> > > > >>>> user
> > > > > >>> > > > >>>>>>>>> configuration) --> Use this Environment to
> create
> > > > > proper
> > > > > >>> > > > >>>>>> ClusterClient
> > > > > >>> > > > >>>>>>>>> (LocalClusterClient or RestClusterClient) to
> > > > > interactive
> > > > > >>> with
> > > > > >>> > > > >>>>> Flink (
> > > > > >>> > > > >>>>>>> job
> > > > > >>> > > > >>>>>>>>> execution or cancelation)
> > > > > >>> > > > >>>>>>>>>
> > > > > >>> > > > >>>>>>>>> This way we can unify the process of local
> > > execution
> > > > > and
> > > > > >>> > > > >> remote
> > > > > >>> > > > >>>>>>>> execution.
> > > > > >>> > > > >>>>>>>>> And it is much easier for third party to
> > integrate
> > > > with
> > > > > >>> > > > >> flink,
> > > > > >>> > > > >>>>>> because
> > > > > >>> > > > >>>>>>>>> ExecutionEnvironment is the unified entry point
> > for
> > > > > >>> flink.
> > > > > >>> > > > >> What
> > > > > >>> > > > >>>>> third
> > > > > >>> > > > >>>>>>>> party
> > > > > >>> > > > >>>>>>>>> needs to do is just pass configuration to
> > > > > >>> > > > >> ExecutionEnvironment
> > > > > >>> > > > >>>> and
> > > > > >>> > > > >>>>>>>>> ExecutionEnvironment will do the right thing
> > based
> > > on
> > > > > the
> > > > > >>> > > > >>>>>>> configuration.
> > > > > >>> > > > >>>>>>>>> Flink cli can also be considered as flink api
> > > > consumer.
> > > > > >>> it
> > > > > >>> > > > >> just
> > > > > >>> > > > >>>>> pass
> > > > > >>> > > > >>>>>>> the
> > > > > >>> > > > >>>>>>>>> configuration to ExecutionEnvironment and let
> > > > > >>> > > > >>>> ExecutionEnvironment
> > > > > >>> > > > >>>>> to
> > > > > >>> > > > >>>>>>>>> create the proper ClusterClient instead of
> > letting
> > > > cli
> > > > > to
> > > > > >>> > > > >>> create
> > > > > >>> > > > >>>>>>>>> ClusterClient directly.
> > > > > >>> > > > >>>>>>>>>
> > > > > >>> > > > >>>>>>>>>
> > > > > >>> > > > >>>>>>>>> 6 would involve large code refactoring, so I
> > think
> > > we
> > > > > can
> > > > > >>> > > > >> defer
> > > > > >>> > > > >>>> it
> > > > > >>> > > > >>>>>> for
> > > > > >>> > > > >>>>>>>>> future release, 1,2,3,4,5 could be done at
> once I
> > > > > >>> believe.
> > > > > >>> > > > >> Let
> > > > > >>> > > > >>> me
> > > > > >>> > > > >>>>>> know
> > > > > >>> > > > >>>>>>>> your
> > > > > >>> > > > >>>>>>>>> comments and feedback, thanks
> > > > > >>> > > > >>>>>>>>>
> > > > > >>> > > > >>>>>>>>>
> > > > > >>> > > > >>>>>>>>>
> > > > > >>> > > > >>>>>>>>> --
> > > > > >>> > > > >>>>>>>>> Best Regards
> > > > > >>> > > > >>>>>>>>>
> > > > > >>> > > > >>>>>>>>> Jeff Zhang
> > > > > >>> > > > >>>>>>>>>
> > > > > >>> > > > >>>>>>>>
> > > > > >>> > > > >>>>>>>
> > > > > >>> > > > >>>>>>>
> > > > > >>> > > > >>>>>>> --
> > > > > >>> > > > >>>>>>> Best Regards
> > > > > >>> > > > >>>>>>>
> > > > > >>> > > > >>>>>>> Jeff Zhang
> > > > > >>> > > > >>>>>>>
> > > > > >>> > > > >>>>>>
> > > > > >>> > > > >>>>>
> > > > > >>> > > > >>>>>
> > > > > >>> > > > >>>>> --
> > > > > >>> > > > >>>>> Best Regards
> > > > > >>> > > > >>>>>
> > > > > >>> > > > >>>>> Jeff Zhang
> > > > > >>> > > > >>>>>
> > > > > >>> > > > >>>>
> > > > > >>> > > > >>>
> > > > > >>> > > > >>>
> > > > > >>> > > > >>> --
> > > > > >>> > > > >>> Best Regards
> > > > > >>> > > > >>>
> > > > > >>> > > > >>> Jeff Zhang
> > > > > >>> > > > >>>
> > > > > >>> > > > >>
> > > > > >>> > > > >
> > > > > >>> > > > >
> > > > > >>> > > > > --
> > > > > >>> > > > > Best Regards
> > > > > >>> > > > >
> > > > > >>> > > > > Jeff Zhang
> > > > > >>> > > >
> > > > > >>> > > >
> > > > > >>> > >
> > > > > >>> > > --
> > > > > >>> > > Best Regards
> > > > > >>> > >
> > > > > >>> > > Jeff Zhang
> > > > > >>> > >
> > > > > >>> >
> > > > > >>>
> > > > > >>
> > > > >
> > > > > --
> > > > > Best Regards
> > > > >
> > > > > Jeff Zhang
> > > > >
> > > >
> > >
> >
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
> >
>

Reply via email to