Thanks for your explanation, Kostas; it makes the subtasks under FLIP-73 clear.
As you described, changes to the Environments are included in this FLIP. For "each API to have a single Environment", it could be helpful to describe which APIs we'd like to have after FLIP-73. And if we keep multiple Environments, shall we keep the current way of injecting a context environment for each API?

Kostas Kloudas <kklou...@gmail.com> wrote on Thu, Oct 3, 2019 at 1:44 PM:

> Hi Tison,
>
> The changes that this FLIP proposes are:
> - the introduction of the Executor interface
> - the fact that everything in the current state of job submission in
> Flink can be defined through configuration parameters
> - implementation of Executors that do not change any of the semantics
> of the currently offered "modes" of job submission
>
> In this, and in the FLIP itself where the
> ExecutionEnvironment.execute() method is described, there are details
> about parts of the integration with the existing Flink code-base.
>
> So I am not sure what you mean by making the "integration a
> follow-up discussion".
>
> Cheers,
> Kostas
>
> On Wed, Oct 2, 2019 at 8:10 PM Zili Chen <wander4...@gmail.com> wrote:
> >
> > - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
> > Executors work, as they are using the execute() method because this is
> > the only "entry" to the user program. In this regard, I believe we
> > should just see the fact that they have their dedicated environment as
> > an "implementation detail".
> >
> > The proposal says
> >
> > In this document, we propose to abstract away from the Environments the
> > job submission logic and put it in a newly introduced Executor. This will
> > allow *each API to have a single Environment* which, based on the provided
> > configuration, will decide which executor to use, *e.g.* Yarn, Local, etc.
> > In addition, it will allow different APIs and downstream projects to
> > re-use the provided executors, thus limiting the amount of code
> > duplication and the amount of code that has to be written.
> >
> > Note that "This will allow *each API to have a single Environment*"
> > seems to diverge a bit from your statement above. Or we could say a
> > single Environment is a possible advantage after the introduction of
> > Executor, so that we exclude it from this pass.
> >
> > Best,
> > tison.
> >
> >
> > Zili Chen <wander4...@gmail.com> wrote on Thu, Oct 3, 2019 at 2:07 AM:
> >
> > > BTW, correct me if I misunderstand; now I have learned more about our
> > > community's way. Since FLIP-73 is aimed at introducing an interface with
> > > community consensus, the discussion is more about the interface, in
> > > order to properly define a useful and extensible API. The integration
> > > story could be a follow-up, since it does not affect current behavior
> > > at all.
> > >
> > > Best,
> > > tison.
> > >
> > >
> > > Zili Chen <wander4...@gmail.com> wrote on Thu, Oct 3, 2019 at 2:02 AM:
> > >
> > >> Hi Kostas,
> > >>
> > >> It seems it does no harm to have a configuration parameter on
> > >> Executor#execute, since we can merge it with the one configured when
> > >> the Executor is created and let the former override the latter.
> > >>
> > >> I can see it is useful that conceptually we can create an Executor for
> > >> a series of jobs to the same cluster but with different job
> > >> configuration per pipeline.
> > >>
> > >> Best,
> > >> tison.
> > >>
> > >>
> > >> Kostas Kloudas <kklou...@apache.org> wrote on Thu, Oct 3, 2019 at 1:37 AM:
> > >>
> > >>> Hi again,
> > >>>
> > >>> I did not include this in my previous email, as it is related to the
> > >>> proposal in the FLIP itself.
> > >>>
> > >>> In the existing proposal, the Executor interface is the following.
> > >>>
> > >>> public interface Executor {
> > >>>
> > >>>     JobExecutionResult execute(Pipeline pipeline) throws Exception;
> > >>>
> > >>> }
> > >>>
> > >>> This implies that all the necessary information for the execution of a
> > >>> Pipeline should be included in the Configuration passed to the
> > >>> ExecutorFactory which instantiates the Executor itself. This should
> > >>> include, for example, all the parameters currently supplied by the
> > >>> ProgramOptions, which are conceptually not executor parameters but
> > >>> rather parameters for the execution of the specific pipeline. To this
> > >>> end, I would like to propose a change to the current Executor
> > >>> interface, showcased below:
> > >>>
> > >>> public interface Executor {
> > >>>
> > >>>     JobExecutionResult execute(Pipeline pipeline,
> > >>>             Configuration executionOptions) throws Exception;
> > >>>
> > >>> }
> > >>>
> > >>> The above will allow Executor-specific options to be passed in the
> > >>> configuration given during executor instantiation, while the
> > >>> pipeline-specific options can be passed in the executionOptions. As a
> > >>> positive side-effect, this will make Executors re-usable, i.e. we can
> > >>> instantiate an executor and use it to execute multiple pipelines, if
> > >>> in the future we choose to do so.
> > >>>
> > >>> Let me know what you think,
> > >>> Kostas
> > >>>
> > >>> On Wed, Oct 2, 2019 at 7:23 PM Kostas Kloudas <kklou...@apache.org>
> > >>> wrote:
> > >>> >
> > >>> > Hi all,
> > >>> >
> > >>> > I agree with Tison that we should disentangle threads so that
> > >>> > people can work independently.
> > >>> >
> > >>> > For FLIP-73:
> > >>> > - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
> > >>> > Executors work, as they are using the execute() method because this
> > >>> > is the only "entry" to the user program.
> > >>> > In this regard, I believe we
> > >>> > should just see the fact that they have their dedicated environment
> > >>> > as an "implementation detail".
> > >>> > - for getting rid of the per-job mode: as a first note, there was
> > >>> > already a discussion here:
> > >>> >
> > >>> > https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
> > >>> > with many people, including myself, expressing their opinion. I am
> > >>> > mentioning that to show that this topic already has some history and
> > >>> > the discussion does not start from scratch but there are already
> > >>> > some contradicting opinions. My opinion is that we should not get
> > >>> > rid of the per-job mode, but I agree that we should discuss the
> > >>> > semantics in more detail. Although in terms of code it may be
> > >>> > tempting to "merge" the two submission modes, one of the main
> > >>> > benefits of the per-job mode is isolation, both for resources and
> > >>> > security, as the jobGraph to be executed is fixed and the cluster is
> > >>> > "locked" just for that specific graph. This would be violated by
> > >>> > having a session cluster launched and having all the infrastructure
> > >>> > (ports and endpoints) set for submitting any job to that cluster.
> > >>> > - for getting rid of the "detached" mode: I agree with getting rid
> > >>> > of it, but this implies some potential user-facing changes that
> > >>> > should be discussed.
> > >>> >
> > >>> > Given the above, I think that:
> > >>> > 1) in the context of FLIP-73 we should not change any semantics but
> > >>> > simply push the existing submission logic behind a reusable
> > >>> > abstraction and make it usable via public APIs, as Aljoscha said.
> > >>> > 2) as Till said, changing the semantics is beyond the scope of this
> > >>> > FLIP and, as Tison mentioned, we should work towards decoupling
> > >>> > discussions rather than the opposite.
> > >>> > So let's discuss the
> > >>> > future of the per-job and detached modes in a separate thread. This
> > >>> > will also give the proper visibility to such an important topic.
> > >>> >
> > >>> > Cheers,
> > >>> > Kostas
> > >>> >
> > >>> > On Wed, Oct 2, 2019 at 4:40 PM Zili Chen <wander4...@gmail.com>
> > >>> > wrote:
> > >>> > >
> > >>> > > Thanks for your thoughts Aljoscha.
> > >>> > >
> > >>> > > Another question, since FLIP-73 might contain refactors on
> > >>> > > Environment: shall we support something like
> > >>> > > PreviewPlanEnvironment? If so, how? From a user perspective a plan
> > >>> > > preview is useful, by giving a visual view, to modify topologies
> > >>> > > and configuration without submitting the job.
> > >>> > >
> > >>> > > Best,
> > >>> > > tison.
> > >>> > >
> > >>> > >
> > >>> > > Aljoscha Krettek <aljos...@apache.org> wrote on Wed, Oct 2, 2019
> > >>> > > at 10:10 PM:
> > >>> > >
> > >>> > > > I agree with Till that we should not change the semantics of
> > >>> > > > per-job mode. In my opinion per-job mode means that the cluster
> > >>> > > > (JobManager) is brought up with one job and it only executes
> > >>> > > > that one job. There should be no open ports/anything that would
> > >>> > > > allow submitting further jobs. This is very important for
> > >>> > > > deployments in Docker/Kubernetes or other environments where you
> > >>> > > > bring up jobs without necessarily having the notion of a Flink
> > >>> > > > cluster.
> > >>> > > >
> > >>> > > > What this means for a user program that has multiple execute()
> > >>> > > > calls is that you will get a fresh cluster for each execute()
> > >>> > > > call. This also means that further execute() calls will only
> > >>> > > > happen if the "client" is still alive, because it is the one
> > >>> > > > driving execution. Currently, this only works if you start the
> > >>> > > > job in "attached" mode.
> > >>> > > > If you start in "detached" mode,
> > >>> > > > only the first execute() will happen and the rest will be
> > >>> > > > ignored.
> > >>> > > >
> > >>> > > > This brings us to the tricky question about what to do about
> > >>> > > > "detached" and "attached". In the long run, I would like to get
> > >>> > > > rid of the distinction and leave it up to the user program, by
> > >>> > > > either blocking or not on the Future (or JobClient or whatnot)
> > >>> > > > that job submission returns. This, however, means that users
> > >>> > > > cannot simply request "detached" execution when using bin/flink;
> > >>> > > > the user program has to "play along". On the other hand,
> > >>> > > > "detached" mode is quite strange for the user program. The
> > >>> > > > execute() call either returns with a proper job result after the
> > >>> > > > job ran (in "attached" mode) or with a dummy result (in
> > >>> > > > "detached" mode) right after submission. I think this can even
> > >>> > > > lead to weird cases where multiple execute() calls run in
> > >>> > > > parallel. For per-job detached mode we also "throw" out of the
> > >>> > > > first execute() so the rest (including result processing logic)
> > >>> > > > is ignored.
> > >>> > > >
> > >>> > > > For this FLIP-73 we can (and should) ignore these problems,
> > >>> > > > because FLIP-73 only moves the existing submission logic behind
> > >>> > > > a reusable abstraction and makes it usable via API. We should
> > >>> > > > closely follow up on the above points though, because I think
> > >>> > > > they are also important.
> > >>> > > >
> > >>> > > > Best,
> > >>> > > > Aljoscha
> > >>> > > >
> > >>> > > > > On 2. Oct 2019, at 12:08, Zili Chen <wander4...@gmail.com>
> > >>> > > > > wrote:
> > >>> > > > >
> > >>> > > > > Thanks for your clarification Till.
> > >>> > > > >
> > >>> > > > > I agree with the current semantics of the per-job mode: one
> > >>> > > > > should deploy a new cluster for each part of the job. Apart
> > >>> > > > > from the performance concern, it also means that
> > >>> > > > > PerJobExecutor actually knows how to deploy a cluster, which
> > >>> > > > > is different from the description that an Executor submits a
> > >>> > > > > job.
> > >>> > > > >
> > >>> > > > > Anyway, it sounds workable and narrows the changes.
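
To make the interface discussion in the thread concrete, here is a minimal, self-contained Java sketch of the revised two-argument Executor proposal. Apart from the interface shape itself, everything here (the Pipeline, JobExecutionResult, Configuration, StubExecutor, and ExecutorSketch classes) is a hypothetical stand-in, not Flink's real API; it only illustrates tison's suggested merge rule, where options passed at execute() time override the options the executor was created with.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal stand-ins for the types named in the thread; NOT Flink's real classes.
class Pipeline {}

class JobExecutionResult {
    final Map<String, String> effectiveConfig;
    JobExecutionResult(Map<String, String> effectiveConfig) {
        this.effectiveConfig = effectiveConfig;
    }
}

class Configuration {
    final Map<String, String> entries = new HashMap<>();
    Configuration set(String key, String value) {
        entries.put(key, value);
        return this;
    }
}

// The revised interface from the proposal: per-pipeline options are
// supplied at execute() time, so one Executor can run many pipelines.
interface Executor {
    JobExecutionResult execute(Pipeline pipeline, Configuration executionOptions)
            throws Exception;
}

// Hypothetical executor illustrating the merge rule from the thread:
// execute-time options override the options the executor was created with.
class StubExecutor implements Executor {
    private final Configuration executorOptions;

    StubExecutor(Configuration executorOptions) {
        this.executorOptions = executorOptions;
    }

    @Override
    public JobExecutionResult execute(Pipeline pipeline, Configuration executionOptions) {
        Map<String, String> merged = new HashMap<>(executorOptions.entries);
        merged.putAll(executionOptions.entries); // per-pipeline options win
        return new JobExecutionResult(merged);
    }
}

public class ExecutorSketch {
    public static void main(String[] args) throws Exception {
        // One executor, configured once for a target cluster...
        StubExecutor executor = new StubExecutor(
                new Configuration().set("target", "yarn").set("parallelism", "4"));
        // ...reused for two pipelines with different per-job options.
        JobExecutionResult first = executor.execute(
                new Pipeline(), new Configuration().set("parallelism", "8"));
        JobExecutionResult second = executor.execute(new Pipeline(), new Configuration());
        System.out.println(first.effectiveConfig.get("parallelism"));  // 8
        System.out.println(second.effectiveConfig.get("parallelism")); // 4
    }
}
```

Because the per-pipeline options arrive at execute() time, the same executor instance can submit several pipelines with different job configurations, which is the re-usability side-effect Kostas mentions.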