Thanks for your explanation, Kostas.

I agree that the Clients are independent of the Executors. Your
text leaves me with one question: does Executor#execute return a
cluster client or a job client? As discussed previously, I think
that conceptually it is a job client.
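
To make the question concrete, here is a minimal sketch of what I
mean by "a job client". It is not the FLIP's actual interface: every
type in it (Pipeline, JobClient, Executor, InMemoryExecutor) is a
hypothetical stand-in written for illustration, and a plain
Map<String, String> replaces Flink's Configuration. It assumes the
async execute() and the per-call execution options proposed earlier
in this thread.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class ExecutorSketch {

    /** Hypothetical stand-in for a Flink Pipeline; it only carries a name. */
    static final class Pipeline {
        final String name;

        Pipeline(String name) {
            this.name = name;
        }
    }

    /** Hypothetical job-scoped handle: it knows about one job, not the cluster. */
    interface JobClient {
        String getJobId();

        CompletableFuture<String> getJobResult();
    }

    /** Hypothetical Executor whose execute() is asynchronous and yields a job client. */
    interface Executor {
        CompletableFuture<JobClient> execute(Pipeline pipeline, Map<String, String> executionOptions);
    }

    /** Toy executor that completes every job immediately, in memory. */
    static final class InMemoryExecutor implements Executor {
        private final Map<String, String> executorOptions;
        private int submittedJobs = 0;

        InMemoryExecutor(Map<String, String> executorOptions) {
            this.executorOptions = new HashMap<>(executorOptions);
        }

        @Override
        public CompletableFuture<JobClient> execute(Pipeline pipeline, Map<String, String> executionOptions) {
            // Per-execution options override the options given at executor creation.
            Map<String, String> effective = new HashMap<>(executorOptions);
            effective.putAll(executionOptions);

            final String jobId = "job-" + (++submittedJobs);
            final String result = pipeline.name + " ran with parallelism "
                    + effective.getOrDefault("parallelism", "1");

            JobClient client = new JobClient() {
                @Override
                public String getJobId() {
                    return jobId;
                }

                @Override
                public CompletableFuture<String> getJobResult() {
                    return CompletableFuture.completedFuture(result);
                }
            };
            return CompletableFuture.completedFuture(client);
        }
    }

    public static void main(String[] args) {
        Executor executor = new InMemoryExecutor(Collections.singletonMap("parallelism", "2"));

        // "Detached" style: take the job client and do not wait for the result.
        JobClient first = executor
                .execute(new Pipeline("first"), Collections.<String, String>emptyMap())
                .join();
        System.out.println("submitted " + first.getJobId());

        // "Attached" style: the caller decides to block on the job result.
        String result = executor
                .execute(new Pipeline("second"), Collections.singletonMap("parallelism", "4"))
                .thenCompose(JobClient::getJobResult)
                .join();
        System.out.println(result);
    }
}
```

With a job-scoped return value, the attached/detached distinction
reduces to whether the caller blocks on getJobResult(). A cluster
client would instead expose operations on the whole cluster.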

Best,
tison.


Kostas Kloudas <kklou...@gmail.com> wrote on Thu, Oct 10, 2019 at 5:08 PM:

> Hi Tison,
>
> I would say that as a first step, and until we see that the interfaces
> we introduce cover all intended purposes, we keep the Executors
> non-public.
> From the previous discussion, I think that in general the Clients are
> independent from the Executors, as the Executors simply use the
> clients to submit jobs and return a cluster client.
>
> Cheers,
> Kostas
>
> On Wed, Oct 9, 2019 at 7:01 PM Zili Chen <wander4...@gmail.com> wrote:
> >
> > Hi Kostas & Aljoscha,
> >
> > I'm drafting a plan exposing multi-layered clients. It is mainly about
> > how we distinguish different layers and what clients we're going to
> > expose.
> >
> > Within the scope of FLIP-73, I'd like to ask whether Executor will
> > become a public interface that downstream project developers can
> > use, or whether it is just an internal concept for unifying job
> > submission. If it is the latter, I feel the multi-layered client
> > topic is totally independent of Executor.
> >
> > Best,
> > tison.
> >
> >
> > Thomas Weise <t...@apache.org> wrote on Sat, Oct 5, 2019 at 12:17 AM:
> >
> > > It might be useful to mention on FLIP-73 that the intention for
> > > Executor.execute is to be an asynchronous API once it becomes public
> and
> > > also refer to FLIP-74 as such.
> > >
> > >
> > > On Fri, Oct 4, 2019 at 2:52 AM Aljoscha Krettek <aljos...@apache.org>
> > > wrote:
> > >
> > > > Hi Tison,
> > > >
> > > > I agree, for now the async Executor.execute() is an internal detail
> but
> > > > during your work for FLIP-74 it will probably also reach the public
> API.
> > > >
> > > > Best,
> > > > Aljoscha
> > > >
> > > > > On 4. Oct 2019, at 11:39, Zili Chen <wander4...@gmail.com> wrote:
> > > > >
> > > > > Hi Aljoscha,
> > > > >
> > > > > > After clearly narrowing the scope of this FLIP, the Executor
> > > > > > interface and its discovery look good to me, so I'm glad to
> > > > > > see the vote thread.
> > > > > >
> > > > > > As you said, we should still discuss implementation details,
> > > > > > but I don't think that should block the vote thread, because
> > > > > > a vote means we generally agree on the motivation and overall
> > > > > > design.
> > > > > >
> > > > > > As for Executor.execute() being async, that is much better
> > > > > > than keeping the sync/async distinction at this level. But
> > > > > > I'd like to note that it only works internally for now,
> > > > > > because the user-facing interface is still env.execute, which
> > > > > > blocks and returns a JobExecutionResult. I'm afraid that
> > > > > > several people depend on that result for post-execution
> > > > > > processing, although it doesn't work in the current per-job
> > > > > > mode.
> > > > >
> > > > > Best,
> > > > > tison.
> > > > >
> > > > >
> > > > > > Aljoscha Krettek <aljos...@apache.org> wrote on Fri, Oct 4, 2019 at 4:40 PM:
> > > > >
> > > > >> Do you all think we could agree on the basic executor primitives
> and
> > > > start
> > > > >> voting on this FLIP? There are still some implementation details
> but I
> > > > >> think we can discuss/tackle them when we get to them and the
> various
> > > > people
> > > > >> implementing this should be in close collaboration.
> > > > >>
> > > > >> Best,
> > > > >> Aljoscha
> > > > >>
> > > > >>> On 4. Oct 2019, at 10:15, Aljoscha Krettek <aljos...@apache.org>
> > > > wrote:
> > > > >>>
> > > > >>> Hi,
> > > > >>>
> > > > >>> I think the end goal is to have only one environment per API,
> but I
> > > > >> think we won’t be able to achieve that in the short-term because
> of
> > > > >> backwards compatibility. This is most notable with the context
> > > > environment,
> > > > >> preview environments etc.
> > > > >>>
> > > > >>> To keep this FLIP very slim we can make this only about the
> executors
> > > > >> and executor discovery. Anything else like job submission
> semantics,
> > > > >> detached mode, … can be tackled after this. If we don’t focus I’m
> > > afraid
> > > > >> this will drag on for quite a while.
> > > > >>>
> > > > >>> One thing I would like to propose to make this easier is to
> change
> > > > >> Executor.execute() to return a CompletableFuture and to completely
> > > > remove
> > > > >> the “detached” logic from ClusterClient. That way, the new
> components
> > > > make
> > > > >> no distinction between “detached” and “attached” but we can still
> do
> > > it
> > > > in
> > > > >> the CLI (via the ContextEnvironment) to support the existing
> > > “detached”
> > > > >> behaviour of the CLI that users expect. What do you think about
> this?
> > > > >>>
> > > > >>> Best,
> > > > >>> Aljoscha
> > > > >>>
> > > > >>>> On 3. Oct 2019, at 10:03, Zili Chen <wander4...@gmail.com>
> wrote:
> > > > >>>>
> > > > > >>>> Thanks for your explanation, Kostas; it makes the subtasks
> > > > > >>>> under FLIP-73 clear.
> > > > > >>>>
> > > > > >>>> As you described, changes to the Environments are included
> > > > > >>>> in this FLIP. For "each API to have a single Environment",
> > > > > >>>> it would be helpful to describe which APIs we'd like to
> > > > > >>>> have after FLIP-73. And if we keep multiple Environments,
> > > > > >>>> shall we keep the way the context environment is injected
> > > > > >>>> for each API?
> > > > >>>>
> > > > >>>>
> > > > > >>>> Kostas Kloudas <kklou...@gmail.com> wrote on Thu, Oct 3, 2019 at 1:44 PM:
> > > > >>>>
> > > > >>>>> Hi Tison,
> > > > >>>>>
> > > > > >>>>> The changes that this FLIP proposes are:
> > > > >>>>> - the introduction of the Executor interface
> > > > >>>>> - the fact that everything in the current state of job
> submission
> > > in
> > > > >>>>> Flink can be defined through configuration parameters
> > > > >>>>> - implementation of Executors that do not change any of the
> > > semantics
> > > > >>>>> of the currently offered "modes" of job submission
> > > > >>>>>
> > > > >>>>> In this, and in the FLIP itself where the
> > > > >>>>> ExecutionEnvironment.execute() method is described, there are
> > > details
> > > > >>>>> about parts of the
> > > > >>>>> integration with the existing Flink code-base.
> > > > >>>>>
> > > > > >>>>> So I am not sure what you mean by making the "integration a
> > > > > >>>>> follow-up discussion".
> > > > >>>>>
> > > > >>>>> Cheers,
> > > > >>>>> Kostas
> > > > >>>>>
> > > > >>>>> On Wed, Oct 2, 2019 at 8:10 PM Zili Chen <wander4...@gmail.com
> >
> > > > wrote:
> > > > >>>>>>
> > > > >>>>>> - for Preview/OptimizedPlanEnv: I think they are orthogonal
> to the
> > > > > >>>>>> Executors work, as they are using the execute() method because
> > > this
> > > > is
> > > > >>>>>> the only "entry" to the user program. To this regard, I
> believe we
> > > > >>>>>> should just see the fact that they have their dedicated
> > > environment
> > > > as
> > > > >>>>>> an "implementation detail".
> > > > >>>>>>
> > > > >>>>>> The proposal says
> > > > >>>>>>
> > > > >>>>>> In this document, we propose to abstract away from the
> > > Environments
> > > > >> the
> > > > >>>>> job
> > > > >>>>>> submission logic and put it in a newly introduced Executor.
> This
> > > > will
> > > > >>>>>> allow *each
> > > > >>>>>> API to have a single Environment* which, based on the provided
> > > > >>>>>> configuration, will decide which executor to use, *e.g.* Yarn,
> > > > Local,
> > > > >>>>> etc.
> > > > >>>>>> In addition, it will allow different APIs and downstream
> projects
> > > to
> > > > >>>>> re-use
> > > > >>>>>> the provided executors, thus limiting the amount of code
> > > duplication
> > > > >> and
> > > > >>>>>> the amount of code that has to be written.
> > > > >>>>>>
> > > > > >>>>>> Note that "This will allow *each API to have a single
> > > > > >>>>>> Environment*" seems to diverge a bit from your statement
> > > > > >>>>>> above. Or do we regard a single Environment as a possible
> > > > > >>>>>> advantage that follows the introduction of Executor, and
> > > > > >>>>>> therefore exclude it from this pass?
> > > > >>>>>>
> > > > >>>>>> Best,
> > > > >>>>>> tison.
> > > > >>>>>>
> > > > >>>>>>
> > > > > >>>>>> Zili Chen <wander4...@gmail.com> wrote on Thu, Oct 3, 2019 at 2:07 AM:
> > > > >>>>>>
> > > > > >>>>>>> BTW, correct me if I misunderstand; I am still learning
> > > > > >>>>>>> how our community works. Since FLIP-73 aims at
> > > > > >>>>>>> introducing an interface with community consensus, the
> > > > > >>>>>>> discussion is more about the interface itself, in order
> > > > > >>>>>>> to properly define a useful and extensible API. The
> > > > > >>>>>>> integration story could be a follow-up, since this FLIP
> > > > > >>>>>>> does not affect current behavior at all.
> > > > >>>>>>>
> > > > >>>>>>> Best,
> > > > >>>>>>> tison.
> > > > >>>>>>>
> > > > >>>>>>>
> > > > > >>>>>>> Zili Chen <wander4...@gmail.com> wrote on Thu, Oct 3, 2019 at 2:02 AM:
> > > > >>>>>>>
> > > > >>>>>>>> Hi Kostas,
> > > > >>>>>>>>
> > > > > >>>>>>>> It seems to do no harm to give Executor#execute a
> > > > > >>>>>>>> configuration parameter, since we can merge it with the
> > > > > >>>>>>>> configuration the Executor was created with and let the
> > > > > >>>>>>>> per-call one override the other.
> > > > > >>>>>>>>
> > > > > >>>>>>>> I can see that this is useful: conceptually, we can
> > > > > >>>>>>>> create one Executor for a series of jobs that go to the
> > > > > >>>>>>>> same cluster but with a different job configuration per
> > > > > >>>>>>>> pipeline.
> > > > >>>>>>>>
> > > > >>>>>>>> Best,
> > > > >>>>>>>> tison.
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > > >>>>>>>> Kostas Kloudas <kklou...@apache.org> wrote on Thu, Oct 3, 2019 at 1:37 AM:
> > > > >>>>>>>>
> > > > >>>>>>>>> Hi again,
> > > > >>>>>>>>>
> > > > >>>>>>>>> I did not include this to my previous email, as this is
> related
> > > > to
> > > > >>>>> the
> > > > >>>>>>>>> proposal on the FLIP itself.
> > > > >>>>>>>>>
> > > > >>>>>>>>> In the existing proposal, the Executor interface is the
> > > > following.
> > > > >>>>>>>>>
> > > > > >>>>>>>>> public interface Executor {
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>     JobExecutionResult execute(Pipeline pipeline) throws Exception;
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> }
> > > > >>>>>>>>>
> > > > >>>>>>>>> This implies that all the necessary information for the
> > > execution
> > > > >> of
> > > > >>>>> a
> > > > >>>>>>>>> Pipeline should be included in the Configuration passed in
> the
> > > > >>>>>>>>> ExecutorFactory which instantiates the Executor itself.
> This
> > > > should
> > > > >>>>>>>>> include, for example, all the parameters currently
> supplied by
> > > > the
> > > > >>>>>>>>> ProgramOptions, which are conceptually not executor
> parameters
> > > > but
> > > > >>>>>>>>> rather parameters for the execution of the specific
> pipeline.
> > > To
> > > > >> this
> > > > >>>>>>>>> end, I would like to propose a change in the current
> Executor
> > > > >>>>>>>>> interface showcased below:
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > > >>>>>>>>> public interface Executor {
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>     JobExecutionResult execute(Pipeline pipeline,
> > > > > >>>>>>>>>             Configuration executionOptions) throws Exception;
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> }
> > > > >>>>>>>>>
> > > > >>>>>>>>> The above will allow to have the Executor specific options
> > > passed
> > > > >> in
> > > > >>>>>>>>> the configuration given during executor instantiation,
> while
> > > the
> > > > >>>>>>>>> pipeline specific options can be passed in the
> > > executionOptions.
> > > > >> As a
> > > > >>>>>>>>> positive side-effect, this will make Executors re-usable,
> i.e.
> > > > >>>>>>>>> instantiate an executor and use it to execute multiple
> > > pipelines,
> > > > >> if
> > > > >>>>>>>>> in the future we choose to do so.
> > > > >>>>>>>>>
> > > > > >>>>>>>>> Let me know what you think,
> > > > >>>>>>>>> Kostas
> > > > >>>>>>>>>
> > > > >>>>>>>>> On Wed, Oct 2, 2019 at 7:23 PM Kostas Kloudas <
> > > > kklou...@apache.org
> > > > >>>
> > > > >>>>>>>>> wrote:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Hi all,
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> I agree with Tison that we should disentangle threads so
> that
> > > > >>>>> people
> > > > >>>>>>>>>> can work independently.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> For FLIP-73:
> > > > >>>>>>>>>> - for Preview/OptimizedPlanEnv: I think they are
> orthogonal to
> > > > the
> > > > > >>>>>>>>>> Executors work, as they are using the execute() method
> because
> > > > >>>>> this is
> > > > >>>>>>>>>> the only "entry" to the user program. To this regard, I
> > > believe
> > > > we
> > > > >>>>>>>>>> should just see the fact that they have their dedicated
> > > > >>>>> environment as
> > > > >>>>>>>>>> an "implementation detail".
> > > > >>>>>>>>>> - for getting rid of the per-job mode: as a first note,
> there
> > > > was
> > > > >>>>>>>>>> already a discussion here:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>
> > > > >>
> > > >
> > >
> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
> > > > >>>>>>>>>> with many people, including myself, expressing their
> opinion.
> > > I
> > > > am
> > > > >>>>>>>>>> mentioning that to show that this topic already has some
> > > history
> > > > >>>>> and
> > > > > >>>>>>>>>> the discussion does not start from scratch but there are
> > > already
> > > > >>>>> some
> > > > >>>>>>>>>> contradicting opinions. My opinion is that we should not
> get
> > > rid
> > > > >> of
> > > > >>>>>>>>>> the per-job mode but I agree that we should discuss about
> the
> > > > >>>>>>>>>> semantics in more detail. Although in terms of code it
> may be
> > > > >>>>> tempting
> > > > >>>>>>>>>> to "merge" the two submission modes, one of the main
> benefits
> > > of
> > > > >>>>> the
> > > > >>>>>>>>>> per-job mode is isolation, both for resources and
> security, as
> > > > the
> > > > >>>>>>>>>> jobGraph to be executed is fixed and the cluster is
> "locked"
> > > > just
> > > > >>>>> for
> > > > >>>>>>>>>> that specific graph. This would be violated by having a
> > > session
> > > > > >>>>>>>>>> cluster launched and having all the infrastructure (ports and
> > > > > >>>>>>>>>> endpoints) set for submitting any job to that cluster.
> > > > >>>>>>>>>> - for getting rid of the "detached" mode: I agree with
> getting
> > > > rid
> > > > >>>>> of
> > > > >>>>>>>>>> it but this implies some potential user-facing changes
> that
> > > > should
> > > > >>>>> be
> > > > >>>>>>>>>> discussed.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Given the above, I think that:
> > > > >>>>>>>>>> 1) in the context of FLIP-73 we should not change any
> > > semantics
> > > > >> but
> > > > >>>>>>>>>> simply push the existing submission logic behind a
> reusable
> > > > >>>>>>>>>> abstraction and make it usable via public APIs, as
> Aljoscha
> > > > said.
> > > > >>>>>>>>>> 2) as Till said, changing the semantics is beyond the
> scope of
> > > > >> this
> > > > >>>>>>>>>> FLIP and as Tison mentioned we should work towards
> decoupling
> > > > >>>>>>>>>> discussions rather than the opposite. So let's discuss
> about
> > > the
> > > > >>>>>>>>>> future of the per-job and detached modes in a separate
> thread.
> > > > >> This
> > > > >>>>>>>>>> will also allow to give the proper visibility to such an
> > > > important
> > > > >>>>>>>>>> topic.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Cheers,
> > > > >>>>>>>>>> Kostas
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> On Wed, Oct 2, 2019 at 4:40 PM Zili Chen <
> > > wander4...@gmail.com>
> > > > >>>>> wrote:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Thanks for your thoughts Aljoscha.
> > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Another question, since FLIP-73 might contain
> > > > > >>>>>>>>>>> refactoring of the Environment: shall we support
> > > > > >>>>>>>>>>> something like PreviewPlanEnvironment? If so, how?
> > > > > >>>>>>>>>>> From a user perspective, a plan preview is useful:
> > > > > >>>>>>>>>>> it gives a visual view for modifying the topology
> > > > > >>>>>>>>>>> and configuration without submitting the job.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Best,
> > > > >>>>>>>>>>> tison.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Aljoscha Krettek <aljos...@apache.org> wrote on Wed, Oct 2, 2019 at 10:10 PM:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>> I agree with Till that we should not change the
> semantics of
> > > > >>>>>>>>> per-job mode.
> > > > >>>>>>>>>>>> In my opinion per-job mode means that the cluster
> > > (JobManager)
> > > > >>>>> is
> > > > >>>>>>>>> brought
> > > > >>>>>>>>>>>> up with one job and it only executes that one job. There
> > > > >>>>> should be
> > > > >>>>>>>>> no open
> > > > >>>>>>>>>>>> ports/anything that would allow submitting further jobs.
> > > This
> > > > >>>>> is
> > > > >>>>>>>>> very
> > > > >>>>>>>>>>>> important for deployments in docker/Kubernetes or other
> > > > > >>>>>>>>> environments where
> > > > >>>>>>>>>>>> you bring up jobs without necessarily having the notion
> of a
> > > > >>>>> Flink
> > > > >>>>>>>>> cluster.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> What this means for a user program that has multiple
> > > execute()
> > > > >>>>>>>>> calls is
> > > > >>>>>>>>>>>> that you will get a fresh cluster for each execute call.
> > > This
> > > > >>>>> also
> > > > >>>>>>>>> means,
> > > > >>>>>>>>>>>> that further execute() calls will only happen if the
> > > “client”
> > > > >>>>> is
> > > > >>>>>>>>> still
> > > > >>>>>>>>>>>> alive, because it is the one driving execution.
> Currently,
> > > > this
> > > > >>>>>>>>> only works
> > > > >>>>>>>>>>>> if you start the job in “attached” mode. If you start in
> > > > >>>>>>>>> “detached” mode
> > > > >>>>>>>>>>>> only the first execute() will happen and the rest will
> be
> > > > >>>>> ignored.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> This brings us to the tricky question about what to do
> about
> > > > >>>>>>>>> “detached”
> > > > >>>>>>>>>>>> and “attached”. In the long run, I would like to get
> rid of
> > > > the
> > > > >>>>>>>>> distinction
> > > > >>>>>>>>>>>> and leave it up to the user program, by either blocking
> or
> > > not
> > > > >>>>> on
> > > > >>>>>>>>> the
> > > > >>>>>>>>>>>> Future (or JobClient or whatnot) that job submission
> > > returns.
> > > > >>>>> This,
> > > > >>>>>>>>>>>> however, means that users cannot simply request
> “detached”
> > > > >>>>>>>>> execution when
> > > > >>>>>>>>>>>> using bin/flink, the user program has to “play along”.
> On
> > > the
> > > > >>>>>>>>> other hand,
> > > > >>>>>>>>>>>> “detached” mode is quite strange for the user program.
> The
> > > > >>>>>>>>> execute() call
> > > > >>>>>>>>>>>> either returns with a proper job result after the job
> ran
> > > (in
> > > > >>>>>>>>> “attached”
> > > > >>>>>>>>>>>> mode) or with a dummy result (in “detached” mode) right
> > > after
> > > > >>>>>>>>> submission. I
> > > > >>>>>>>>>>>> think this can even lead to weird cases where multiple
> > > > >>>>> "execute()”
> > > > >>>>>>>>> run in
> > > > >>>>>>>>>>>> parallel. For per-job detached mode we also “throw” out
> of
> > > the
> > > > >>>>>>>>> first
> > > > >>>>>>>>>>>> execute so the rest (including result processing logic)
> is
> > > > >>>>> ignored.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> For this here FLIP-73 we can (and should) ignore these
> > > > >>>>> problems,
> > > > >>>>>>>>> because
> > > > >>>>>>>>>>>> FLIP-73 only moves the existing submission logic behind
> a
> > > > >>>>> reusable
> > > > >>>>>>>>>>>> abstraction and makes it usable via API. We should
> closely
> > > > >>>>> follow
> > > > >>>>>>>>> up on the
> > > > >>>>>>>>>>>> above points though because I think they are also
> important.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Best,
> > > > >>>>>>>>>>>> Aljoscha
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> On 2. Oct 2019, at 12:08, Zili Chen <
> wander4...@gmail.com>
> > > > >>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Thanks for your clarification Till.
> > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> I agree that under the current semantics of
> > > > > >>>>>>>>>>>>> per-job mode, one should deploy a new cluster
> > > > > >>>>>>>>>>>>> for each part of the job. Apart from the
> > > > > >>>>>>>>>>>>> performance concern, it also means that
> > > > > >>>>>>>>>>>>> PerJobExecutor actually knows how to deploy a
> > > > > >>>>>>>>>>>>> cluster, which differs from the description
> > > > > >>>>>>>>>>>>> that an Executor submits a job.
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> Anyway, it sounds workable and narrows the
> > > > > >>>>>>>>>>>>> changes.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>
> > > > >>>
> > > > >>
> > > > >>
> > > >
> > > >
> > >
>
