Do you all think we could agree on the basic executor primitives and start voting on this FLIP? There are still some implementation details, but I think we can discuss and tackle them when we get to them, and the various people implementing this should stay in close collaboration.

Best,
Aljoscha
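For readers skimming for the primitives in question: the Executor interface quoted verbatim further down this thread is a single execute() method; the factory next to it is only an assumed shape for the "executor discovery" part and not necessarily the FLIP's actual interface.

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.dag.Pipeline;
    import org.apache.flink.configuration.Configuration;

    // The executor primitive as it appears in the FLIP discussion below.
    interface Executor {
        JobExecutionResult execute(Pipeline pipeline) throws Exception;
    }

    // Assumed shape of the "executor discovery" hook: factories advertise which
    // configurations they can handle and build the matching executor.
    // Illustration only, not the FLIP's actual interface.
    interface ExecutorFactory {
        boolean isCompatibleWith(Configuration configuration);
        Executor getExecutor(Configuration configuration);
    }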
> On 4. Oct 2019, at 10:15, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Hi,
>
> I think the end goal is to have only one environment per API, but I think we won't be able to achieve that in the short term because of backwards compatibility. This is most notable with the context environment, preview environments, etc.
>
> To keep this FLIP very slim we can make this only about the executors and executor discovery. Anything else, like job submission semantics, detached mode, …, can be tackled after this. If we don't focus I'm afraid this will drag on for quite a while.
>
> One thing I would like to propose to make this easier is to change Executor.execute() to return a CompletableFuture and to completely remove the "detached" logic from ClusterClient. That way, the new components make no distinction between "detached" and "attached", but we can still do it in the CLI (via the ContextEnvironment) to support the existing "detached" behaviour of the CLI that users expect. What do you think about this?
>
> Best,
> Aljoscha
>
>> On 3. Oct 2019, at 10:03, Zili Chen <wander4...@gmail.com> wrote:
>>
>> Thanks for your explanation, Kostas; it makes the subtasks under FLIP-73 clear.
>>
>> As you described, changes to the Environments are included in this FLIP. For "each API to have a single Environment", it could be helpful to describe which APIs we'd like to have after FLIP-73. And if we keep multiple Environments, shall we keep the way we inject a context environment for each API?
>>
>>
>> Kostas Kloudas <kklou...@gmail.com> wrote on Thu, Oct 3, 2019 at 1:44 PM:
>>
>>> Hi Tison,
>>>
>>> The changes that this FLIP proposes are:
>>> - the introduction of the Executor interface
>>> - the fact that everything in the current state of job submission in Flink can be defined through configuration parameters
>>> - implementation of Executors that do not change any of the semantics of the currently offered "modes" of job submission
>>>
>>> In this, and in the FLIP itself where the ExecutionEnvironment.execute() method is described, there are details about parts of the integration with the existing Flink code-base.
>>>
>>> So I am not sure what you mean by making the "integration a follow-up discussion".
>>>
>>> Cheers,
>>> Kostas
>>>
>>> On Wed, Oct 2, 2019 at 8:10 PM Zili Chen <wander4...@gmail.com> wrote:
>>>>
>>>> - for Preview/OptimizedPlanEnv: I think they are orthogonal to the Executors work, as they are using the execute() method because this is the only "entry" to the user program. In this regard, I believe we should just see the fact that they have their dedicated environment as an "implementation detail".
>>>>
>>>> The proposal says
>>>>
>>>> In this document, we propose to abstract away from the Environments the job submission logic and put it in a newly introduced Executor. This will allow *each API to have a single Environment* which, based on the provided configuration, will decide which executor to use, *e.g.* Yarn, Local, etc. In addition, it will allow different APIs and downstream projects to re-use the provided executors, thus limiting the amount of code duplication and the amount of code that has to be written.
>>>>
>>>> Note that "This will allow *each API to have a single Environment*" seems to diverge a bit from your statement above. Or we could state a single Environment as a possible advantage after the introduction of Executors, so that we exclude it from this pass.
>>>>
>>>> Best,
>>>> tison.
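To make the quoted FLIP wording ("based on the provided configuration, will decide which executor to use") concrete, here is a minimal sketch of configuration-driven executor selection, re-using the assumed ExecutorFactory shape from the note above; the ServiceLoader lookup and the "execution.target" example key are assumptions for illustration, not the FLIP's actual mechanism.

    import java.util.ServiceLoader;

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.dag.Pipeline;
    import org.apache.flink.configuration.Configuration;

    final class ExecutorDiscovery {

        // Iterate over the factories on the classpath and let the configuration
        // (e.g. a key such as "execution.target" set to "yarn", "local", ...) pick one.
        static Executor forConfiguration(Configuration configuration) {
            for (ExecutorFactory factory : ServiceLoader.load(ExecutorFactory.class)) {
                if (factory.isCompatibleWith(configuration)) {
                    return factory.getExecutor(configuration);
                }
            }
            throw new IllegalStateException("No executor registered for the given configuration.");
        }

        // What an environment's execute() could then boil down to.
        static JobExecutionResult execute(Pipeline pipeline, Configuration configuration) throws Exception {
            return forConfiguration(configuration).execute(pipeline);
        }
    }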
>>>>
>>>>
>>>> Zili Chen <wander4...@gmail.com> wrote on Thu, Oct 3, 2019 at 2:07 AM:
>>>>
>>>>> BTW, correct me if I misunderstand; I am still learning our community's way of working. Since FLIP-73 is aimed at introducing an interface with community consensus, the discussion is more about the interface, in order to properly define a useful and extensible API. The integration story could be a follow-up, since this one does not affect current behavior at all.
>>>>>
>>>>> Best,
>>>>> tison.
>>>>>
>>>>>
>>>>> Zili Chen <wander4...@gmail.com> wrote on Thu, Oct 3, 2019 at 2:02 AM:
>>>>>
>>>>>> Hi Kostas,
>>>>>>
>>>>>> It seems to do no harm to have a configuration parameter on Executor#execute, since we can merge it with the one configured when the Executor is created and let the per-call one override it.
>>>>>>
>>>>>> I can see it is useful that, conceptually, we can create an Executor for a series of jobs to the same cluster, but with a different job configuration per pipeline.
>>>>>>
>>>>>> Best,
>>>>>> tison.
>>>>>>
>>>>>>
>>>>>> Kostas Kloudas <kklou...@apache.org> wrote on Thu, Oct 3, 2019 at 1:37 AM:
>>>>>>
>>>>>>> Hi again,
>>>>>>>
>>>>>>> I did not include this in my previous email, as this is related to the proposal in the FLIP itself.
>>>>>>>
>>>>>>> In the existing proposal, the Executor interface is the following:
>>>>>>>
>>>>>>> public interface Executor {
>>>>>>>
>>>>>>>     JobExecutionResult execute(Pipeline pipeline) throws Exception;
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>> This implies that all the necessary information for the execution of a Pipeline should be included in the Configuration passed to the ExecutorFactory which instantiates the Executor itself. This should include, for example, all the parameters currently supplied by the ProgramOptions, which are conceptually not executor parameters but rather parameters for the execution of the specific pipeline. To this end, I would like to propose a change to the current Executor interface, shown below:
>>>>>>>
>>>>>>> public interface Executor {
>>>>>>>
>>>>>>>     JobExecutionResult execute(Pipeline pipeline, Configuration executionOptions) throws Exception;
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>> The above allows the Executor-specific options to be passed in the configuration given during executor instantiation, while the pipeline-specific options can be passed in the executionOptions. As a positive side-effect, this will make Executors re-usable, i.e. we could instantiate an executor and use it to execute multiple pipelines, if in the future we choose to do so.
>>>>>>>
>>>>>>> Let me know what you think,
>>>>>>> Kostas
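As a usage sketch of the two-argument variant proposed above: one executor instance, created once with its executor-specific configuration, could run several pipelines, each with its own execution options. The pipelines, the helper class, and the option key used here are placeholders; only the interface shape comes from the proposal above.

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.dag.Pipeline;
    import org.apache.flink.configuration.Configuration;

    class ExecutorReuseSketch {

        // Assumes the two-argument Executor interface proposed above.
        static void runBoth(Executor executor, Pipeline firstPipeline, Pipeline secondPipeline) throws Exception {
            Configuration firstOptions = new Configuration();
            firstOptions.setInteger("parallelism.default", 4);    // per-pipeline option (placeholder)

            Configuration secondOptions = new Configuration();
            secondOptions.setInteger("parallelism.default", 16);  // per-pipeline option (placeholder)

            // The same executor (and hence the same target cluster) runs both pipelines,
            // each with its own execution options.
            JobExecutionResult first = executor.execute(firstPipeline, firstOptions);
            JobExecutionResult second = executor.execute(secondPipeline, secondOptions);

            System.out.println("First job took " + first.getNetRuntime()
                    + " ms, second job took " + second.getNetRuntime() + " ms.");
        }
    }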
>>>>>>>
>>>>>>> On Wed, Oct 2, 2019 at 7:23 PM Kostas Kloudas <kklou...@apache.org> wrote:
>>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I agree with Tison that we should disentangle threads so that people can work independently.
>>>>>>>>
>>>>>>>> For FLIP-73:
>>>>>>>> - for Preview/OptimizedPlanEnv: I think they are orthogonal to the Executors work, as they are using the execute() method because this is the only "entry" to the user program. In this regard, I believe we should just see the fact that they have their dedicated environment as an "implementation detail".
>>>>>>>> - for getting rid of the per-job mode: as a first note, there was already a discussion here: https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E with many people, including myself, expressing their opinion. I am mentioning that to show that this topic already has some history and the discussion does not start from scratch; there are already some contradicting opinions. My opinion is that we should not get rid of the per-job mode, but I agree that we should discuss the semantics in more detail. Although in terms of code it may be tempting to "merge" the two submission modes, one of the main benefits of the per-job mode is isolation, both for resources and security, as the jobGraph to be executed is fixed and the cluster is "locked" just for that specific graph. This would be violated by having a session cluster launched with all the infrastructure (ports and endpoints) set up for submitting any job to that cluster.
>>>>>>>> - for getting rid of the "detached" mode: I agree with getting rid of it, but this implies some potential user-facing changes that should be discussed.
>>>>>>>>
>>>>>>>> Given the above, I think that:
>>>>>>>> 1) in the context of FLIP-73 we should not change any semantics but simply push the existing submission logic behind a reusable abstraction and make it usable via public APIs, as Aljoscha said.
>>>>>>>> 2) as Till said, changing the semantics is beyond the scope of this FLIP, and as Tison mentioned we should work towards decoupling discussions rather than the opposite. So let's discuss the future of the per-job and detached modes in a separate thread. This will also allow us to give the proper visibility to such an important topic.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Kostas
>>>>>>>>
>>>>>>>> On Wed, Oct 2, 2019 at 4:40 PM Zili Chen <wander4...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Thanks for your thoughts Aljoscha.
>>>>>>>>>
>>>>>>>>> Another question, since FLIP-73 might contain refactorings of the Environments: shall we support something like PreviewPlanEnvironment? If so, how? From a user perspective, previewing the plan is useful: it gives a visual view for modifying topologies and configuration without submitting the job.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> tison.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Aljoscha Krettek <aljos...@apache.org> wrote on Wed, Oct 2, 2019 at 10:10 PM:
>>>>>>>>>
>>>>>>>>>> I agree with Till that we should not change the semantics of per-job mode. In my opinion, per-job mode means that the cluster (JobManager) is brought up with one job and it only executes that one job. There should be no open ports or anything else that would allow submitting further jobs. This is very important for deployments in docker/Kubernetes or other environments where you bring up jobs without necessarily having the notion of a Flink cluster.
>>>>>>>>>>
>>>>>>>>>> What this means for a user program that has multiple execute() calls is that you will get a fresh cluster for each execute() call. This also means that further execute() calls will only happen if the "client" is still alive, because it is the one driving execution. Currently, this only works if you start the job in "attached" mode. If you start in "detached" mode, only the first execute() will happen and the rest will be ignored.
>>>>>>>>>>
>>>>>>>>>> This brings us to the tricky question of what to do about "detached" and "attached". In the long run, I would like to get rid of the distinction and leave it up to the user program, by either blocking or not on the Future (or JobClient or whatnot) that job submission returns. This, however, means that users cannot simply request "detached" execution when using bin/flink; the user program has to "play along". On the other hand, "detached" mode is quite strange for the user program. The execute() call either returns with a proper job result after the job ran (in "attached" mode) or with a dummy result (in "detached" mode) right after submission. I think this can even lead to weird cases where multiple "execute()" calls run in parallel. For per-job detached mode we also "throw" out of the first execute() so the rest (including result-processing logic) is ignored.
>>>>>>>>>>
>>>>>>>>>> For FLIP-73 here we can (and should) ignore these problems, because FLIP-73 only moves the existing submission logic behind a reusable abstraction and makes it usable via API. We should closely follow up on the above points though, because I think they are also important.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Aljoscha
>>>>>>>>>>
>>>>>>>>>>> On 2. Oct 2019, at 12:08, Zili Chen <wander4...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your clarification Till.
>>>>>>>>>>>
>>>>>>>>>>> I agree with the current semantics of the per-job mode: one should deploy a new cluster for each part of the job. Apart from the performance concern, it also means that the PerJobExecutor actually knows how to deploy a cluster, which is different from the description that an Executor submits a job.
>>>>>>>>>>>
>>>>>>>>>>> Anyway, it sounds workable and narrows the changes.
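Tying Aljoscha's two mails together, the following sketch shows how a CompletableFuture-returning execute() would reduce "attached" vs. "detached" to whether the caller blocks on the returned future, leaving the CLI free to keep its current detached behaviour on top. The AsyncExecutor name and the helper methods are assumptions for illustration only, not the FLIP's API.

    import java.util.concurrent.CompletableFuture;

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.dag.Pipeline;

    // Variant of the executor primitive that always submits asynchronously,
    // as proposed in Aljoscha's mail of 4 Oct.
    interface AsyncExecutor {
        CompletableFuture<JobExecutionResult> execute(Pipeline pipeline) throws Exception;
    }

    class SubmissionModes {

        // "Attached" behaviour: the caller waits for the job to finish and gets the result.
        static JobExecutionResult runAttached(AsyncExecutor executor, Pipeline pipeline) throws Exception {
            return executor.execute(pipeline).get();
        }

        // "Detached" behaviour: the caller returns right after submission and never
        // touches the result; the CLI (via the ContextEnvironment) could keep offering
        // this without the executors knowing about any attached/detached distinction.
        static void runDetached(AsyncExecutor executor, Pipeline pipeline) throws Exception {
            executor.execute(pipeline);
        }
    }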