Do you all think we could agree on the basic executor primitives and start 
voting on this FLIP? There are still some implementation details but I think we 
can discuss/tackle them when we get to them and the various people implementing 
this should be in close collaboration.

Best,
Aljoscha

> On 4. Oct 2019, at 10:15, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> Hi,
> 
> I think the end goal is to have only one environment per API, but I think we 
> won’t be able to achieve that in the short-term because of backwards 
> compatibility. This is most notable with the context environment, preview 
> environments etc.
> 
> To keep this FLIP very slim we can make this only about the executors and 
> executor discovery. Anything else like job submission semantics, detached 
> mode, … can be tackled after this. If we don’t focus I’m afraid this will 
> drag on for quite a while.
> 
> One thing I would like to propose to make this easier is to change 
> Executor.execute() to return a CompletableFuture and to completely remove the 
> “detached” logic from ClusterClient. That way, the new components make no 
> distinction between “detached” and “attached” but we can still do it in the 
> CLI (via the ContextEnvironment) to support the existing “detached” behaviour 
> of the CLI that users expect. What do you think about this?
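
A minimal sketch of the idea above, assuming simplified stand-in types: `AsyncExecutor`, `SketchPipeline`, `SketchJobResult`, `InMemoryExecutor` and `CliSketch` are hypothetical names for illustration, not the FLIP's or Flink's actual classes. The executor always returns a future and knows nothing about "detached"; only the CLI-side caller decides whether to wait.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for illustration only; not the real Flink types.
final class SketchPipeline {
    final String name;
    SketchPipeline(String name) { this.name = name; }
}

final class SketchJobResult {
    final String jobName;
    SketchJobResult(String jobName) { this.jobName = jobName; }
}

// The executor makes no distinction between "attached" and "detached":
// it always hands back a future for the job result.
interface AsyncExecutor {
    CompletableFuture<SketchJobResult> execute(SketchPipeline pipeline);
}

// Toy executor that "runs" the job asynchronously in the common pool.
final class InMemoryExecutor implements AsyncExecutor {
    @Override
    public CompletableFuture<SketchJobResult> execute(SketchPipeline pipeline) {
        return CompletableFuture.supplyAsync(() -> new SketchJobResult(pipeline.name));
    }
}

// Assumed CLI-side helper: the only place that knows about "detached".
final class CliSketch {
    static Optional<SketchJobResult> run(
            AsyncExecutor executor, SketchPipeline pipeline, boolean detached) {
        CompletableFuture<SketchJobResult> future = executor.execute(pipeline);
        if (detached) {
            // Detached: return right after submission without waiting.
            return Optional.empty();
        }
        // Attached: block until the job result is available.
        return Optional.of(future.join());
    }
}
```

With this shape, the existing CLI behaviour could be kept by the caller choosing whether to block, while the executor itself stays the same in both cases.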
> 
> Best,
> Aljoscha
> 
>> On 3. Oct 2019, at 10:03, Zili Chen <wander4...@gmail.com> wrote:
>> 
>> Thanks, Kostas, for your explanation clarifying the subtasks under FLIP-73.
>> 
>> As you described, changes to the Environments are included in this FLIP.
>> Regarding "each API to have a single Environment", it would be helpful to
>> describe which APIs we'd like to have after FLIP-73. And if we keep
>> multiple Environments, shall we keep the current way of injecting the
>> context environment for each API?
>> 
>> 
>> Kostas Kloudas <kklou...@gmail.com> wrote on Thu, Oct 3, 2019 at 1:44 PM:
>> 
>>> Hi Tison,
>>> 
>>> The changes that this FLIP proposes are:
>>> - the introduction of the Executor interface
>>> - the fact that everything in the current state of job submission in
>>> Flink can be defined through configuration parameters
>>> - implementation of Executors that do not change any of the semantics
>>> of the currently offered "modes" of job submission
>>> 
>>> In this, and in the FLIP itself where the
>>> ExecutionEnvironment.execute() method is described, there are details
>>> about parts of the integration with the existing Flink code-base.
>>> 
>>> So I am not sure what you mean by making the "integration a
>>> follow-up discussion".
>>> 
>>> Cheers,
>>> Kostas
>>> 
>>> On Wed, Oct 2, 2019 at 8:10 PM Zili Chen <wander4...@gmail.com> wrote:
>>>> 
>>>> - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
>>>> Executors work, as they are using the execute() method because this is
>>>> the only "entry" to the user program. In this regard, I believe we
>>>> should just see the fact that they have their dedicated environment as
>>>> an "implementation detail".
>>>> 
>>>> The proposal says
>>>> 
>>>> In this document, we propose to abstract away from the Environments the
>>>> job submission logic and put it in a newly introduced Executor. This will
>>>> allow *each API to have a single Environment* which, based on the provided
>>>> configuration, will decide which executor to use, *e.g.* Yarn, Local, etc.
>>>> In addition, it will allow different APIs and downstream projects to
>>>> re-use the provided executors, thus limiting the amount of code
>>>> duplication and the amount of code that has to be written.
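
As a rough illustration of "based on the provided configuration, will decide which executor to use", here is a sketch under stated assumptions: the key `executor.target`, the `yarn.queue` option, and the `NamedExecutor` and `ExecutorRegistry` types are all hypothetical and only show the lookup pattern, not how the FLIP actually discovers executors.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for an executor; a real one would submit jobs.
interface NamedExecutor {
    String describe();
}

// Assumed registry that maps a configured target name to an executor factory.
final class ExecutorRegistry {
    // Hypothetical configuration key naming the desired executor.
    static final String TARGET_KEY = "executor.target";

    private final Map<String, Function<Map<String, String>, NamedExecutor>> factories =
            new HashMap<>();

    void register(String target, Function<Map<String, String>, NamedExecutor> factory) {
        factories.put(target, factory);
    }

    // A single Environment could call this instead of hard-coding a submission path.
    NamedExecutor create(Map<String, String> configuration) {
        String target = configuration.get(TARGET_KEY);
        Function<Map<String, String>, NamedExecutor> factory = factories.get(target);
        if (factory == null) {
            throw new IllegalArgumentException("No executor registered for target: " + target);
        }
        return factory.apply(configuration);
    }
}
```

The environment only reads the configuration, so adding a new deployment target would mean registering another factory rather than adding another Environment subclass.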
>>>> 
>>>> Note that "This will allow *each API to have a single Environment*"
>>>> seems to diverge a bit from your statement above. Or do we regard a
>>>> single Environment as a possible advantage after the introduction of
>>>> the Executor, so that we exclude it from this pass?
>>>> 
>>>> Best,
>>>> tison.
>>>> 
>>>> 
>>>> Zili Chen <wander4...@gmail.com> wrote on Thu, Oct 3, 2019 at 2:07 AM:
>>>> 
>>>>> BTW, correct me if I misunderstand, now that I have learned more about
>>>>> how our community works. Since FLIP-73 aims at introducing an interface
>>>>> with community consensus, the discussion is more about the interface, in
>>>>> order to properly define a useful and extensible API. The integration
>>>>> story could be a follow-up, since this one does not affect current
>>>>> behavior at all.
>>>>> 
>>>>> Best,
>>>>> tison.
>>>>> 
>>>>> 
>>>>> Zili Chen <wander4...@gmail.com> wrote on Thu, Oct 3, 2019 at 2:02 AM:
>>>>> 
>>>>>> Hi Kostas,
>>>>>> 
>>>>>> It seems to do no harm to have a configuration parameter on
>>>>>> Executor#execute, since we can merge it with the configuration set
>>>>>> when the Executor was created and let the former override the latter.
>>>>>> 
>>>>>> I can see it being useful that, conceptually, we can create one
>>>>>> Executor for a series of jobs submitted to the same cluster but with a
>>>>>> different job configuration per pipeline.
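
The merge described above could look roughly like this. It is only a sketch: plain `Map<String, String>` stands in for Flink's `Configuration`, and `OptionMerge` and the keys used in the usage note are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; a plain Map stands in for Flink's Configuration here.
final class OptionMerge {
    // Options passed to execute() take precedence over the options
    // the executor was created with. Neither input map is modified.
    static Map<String, String> effectiveOptions(
            Map<String, String> executorOptions, Map<String, String> executionOptions) {
        Map<String, String> merged = new HashMap<>(executorOptions);
        merged.putAll(executionOptions);
        return merged;
    }
}
```

For example, an executor created with `cluster=yarn-1, parallelism=1` and an execute call carrying `parallelism=4` would run with `cluster=yarn-1, parallelism=4`, while the executor's own options stay untouched for the next pipeline.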
>>>>>> 
>>>>>> Best,
>>>>>> tison.
>>>>>> 
>>>>>> 
>>>>>> Kostas Kloudas <kklou...@apache.org> wrote on Thu, Oct 3, 2019 at 1:37 AM:
>>>>>> 
>>>>>>> Hi again,
>>>>>>> 
>>>>>>> I did not include this in my previous email, as this is related to the
>>>>>>> proposal on the FLIP itself.
>>>>>>> 
>>>>>>> In the existing proposal, the Executor interface is the following.
>>>>>>> 
>>>>>>> public interface Executor {
>>>>>>> 
>>>>>>> JobExecutionResult execute(Pipeline pipeline) throws Exception;
>>>>>>> 
>>>>>>> }
>>>>>>> 
>>>>>>> This implies that all the necessary information for the execution of a
>>>>>>> Pipeline should be included in the Configuration passed in the
>>>>>>> ExecutorFactory which instantiates the Executor itself. This should
>>>>>>> include, for example, all the parameters currently supplied by the
>>>>>>> ProgramOptions, which are conceptually not executor parameters but
>>>>>>> rather parameters for the execution of the specific pipeline. To this
>>>>>>> end, I would like to propose a change in the current Executor
>>>>>>> interface showcased below:
>>>>>>> 
>>>>>>> 
>>>>>>> public interface Executor {
>>>>>>> 
>>>>>>> JobExecutionResult execute(Pipeline pipeline, Configuration
>>>>>>> executionOptions) throws Exception;
>>>>>>> 
>>>>>>> }
>>>>>>> 
>>>>>>> The above allows the Executor-specific options to be passed in
>>>>>>> the configuration given during executor instantiation, while the
>>>>>>> pipeline-specific options can be passed in the executionOptions. As a
>>>>>>> positive side-effect, this will make Executors re-usable, i.e.
>>>>>>> instantiate an executor and use it to execute multiple pipelines, if
>>>>>>> in the future we choose to do so.
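
To make the re-use point concrete, here is a small sketch assuming simplified stand-ins: `ReusablePipeline`, `ConfigurableExecutor` and `DescribingExecutor` are hypothetical, a `Map<String, String>` replaces `Configuration`, a `String` replaces `JobExecutionResult`, and `throws Exception` is dropped for brevity. One executor instance holds the executor-level options and runs several pipelines, each with its own execution options.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a Pipeline.
final class ReusablePipeline {
    final String name;
    ReusablePipeline(String name) { this.name = name; }
}

// Simplified variant of the proposed two-argument interface.
interface ConfigurableExecutor {
    String execute(ReusablePipeline pipeline, Map<String, String> executionOptions);
}

// Toy executor: it only reports what it would run and where.
final class DescribingExecutor implements ConfigurableExecutor {
    // Executor-level options, fixed when the executor is instantiated.
    private final Map<String, String> executorOptions;

    DescribingExecutor(Map<String, String> executorOptions) {
        this.executorOptions = new HashMap<>(executorOptions);
    }

    @Override
    public String execute(ReusablePipeline pipeline, Map<String, String> executionOptions) {
        // Pipeline-level options arrive per call, so the instance can be re-used.
        return pipeline.name
                + " on " + executorOptions.get("cluster")
                + " with parallelism " + executionOptions.getOrDefault("parallelism", "1");
    }
}
```

The keys `cluster` and `parallelism` are made up for this example; the point is only that nothing pipeline-specific has to live in the executor instance.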
>>>>>>> 
>>>>>>> Let me know what you think,
>>>>>>> Kostas
>>>>>>> 
>>>>>>> On Wed, Oct 2, 2019 at 7:23 PM Kostas Kloudas <kklou...@apache.org>
>>>>>>> wrote:
>>>>>>>> 
>>>>>>>> Hi all,
>>>>>>>> 
>>>>>>>> I agree with Tison that we should disentangle threads so that people
>>>>>>>> can work independently.
>>>>>>>> 
>>>>>>>> For FLIP-73:
>>>>>>>> - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
>>>>>>>> Executors work, as they are using the execute() method because this
>>>>>>>> is the only "entry" to the user program. In this regard, I believe we
>>>>>>>> should just see the fact that they have their dedicated environment
>>>>>>>> as an "implementation detail".
>>>>>>>> - for getting rid of the per-job mode: as a first note, there was
>>>>>>>> already a discussion here:
>>>>>>>> 
>>>>>>> 
>>> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
>>>>>>>> with many people, including myself, expressing their opinion. I am
>>>>>>>> mentioning that to show that this topic already has some history and
>>>>>>>> the discussion does not start from scratch but there are already some
>>>>>>>> contradicting opinions. My opinion is that we should not get rid of
>>>>>>>> the per-job mode but I agree that we should discuss the
>>>>>>>> semantics in more detail. Although in terms of code it may be tempting
>>>>>>>> to "merge" the two submission modes, one of the main benefits of the
>>>>>>>> per-job mode is isolation, both for resources and security, as the
>>>>>>>> jobGraph to be executed is fixed and the cluster is "locked" just for
>>>>>>>> that specific graph. This would be violated by having a session
>>>>>>>> cluster launched and having all the infrastructure (ports and
>>>>>>>> endpoints) set up for submitting any job to that cluster.
>>>>>>>> - for getting rid of the "detached" mode: I agree with getting rid of
>>>>>>>> it but this implies some potential user-facing changes that should be
>>>>>>>> discussed.
>>>>>>>> 
>>>>>>>> Given the above, I think that:
>>>>>>>> 1) in the context of FLIP-73 we should not change any semantics but
>>>>>>>> simply push the existing submission logic behind a reusable
>>>>>>>> abstraction and make it usable via public APIs, as Aljoscha said.
>>>>>>>> 2) as Till said, changing the semantics is beyond the scope of this
>>>>>>>> FLIP and as Tison mentioned we should work towards decoupling
>>>>>>>> discussions rather than the opposite. So let's discuss the
>>>>>>>> future of the per-job and detached modes in a separate thread. This
>>>>>>>> will also allow us to give proper visibility to such an important
>>>>>>>> topic.
>>>>>>>> 
>>>>>>>> Cheers,
>>>>>>>> Kostas
>>>>>>>> 
>>>>>>>> On Wed, Oct 2, 2019 at 4:40 PM Zili Chen <wander4...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>> Thanks for your thoughts Aljoscha.
>>>>>>>>> 
>>>>>>>>> Another question, since FLIP-73 might contain refactorings of the
>>>>>>>>> Environments: shall we support something like PreviewPlanEnvironment?
>>>>>>>>> If so, how? From a user perspective, a plan preview is useful: it
>>>>>>>>> gives a visual view that lets users modify topologies and
>>>>>>>>> configuration without submitting the job.
>>>>>>>>> 
>>>>>>>>> Best,
>>>>>>>>> tison.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Aljoscha Krettek <aljos...@apache.org> wrote on Wed, Oct 2, 2019 at 10:10 PM:
>>>>>>>>> 
>>>>>>>>>> I agree with Till that we should not change the semantics of per-job
>>>>>>>>>> mode. In my opinion per-job mode means that the cluster (JobManager)
>>>>>>>>>> is brought up with one job and it only executes that one job. There
>>>>>>>>>> should be no open ports/anything that would allow submitting further
>>>>>>>>>> jobs. This is very important for deployments in docker/Kubernetes or
>>>>>>>>>> other environments where you bring up jobs without necessarily
>>>>>>>>>> having the notion of a Flink cluster.
>>>>>>>>>> 
>>>>>>>>>> What this means for a user program that has multiple execute() calls
>>>>>>>>>> is that you will get a fresh cluster for each execute call. This
>>>>>>>>>> also means that further execute() calls will only happen if the
>>>>>>>>>> “client” is still alive, because it is the one driving execution.
>>>>>>>>>> Currently, this only works if you start the job in “attached” mode.
>>>>>>>>>> If you start in “detached” mode only the first execute() will happen
>>>>>>>>>> and the rest will be ignored.
>>>>>>>>>> 
>>>>>>>>>> This brings us to the tricky question about what to do about
>>>>>>>>>> “detached” and “attached”. In the long run, I would like to get rid
>>>>>>>>>> of the distinction and leave it up to the user program, by either
>>>>>>>>>> blocking or not on the Future (or JobClient or whatnot) that job
>>>>>>>>>> submission returns. This, however, means that users cannot simply
>>>>>>>>>> request “detached” execution when using bin/flink, the user program
>>>>>>>>>> has to “play along”. On the other hand, “detached” mode is quite
>>>>>>>>>> strange for the user program. The execute() call either returns with
>>>>>>>>>> a proper job result after the job ran (in “attached” mode) or with a
>>>>>>>>>> dummy result (in “detached” mode) right after submission. I think
>>>>>>>>>> this can even lead to weird cases where multiple “execute()” calls
>>>>>>>>>> run in parallel. For per-job detached mode we also “throw” out of
>>>>>>>>>> the first execute so the rest (including result processing logic) is
>>>>>>>>>> ignored.
>>>>>>>>>> 
>>>>>>>>>> For this here FLIP-73 we can (and should) ignore these problems,
>>>>>>>>>> because FLIP-73 only moves the existing submission logic behind a
>>>>>>>>>> reusable abstraction and makes it usable via API. We should closely
>>>>>>>>>> follow up on the above points though because I think they are also
>>>>>>>>>> important.
>>>>>>>>>> 
>>>>>>>>>> Best,
>>>>>>>>>> Aljoscha
>>>>>>>>>> 
>>>>>>>>>>> On 2. Oct 2019, at 12:08, Zili Chen <wander4...@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Thanks for your clarification Till.
>>>>>>>>>>> 
>>>>>>>>>>> I agree with the current semantics of the per-job mode: one should
>>>>>>>>>>> deploy a new cluster for each part of the job. Apart from the
>>>>>>>>>>> performance concern, it also means that PerJobExecutor actually
>>>>>>>>>>> knows how to deploy a cluster, which is different from the
>>>>>>>>>>> description that an Executor submits a job.
>>>>>>>>>>> 
>>>>>>>>>>> Anyway, it sounds workable and narrows the changes.
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>> 
> 
