Thanks for these further considerations Chesnay!
I guess we might have some misunderstanding. Actually I was not
against the previous proposal Till suggested before, and I think it is
a formal way to do that.
And my previous proposal was not for excluding the ShuffleService
completely. The ShuffleService can be regarded as a factory for
creating ShuffleMaster on JM/RM side and creating ShuffleEnvironment
on TE side.
*
For the ShuffleEnvironment on TE side: I do not have concerns
always. The TE receives RPC call for deleting local/global
partitions and then handle them via ShuffleEnvironment, just the
similar way as local partitions now.
*
For the ShuffleMaster side: I saw some previous disuccsions on
multiple ShuffleMaster instances run in different components. I
was not against this way in essence, but only wonder it might
bring this feature complex to consider that. So my proposal was
only for excluding ShuffleMaster if possible to make
implementation a bit easy. I thought there might have a somewhat
PartitionTracker component in RM for tracking/deleting global
partitions, just as we did the way now in JM. The partition state
is reported from TE and maintained in PartitionTracker of RM, and
the PartitionTracker could trigger global partition release with
TE gateway directly, and not further via ShuffleMaster(it is also
stateless now). And actually in existing PartitionTrackerImpl in
JM, the PRC call on TE#releasePartitions is also triggered not via
ShuffleMaster in some cases, and it can be regareded as a shortcut
way. Of course I am also in favour of via ShuffleMaster to call
the actual release partition always, and the form seems elegant.
I do not expect my inconsequential thought would block this feature
ongoing and disturb your previous conclusion. Moreover, Till's recent
reply already dispels my previous concern. :)
Best,
Zhijiang
------------------------------------------------------------------
From:Chesnay Schepler <ches...@apache.org>
Send Time:2019年10月14日(星期一) 07:00
To:dev <dev@flink.apache.org>; Till Rohrmann
<trohrm...@apache.org>; zhijiang <wangzhijiang...@aliyun.com.invalid>
Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle
I'm quite torn on whether to exclude the ShuffleServices from the
proposal. I think I'm now on my third or fourth iteration for a
response, so I'll just send both so I can stop thinking for a bit about
whether to push for one or the other:
Opinion A, aka "Nu Uh":
I'm not in favor of excluding the shuffle master from this proposal;
I believe it raises interesting questions that should be discussed
beforehand; otherwise we may just end up developing ourselves into a
corner.
Unless there are good reasons for doing so I'd prefer to keep the
functionality across shuffle services consistent.
And man, my last sentence is giving me headaches (how can you
introduce inconsistencies across shuffle services if you don't even
touch them?..)
Ultimately the RM only needs the ShuffleService for 2 things, which
are fairly straight-forward:
1. list partitions
2. delete partitions
Both of these are /exclusively /used via the REST APIs. In terms of
scope I wanted this proposal to contain something that feels
complete. If there is functionality to have a partition stick
around, there needs to be a mechanism to delete it. Thus you also
need a way to list them, simply for practical purposes. I do believe
that without these this whole proposal is very much incomplete and
would hate to see them excluded. It just /makes sense/ to have them.
Yes, technically speak
Could we exclude the external shuffle services from this logic?
Sure, but I'm quite worried that we will not tackle this problem
again for 1.10, and if we don't we end up with really inconsistent
behavior across versions. In 1.9 you can have local state in your
master implementation, and, bar extraordinary circumstances, will
get a release call for partition that was registered. In 1.10 that
last part that goes down the drain, and in 1.X the last part is back
in play but you can't have local state anymore since another
instance is running on the RM.
Who is even supposed to keep up with that? It's still an interface
that is exposed to every user. I don't think we should impose
constraints in such a cut loose fashion.
At last, the fact that we can implement this in a way where it works
for some shuffle services and not others should already be quite a
red flag. The RM maybe shouldn't do any tracking and just forward
the heartbeat payload to the ThinShuffleMaster present on the RM.
Opinion B, aka "technically it would be fine"
The counterpoint to the whole REST API completeness argument is that
while the /runtime //supports /having partitions stick around, there
is technically no way for anyone to enable such behavior at runtime.
Hence, with no user-facing APIs to enable the feature, we don't
necessarily need a user-facing API for management purposes, and
could defer both to a later point where this feature is exposed
fully to users.
But then it's hard to justify having any communication between the
TE and RM at all; it literally serves no purpose. The TE could just
keep cluster partitions around until the RM disconnects. Which would
then also raise the question what exactly of substance is left in
this proposal.
@Till yes, the RM should work against a different interface; I don't
think anyone has argued against that. Let's put this point to rest. :)
On 13/10/2019 11:04, Till Rohrmann wrote:
> I think we won't necessarily run multiple ShuffleMasters. I think it would
> be better to pass in a leaner interface into the RM to only handle the
> deletion of the global result partitions.
>
> Letting the TEs handle the deletion of the global result partitions might
> work as long as we don't have an external shuffle service implementation.
> Hence, it could be a first step to decrease complexity but in order to
> complete this feature, I think we need to do it differently.
>
> Cheers,
> Till
>
> On Sat, Oct 12, 2019 at 7:39 AM zhijiang
<wangzhijiang...@aliyun.com.invalid>
> wrote:
>
>> Sorry for delay catching up with the recent progress. Thanks for the FLIP
>> update and valuable discussions!
>>
>> I also like the term of job/cluster partitions, and agree with most of
the
>> previous comments.
>>
>> Only left one concern of ShuffleMaster side:
>>> However, if the separation of JM/RM into separate processes, as outlined
>> in FLIP-6, is ever fully realized it necessarily implies that multiple
>> shuffle master instances may exist for a given shuffle service.
>>
>> My previous thought was that one ShuffleService factory is for creating
>> one shuffleMaster instance. If we have multiple ShuffleMaster instances,
we
>> might also need differentt ShuffleService factories.
>> And it seems that different ShuffleMaster instances could run in
different
>> components based on demands, e.g. dispatcher, JM, RM.
>>
>> Is it also feasible to not touch the ShuffleMaster concept in this FLIP
to
>> make things a bit easy? I mean the ShuffleMaster is still running in JM
>> component and is responsbile for job partitions. For the case of cluster
>> partitions, the RM could interact with TE directly. TE would report
global
>> partitions as payloads via heartbeat with RM. And the RM could call
>> TE#releaseGlobalPartitions directly not via ShuffleMaster. Even the RM
>> could also pass the global released partitions via payloads in heartbeat
>> with TE to reduce additional explict RPC call, but this would bring some
>> delays for releasing partition based on heartbeat interval.
>>
>> Best,
>> Zhijiang
>> ------------------------------------------------------------------
>> From:Chesnay Schepler <ches...@apache.org>
>> Send Time:2019年10月11日(星期五) 10:21
>> To:dev <dev@flink.apache.org>; Till Rohrmann <trohrm...@apache.org>
>> Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle
>>
>> ooooh I like job-/cluster partitions.
>>
>> On 10/10/2019 16:27, Till Rohrmann wrote:
>>> I think we should introduce a separate interface for the ResourceManager
>> so
>>> that it can list and delete global result partitions from the shuffle
>>> service implementation. As long as the JM and RM run in the same
process,
>>> this interface could be implemented by the ShuffleMaster
implementations.
>>> However, we should make sure that we don't introduce unnecessary
>>> concurrency. If that should be the case, then it might be simpler to
have
>>> two separate components.
>>>
>>> Some ideas for the naming problem:
>>>
>>> local/global: job/cluster, intra/inter
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Oct 9, 2019 at 1:35 PM Chesnay Schepler <ches...@apache.org>
>> wrote:
>>>> Are there any other opinions in regards to the naming scheme?
>>>> (local/global, promote)
>>>>
>>>> On 06/09/2019 15:16, Chesnay Schepler wrote:
>>>>> Hello,
>>>>>
>>>>> FLIP-36 (interactive programming)
>>>>> <
>>
https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
>>>>> proposes a new programming paradigm where jobs are built incrementally
>>>>> by the user.
>>>>>
>>>>> To support this in an efficient manner I propose to extend partition
>>>>> life-cycle to support the notion of /global partitions/, which are
>>>>> partitions that can exist beyond the life-time of a job.
>>>>>
>>>>> These partitions could then be re-used by subsequent jobs in a fairly
>>>>> efficient manner, as they don't have to persisted to an external
>>>>> storage first and consuming tasks could be scheduled to exploit
>>>>> data-locality.
>>>>>
>>>>> The FLIP outlines the required changes on the JobMaster, TaskExecutor
>>>>> and ResourceManager to support this from a life-cycle perspective.
>>>>>
>>>>> This FLIP does /not/ concern itself with the /usage/ of global
>>>>> partitions, including client-side APIs, job-submission, scheduling and
>>>>> reading said partitions; these are all follow-ups that will either be
>>>>> part of FLIP-36 or spliced out into separate FLIPs.
>>>>>
>>>>>
>>