Thanks for your explanation, Kostas. I agree that the Clients are independent of the Executors. One thing I wonder from your text: does Executor#execute return a cluster client or a job client? As discussed previously, I think that conceptually it is a job client.

Best,
tison.


On Thu, Oct 10, 2019 at 5:08 PM Kostas Kloudas <kklou...@gmail.com> wrote:

> Hi Tison,
>
> I would say that as a first step, and until we see that the interfaces we introduce cover all intended purposes, we keep the Executors non-public.
> From the previous discussion, I think that in general the Clients are independent from the Executors, as the Executors simply use the clients to submit jobs and return a cluster client.
>
> Cheers,
> Kostas
>
> On Wed, Oct 9, 2019 at 7:01 PM Zili Chen <wander4...@gmail.com> wrote:
> >
> > Hi Kostas & Aljoscha,
> >
> > I'm drafting a plan for exposing multi-layered clients. It is mainly about how we distinguish the different layers and which clients we're going to expose.
> >
> > Within the scope of FLIP-73, I'd like to ask whether Executor becomes a public interface that downstream project developers can use, or whether it is just an internal concept for unifying job submission.
> > If it is the latter, I feel the multi-layered client topic is totally independent from Executor.
> >
> > Best,
> > tison.
> >
> >
> > On Sat, Oct 5, 2019 at 12:17 AM Thomas Weise <t...@apache.org> wrote:
> >
> > > It might be useful to mention on FLIP-73 that the intention for Executor.execute is to be an asynchronous API once it becomes public and also refer to FLIP-74 as such.
> > >
> > >
> > > On Fri, Oct 4, 2019 at 2:52 AM Aljoscha Krettek <aljos...@apache.org> wrote:
> > >
> > > > Hi Tison,
> > > >
> > > > I agree, for now the async Executor.execute() is an internal detail but during your work for FLIP-74 it will probably also reach the public API.
> > > >
> > > > Best,
> > > > Aljoscha
> > > >
> > > > > On 4. Oct 2019, at 11:39, Zili Chen <wander4...@gmail.com> wrote:
> > > > >
> > > > > Hi Aljoscha,
> > > > >
> > > > > After clearly narrowing the scope of this FLIP, the Executor interface and its discovery look good to me, so I'm glad to see the vote thread.
> > > > >
> > > > > As you said, we should still discuss implementation details, but I don't think that should block the vote thread, because a vote means we generally agree on the motivation and overall design.
> > > > >
> > > > > As for Executor.execute() being async, that is much better than keeping the sync/async distinction at this level. But I'd like to note that it only works internally for now, because the user-facing interface is still env.execute, which blocks and returns a JobExecutionResult. I'm afraid that several people depend on the result for post-execution processing, although it doesn't work in the current per-job mode.
> > > > >
> > > > > Best,
> > > > > tison.
> > > > >
> > > > >
> > > > > On Fri, Oct 4, 2019 at 4:40 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> > > > >
> > > > >> Do you all think we could agree on the basic executor primitives and start voting on this FLIP? There are still some implementation details but I think we can discuss/tackle them when we get to them and the various people implementing this should be in close collaboration.
> > > > >>
> > > > >> Best,
> > > > >> Aljoscha
> > > > >>
> > > > >>> On 4. Oct 2019, at 10:15, Aljoscha Krettek <aljos...@apache.org> wrote:
> > > > >>>
> > > > >>> Hi,
> > > > >>>
> > > > >>> I think the end goal is to have only one environment per API, but I think we won’t be able to achieve that in the short-term because of backwards compatibility. This is most notable with the context environment, preview environments etc.
> > > > >>>
> > > > >>> To keep this FLIP very slim we can make this only about the executors and executor discovery.
> > > > >>> Anything else like job submission semantics, detached mode, … can be tackled after this. If we don’t focus I’m afraid this will drag on for quite a while.
> > > > >>>
> > > > >>> One thing I would like to propose to make this easier is to change Executor.execute() to return a CompletableFuture and to completely remove the “detached” logic from ClusterClient. That way, the new components make no distinction between “detached” and “attached” but we can still do it in the CLI (via the ContextEnvironment) to support the existing “detached” behaviour of the CLI that users expect. What do you think about this?
> > > > >>>
> > > > >>> Best,
> > > > >>> Aljoscha
> > > > >>>
> > > > >>>> On 3. Oct 2019, at 10:03, Zili Chen <wander4...@gmail.com> wrote:
> > > > >>>>
> > > > >>>> Thanks for your explanation, Kostas, which makes the subtasks under FLIP-73 clear.
> > > > >>>>
> > > > >>>> As you described, changes to the Environments are included in this FLIP. For "each API to have a single Environment", it could be helpful to describe which APIs we'd like to have after FLIP-73. And if we keep multiple Environments, shall we keep the way the context environment is injected for each API?
> > > > >>>>
> > > > >>>>
> > > > >>>> On Thu, Oct 3, 2019 at 1:44 PM Kostas Kloudas <kklou...@gmail.com> wrote:
> > > > >>>>
> > > > >>>>> Hi Tison,
> > > > >>>>>
> > > > >>>>> The changes that this FLIP proposes are:
> > > > >>>>> - the introduction of the Executor interface
> > > > >>>>> - the fact that everything in the current state of job submission in Flink can be defined through configuration parameters
> > > > >>>>> - implementation of Executors that do not change any of the semantics of the currently offered "modes" of job submission
> > > > >>>>>
> > > > >>>>> In this, and in the FLIP itself where the ExecutionEnvironment.execute() method is described, there are details about parts of the integration with the existing Flink code-base.
> > > > >>>>>
> > > > >>>>> So I am not sure what you mean by making the "integration a follow-up discussion".
> > > > >>>>>
> > > > >>>>> Cheers,
> > > > >>>>> Kostas
> > > > >>>>>
> > > > >>>>> On Wed, Oct 2, 2019 at 8:10 PM Zili Chen <wander4...@gmail.com> wrote:
> > > > >>>>>>
> > > > >>>>>> - for Preview/OptimizedPlanEnv: I think they are orthogonal to the Executors work, as they are using the execute() method because this is the only "entry" to the user program. To this regard, I believe we should just see the fact that they have their dedicated environment as an "implementation detail".
> > > > >>>>>>
> > > > >>>>>> The proposal says
> > > > >>>>>>
> > > > >>>>>> In this document, we propose to abstract away from the Environments the job submission logic and put it in a newly introduced Executor.
> > > > >>>>>> This will allow *each API to have a single Environment* which, based on the provided configuration, will decide which executor to use, *e.g.* Yarn, Local, etc. In addition, it will allow different APIs and downstream projects to re-use the provided executors, thus limiting the amount of code duplication and the amount of code that has to be written.
> > > > >>>>>>
> > > > >>>>>> Note that "This will allow *each API to have a single Environment*" seems to diverge a bit from your statement above. Or do we treat a single Environment as a possible advantage after the introduction of Executor, so that we exclude it from this pass?
> > > > >>>>>>
> > > > >>>>>> Best,
> > > > >>>>>> tison.
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> On Thu, Oct 3, 2019 at 2:07 AM Zili Chen <wander4...@gmail.com> wrote:
> > > > >>>>>>
> > > > >>>>>>> BTW, correct me if I misunderstand; I am still learning how our community works. Since FLIP-73 aims at introducing an interface with community consensus, the discussion is more about the interface, in order to properly define a useful and extensible API. The integration story could be a follow-up, since this one does not affect current behavior at all.
> > > > >>>>>>>
> > > > >>>>>>> Best,
> > > > >>>>>>> tison.
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> On Thu, Oct 3, 2019 at 2:02 AM Zili Chen <wander4...@gmail.com> wrote:
> > > > >>>>>>>
> > > > >>>>>>>> Hi Kostas,
> > > > >>>>>>>>
> > > > >>>>>>>> It seems to do no harm to have a configuration parameter on Executor#execute, since we can merge it with the one configured when the Executor was created and let the former override the latter.
> > > > >>>>>>>>
> > > > >>>>>>>> I can see it being useful that, conceptually, we can create an Executor for a series of jobs submitted to the same cluster but with a different job configuration per pipeline.
> > > > >>>>>>>>
> > > > >>>>>>>> Best,
> > > > >>>>>>>> tison.
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> On Thu, Oct 3, 2019 at 1:37 AM Kostas Kloudas <kklou...@apache.org> wrote:
> > > > >>>>>>>>
> > > > >>>>>>>>> Hi again,
> > > > >>>>>>>>>
> > > > >>>>>>>>> I did not include this in my previous email, as this is related to the proposal on the FLIP itself.
> > > > >>>>>>>>>
> > > > >>>>>>>>> In the existing proposal, the Executor interface is the following.
> > > > >>>>>>>>>
> > > > >>>>>>>>> public interface Executor {
> > > > >>>>>>>>>
> > > > >>>>>>>>>     JobExecutionResult execute(Pipeline pipeline) throws Exception;
> > > > >>>>>>>>>
> > > > >>>>>>>>> }
> > > > >>>>>>>>>
> > > > >>>>>>>>> This implies that all the necessary information for the execution of a Pipeline should be included in the Configuration passed in the ExecutorFactory which instantiates the Executor itself.
> > > > >>>>>>>>> This should include, for example, all the parameters currently supplied by the ProgramOptions, which are conceptually not executor parameters but rather parameters for the execution of the specific pipeline. To this end, I would like to propose a change in the current Executor interface showcased below:
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>> public interface Executor {
> > > > >>>>>>>>>
> > > > >>>>>>>>>     JobExecutionResult execute(Pipeline pipeline, Configuration executionOptions) throws Exception;
> > > > >>>>>>>>>
> > > > >>>>>>>>> }
> > > > >>>>>>>>>
> > > > >>>>>>>>> The above allows the Executor-specific options to be passed in the configuration given during executor instantiation, while the pipeline-specific options can be passed in the executionOptions. As a positive side-effect, this will make Executors re-usable, i.e. instantiate an executor and use it to execute multiple pipelines, if in the future we choose to do so.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Let me know what you think,
> > > > >>>>>>>>> Kostas
> > > > >>>>>>>>>
> > > > >>>>>>>>> On Wed, Oct 2, 2019 at 7:23 PM Kostas Kloudas <kklou...@apache.org> wrote:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Hi all,
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> I agree with Tison that we should disentangle threads so that people can work independently.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> For FLIP-73:
> > > > >>>>>>>>>> - for Preview/OptimizedPlanEnv: I think they are orthogonal to the Executors work, as they are using the execute() method because this is the only "entry" to the user program. To this regard, I believe we should just see the fact that they have their dedicated environment as an "implementation detail".
> > > > >>>>>>>>>> - for getting rid of the per-job mode: as a first note, there was already a discussion here:
> > > > >>>>>>>>>> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
> > > > >>>>>>>>>> with many people, including myself, expressing their opinion. I am mentioning that to show that this topic already has some history and the discussion does not start from scratch but there are already some contradicting opinions. My opinion is that we should not get rid of the per-job mode but I agree that we should discuss the semantics in more detail. Although in terms of code it may be tempting to "merge" the two submission modes, one of the main benefits of the per-job mode is isolation, both for resources and security, as the jobGraph to be executed is fixed and the cluster is "locked" just for that specific graph.
> > > > >>>>>>>>>> This would be violated by having a session cluster launched and having all the infrastructure (ports and endpoints) set for submitting any job to that cluster.
> > > > >>>>>>>>>> - for getting rid of the "detached" mode: I agree with getting rid of it but this implies some potential user-facing changes that should be discussed.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Given the above, I think that:
> > > > >>>>>>>>>> 1) in the context of FLIP-73 we should not change any semantics but simply push the existing submission logic behind a reusable abstraction and make it usable via public APIs, as Aljoscha said.
> > > > >>>>>>>>>> 2) as Till said, changing the semantics is beyond the scope of this FLIP and as Tison mentioned we should work towards decoupling discussions rather than the opposite. So let's discuss the future of the per-job and detached modes in a separate thread. This will also give proper visibility to such an important topic.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Cheers,
> > > > >>>>>>>>>> Kostas
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> On Wed, Oct 2, 2019 at 4:40 PM Zili Chen <wander4...@gmail.com> wrote:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Thanks for your thoughts Aljoscha.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Another question, since FLIP-73 might contain refactorings of the Environments: shall we support something like PreviewPlanEnvironment? If so, how?
> > > > >>>>>>>>>>> From a user's perspective, the preview plan is useful: by giving a visual view, it lets users modify topologies and configure them without submitting the job.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Best,
> > > > >>>>>>>>>>> tison.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> On Wed, Oct 2, 2019 at 10:10 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>> I agree with Till that we should not change the semantics of per-job mode. In my opinion per-job mode means that the cluster (JobManager) is brought up with one job and it only executes that one job. There should be no open ports/anything that would allow submitting further jobs. This is very important for deployments in docker/Kubernetes or other environments where you bring up jobs without necessarily having the notion of a Flink cluster.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> What this means for a user program that has multiple execute() calls is that you will get a fresh cluster for each execute call. This also means that further execute() calls will only happen if the “client” is still alive, because it is the one driving execution. Currently, this only works if you start the job in “attached” mode. If you start in “detached” mode only the first execute() will happen and the rest will be ignored.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> This brings us to the tricky question about what to do about “detached” and “attached”. In the long run, I would like to get rid of the distinction and leave it up to the user program, by either blocking or not on the Future (or JobClient or whatnot) that job submission returns. This, however, means that users cannot simply request “detached” execution when using bin/flink, the user program has to “play along”. On the other hand, “detached” mode is quite strange for the user program. The execute() call either returns with a proper job result after the job ran (in “attached” mode) or with a dummy result (in “detached” mode) right after submission. I think this can even lead to weird cases where multiple “execute()” run in parallel. For per-job detached mode we also “throw” out of the first execute so the rest (including result processing logic) is ignored.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> For this here FLIP-73 we can (and should) ignore these problems, because FLIP-73 only moves the existing submission logic behind a reusable abstraction and makes it usable via API. We should closely follow up on the above points though because I think they are also important.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Best,
> > > > >>>>>>>>>>>> Aljoscha
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> On 2. Oct 2019, at 12:08, Zili Chen <wander4...@gmail.com> wrote:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Thanks for your clarification Till.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> I agree with the current semantics of the per-job mode: one should deploy a new cluster for each part of the job. Apart from the performance concern, it also means that PerJobExecutor actually knows how to deploy a cluster, which differs from the description that an Executor submits a job.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Anyway, it sounds workable and narrows the changes.
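
For reference, the two interface ideas raised in this thread (an execute() that takes per-pipeline options in addition to the executor-level configuration, and an execute() that returns a CompletableFuture so that "attached" versus "detached" is decided by the caller) can be combined in a minimal Java sketch. This is only an illustration under stated assumptions, not the FLIP-73 interface: Pipeline, JobResult and ToyExecutor are hypothetical stand-ins, and a plain Map<String, String> replaces Flink's Configuration so the example is self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Illustrative sketch only; none of these types are Flink's real classes. */
class ExecutorSketch {

    /** Hypothetical stand-in for a Pipeline: just a named job. */
    static final class Pipeline {
        final String name;
        Pipeline(String name) { this.name = name; }
    }

    /** Hypothetical stand-in for a job result; records the options the job ran with. */
    static final class JobResult {
        final String jobName;
        final Map<String, String> effectiveConfig;
        JobResult(String jobName, Map<String, String> effectiveConfig) {
            this.jobName = jobName;
            this.effectiveConfig = effectiveConfig;
        }
    }

    /** Assumed shape: asynchronous execute() with per-pipeline options. */
    interface Executor {
        CompletableFuture<JobResult> execute(Pipeline pipeline, Map<String, String> executionOptions);
    }

    /** Toy executor: merges the two option sets and "runs" the job asynchronously. */
    static final class ToyExecutor implements Executor {
        private final Map<String, String> executorOptions;

        ToyExecutor(Map<String, String> executorOptions) {
            this.executorOptions = new HashMap<>(executorOptions);
        }

        @Override
        public CompletableFuture<JobResult> execute(Pipeline pipeline, Map<String, String> executionOptions) {
            // Start from the executor-level options, then let the per-execution
            // options override them, as suggested in the thread.
            Map<String, String> merged = new HashMap<>(executorOptions);
            merged.putAll(executionOptions);
            return CompletableFuture.supplyAsync(() -> new JobResult(pipeline.name, merged));
        }
    }

    public static void main(String[] args) {
        Map<String, String> executorOptions = new HashMap<>();
        executorOptions.put("target", "yarn");
        executorOptions.put("parallelism", "1");
        Executor executor = new ToyExecutor(executorOptions);

        // "Attached" style: the caller blocks on the future and gets the result.
        Map<String, String> jobOptions = new HashMap<>();
        jobOptions.put("parallelism", "4");
        JobResult result = executor.execute(new Pipeline("wordcount"), jobOptions).join();
        System.out.println(result.jobName + " ran with parallelism " + result.effectiveConfig.get("parallelism"));

        // "Detached" style: the same executor is reused and the caller simply does not wait.
        CompletableFuture<JobResult> pending = executor.execute(new Pipeline("etl"), new HashMap<>());
        System.out.println("etl submitted, completed yet: " + pending.isDone());
    }
}
```

In this sketch the executor itself knows nothing about "attached" or "detached"; the caller chooses by joining the future or not, and one executor instance serves several pipelines with different options.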