Thanks Till for the explanation! That looks good to me.

Thanks,
Zhu Zhu
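For reference, the consume-by-descriptor flow Till describes below could look roughly like this sketch; every type and method name in it is an illustrative assumption, not an interface defined by the FLIP:

    import java.io.Serializable;
    import java.util.List;

    // Sketch only: all names below are illustrative assumptions,
    // not interfaces defined by the FLIP.
    public class ConsumeByDescriptorSketch {

        /** Carries everything a consumer task needs to read a cluster partition. */
        interface ClusterPartitionDescriptor extends Serializable {}

        /** Result reported to the client when the producing job finishes. */
        interface JobResult {
            List<ClusterPartitionDescriptor> getClusterPartitionDescriptors();
        }

        /** Signals that a consumed cluster partition was released or lost. */
        static class PartitionUnavailableException extends Exception {}

        interface Client {
            // Builds and submits a JobGraph whose sources read the given
            // cluster partitions; the descriptors alone say where and how to
            // read, so nothing is registered at the RM before consumption.
            void submitConsumingJob(List<ClusterPartitionDescriptor> descriptors)
                    throws PartitionUnavailableException;

            JobResult runProducingJob();
        }

        static void consume(Client client, JobResult producerResult)
                throws PartitionUnavailableException {
            try {
                client.submitConsumingJob(producerResult.getClusterPartitionDescriptors());
            } catch (PartitionUnavailableException e) {
                // The consuming job fails and the client decides how to react,
                // e.g. by re-running the producer to recompute the partition.
                JobResult rerun = client.runProducingJob();
                client.submitConsumingJob(rerun.getClusterPartitionDescriptors());
            }
        }
    }

The key point is that the descriptor, not the RM, is the source of truth for how to read the partition, which is why an unavailable partition only surfaces as a consumer-side failure that the client has to handle.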
On Mon, Oct 21, 2019 at 2:45 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Zhu Zhu,
>
> the cluster partition does not need to be registered at the RM before it can be used. The cluster partition descriptor will be reported to the client as part of the job execution result. This information is used to construct a JobGraph which can consume from a cluster partition. The cluster partition descriptor contains all the information necessary to read the partition. Hence, a job consuming this partition will simply deploy the consumer on a TM and then read the cluster partition described by the cluster partition descriptor. If the partition is no longer available, then the job will fail and the client needs to handle the situation. If the client knows how to reprocess the partition, then it would submit the producing job.
>
> Cheers,
> Till
>
> On Sun, Oct 20, 2019 at 12:23 PM Zhu Zhu <reed...@gmail.com> wrote:
>
> > Thanks Chesnay for proposing this FLIP! And sorry for the late response on it.
> > The FLIP overall looks good to me, except for one question.
> >
> > - If a cluster partition does not exist in the RM, how can users tell whether it is not produced yet, or already released? Users/InteractiveQuery may need this information to decide whether to wait or to re-execute the producer job.
> > One way I can think of is to also check the producer job's state -- an unavailable partition of a finished job means the partition is released. But as the cluster partition is reported to the RM via the TM heartbeat, there can be a bad case where the job is finished but the partition is not yet updated to the RM.
> > One solution for this bad case might be that the TM notifies the RM instantly when partitions are promoted, as a supplement to the TM heartbeat. This also shortens the time a consumer job waits for a cluster partition to become available, especially for a sequence of short-lived jobs. It however introduces a JM dependency on the RM when jobs finish, which is unwanted.
> >
> > Thanks,
> > Zhu Zhu
> >
> > On Tue, Oct 15, 2019 at 6:48 PM Chesnay Schepler <ches...@apache.org> wrote:
> >
> >> I have updated the FLIP.
> >>
> >> - adopted the job-/cluster partition naming scheme
> >> - outlined the interface for the new component living in the RM (currently called ThinShuffleMaster, but I'm not a fan of the name; suggestions would be appreciated)
> >> - added a note that the ShuffleService changes are only necessary for external shuffle services, and could be omitted in a first version
> >>
> >> Unless there are objections I'll start a vote thread later today.
> >>
> >> On 14/10/2019 06:28, Zhijiang wrote:
> >> > Thanks for these further considerations Chesnay!
> >> >
> >> > I guess we might have some misunderstanding. Actually I was not against the previous proposal Till suggested, and I think it is a proper way to do that.
> >> >
> >> > And my previous proposal was not about excluding the ShuffleService completely. The ShuffleService can be regarded as a factory for creating the ShuffleMaster on the JM/RM side and creating the ShuffleEnvironment on the TE side.
> >> >
> >> > * For the ShuffleEnvironment on the TE side: I have no concerns. The TE receives RPC calls for deleting local/global partitions and then handles them via the ShuffleEnvironment, in a similar way to local partitions now.
> >> > * For the ShuffleMaster side: I saw some previous discussions on multiple ShuffleMaster instances running in different components. I was not against this approach in essence, but only wondered whether it might make this feature more complex. So my proposal was only about excluding the ShuffleMaster if possible, to make the implementation a bit easier. I thought there might be a PartitionTracker-like component in the RM for tracking/deleting global partitions, just as we do now in the JM. The partition state is reported from the TE and maintained in the PartitionTracker of the RM, and the PartitionTracker could trigger global partition release via the TE gateway directly, not via the ShuffleMaster (which is also stateless now). And actually in the existing PartitionTrackerImpl in the JM, the RPC call to TE#releasePartitions is in some cases also triggered not via the ShuffleMaster, which can be regarded as a shortcut. Of course I am also in favour of always triggering the actual partition release via the ShuffleMaster; that form seems elegant.
> >> >
> >> > I do not expect my thoughts to block this feature's progress or disturb your previous conclusion. Moreover, Till's recent reply already dispels my previous concern. :)
> >> >
> >> > Best,
> >> > Zhijiang
> >> >
> >> > ------------------------------------------------------------------
> >> > From:Chesnay Schepler <ches...@apache.org>
> >> > Send Time:Mon, Oct 14, 2019 07:00
> >> > To:dev <dev@flink.apache.org>; Till Rohrmann <trohrm...@apache.org>; zhijiang <wangzhijiang...@aliyun.com.invalid>
> >> > Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle
> >> >
> >> > I'm quite torn on whether to exclude the ShuffleServices from the proposal. I think I'm now on my third or fourth iteration for a response, so I'll just send both so I can stop thinking for a bit about whether to push for one or the other:
> >> >
> >> > Opinion A, aka "Nu Uh":
> >> >
> >> > I'm not in favor of excluding the shuffle master from this proposal; I believe it raises interesting questions that should be discussed beforehand; otherwise we may just end up developing ourselves into a corner. Unless there are good reasons for doing so I'd prefer to keep the functionality across shuffle services consistent. And man, my last sentence is giving me headaches (how can you introduce inconsistencies across shuffle services if you don't even touch them?..)
> >> >
> >> > Ultimately the RM only needs the ShuffleService for 2 things, which are fairly straight-forward:
> >> >
> >> > 1. list partitions
> >> > 2. delete partitions
> >> >
> >> > Both of these are /exclusively/ used via the REST APIs. In terms of scope I wanted this proposal to contain something that feels complete. If there is functionality to have a partition stick around, there needs to be a mechanism to delete it. Thus you also need a way to list them, simply for practical purposes. I do believe that without these this whole proposal is very much incomplete and would hate to see them excluded. It just /makes sense/ to have them. Yes, technically speaking:
> >> >
> >> > Could we exclude the external shuffle services from this logic?
> >> > Sure, but I'm quite worried that we will not tackle this problem again for 1.10, and if we don't we end up with really inconsistent behavior across versions. In 1.9 you can have local state in your master implementation, and, bar extraordinary circumstances, will get a release call for every partition that was registered. In 1.10 that last part goes down the drain, and in 1.X the last part is back in play but you can't have local state anymore since another instance is running on the RM.
> >> >
> >> > Who is even supposed to keep up with that? It's still an interface that is exposed to every user. I don't think we should impose constraints in such a fast-and-loose fashion.
> >> >
> >> > Lastly, the fact that we can implement this in a way where it works for some shuffle services and not others should already be quite a red flag. The RM maybe shouldn't do any tracking and just forward the heartbeat payload to the ThinShuffleMaster present on the RM.
> >> >
> >> > Opinion B, aka "technically it would be fine":
> >> >
> >> > The counterpoint to the whole REST API completeness argument is that while the /runtime/ /supports/ having partitions stick around, there is technically no way for anyone to enable such behavior at runtime. Hence, with no user-facing APIs to enable the feature, we don't necessarily need a user-facing API for management purposes, and could defer both to a later point where this feature is exposed fully to users.
> >> >
> >> > But then it's hard to justify having any communication between the TE and RM at all; it literally serves no purpose. The TE could just keep cluster partitions around until the RM disconnects. Which would then also raise the question what exactly of substance is left in this proposal.
> >> >
> >> > @Till yes, the RM should work against a different interface; I don't think anyone has argued against that. Let's put this point to rest. :)
> >> >
> >> > On 13/10/2019 11:04, Till Rohrmann wrote:
> >> > > I think we won't necessarily run multiple ShuffleMasters. I think it would be better to pass a leaner interface into the RM that only handles the deletion of the global result partitions.
> >> > >
> >> > > Letting the TEs handle the deletion of the global result partitions might work as long as we don't have an external shuffle service implementation. Hence, it could be a first step to decrease complexity, but in order to complete this feature, I think we need to do it differently.
> >> > >
> >> > > Cheers,
> >> > > Till
> >> > >
> >> > > On Sat, Oct 12, 2019 at 7:39 AM zhijiang <wangzhijiang...@aliyun.com.invalid> wrote:
> >> > >
> >> >> Sorry for the delay catching up with the recent progress. Thanks for the FLIP update and the valuable discussions!
> >> >>
> >> >> I also like the term of job/cluster partitions, and agree with most of the previous comments.
> >> >> Only one concern left, on the ShuffleMaster side:
> >> >>
> >> >>> However, if the separation of JM/RM into separate processes, as outlined in FLIP-6, is ever fully realized it necessarily implies that multiple shuffle master instances may exist for a given shuffle service.
> >> >>
> >> >> My previous thought was that one ShuffleService factory is for creating one ShuffleMaster instance. If we have multiple ShuffleMaster instances, we might also need different ShuffleService factories. And it seems that different ShuffleMaster instances could run in different components based on demands, e.g. dispatcher, JM, RM.
> >> >>
> >> >> Is it also feasible to not touch the ShuffleMaster concept in this FLIP, to make things a bit easier? I mean the ShuffleMaster would still run in the JM component and be responsible for job partitions. For the case of cluster partitions, the RM could interact with the TE directly: the TE would report global partitions as payloads via its heartbeat with the RM, and the RM could call TE#releaseGlobalPartitions directly, not via the ShuffleMaster. The RM could even pass the released global partitions as payloads in its heartbeat with the TE to avoid an additional explicit RPC call, but that would delay partition release by up to the heartbeat interval.
> >> >>
> >> >> Best,
> >> >> Zhijiang
> >> >>
> >> >> ------------------------------------------------------------------
> >> >> From:Chesnay Schepler <ches...@apache.org>
> >> >> Send Time:Fri, Oct 11, 2019 10:21
> >> >> To:dev <dev@flink.apache.org>; Till Rohrmann <trohrm...@apache.org>
> >> >> Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle
> >> >>
> >> >> ooooh I like job-/cluster partitions.
> >> >>
> >> >> On 10/10/2019 16:27, Till Rohrmann wrote:
> >> >>> I think we should introduce a separate interface for the ResourceManager so that it can list and delete global result partitions from the shuffle service implementation. As long as the JM and RM run in the same process, this interface could be implemented by the ShuffleMaster implementations. However, we should make sure that we don't introduce unnecessary concurrency. If that should be the case, then it might be simpler to have two separate components.
> >> >>>
> >> >>> Some ideas for the naming problem:
> >> >>>
> >> >>> local/global: job/cluster, intra/inter
> >> >>>
> >> >>> Cheers,
> >> >>> Till
> >> >>>
> >> >>> On Wed, Oct 9, 2019 at 1:35 PM Chesnay Schepler <ches...@apache.org> wrote:
> >> >>>> Are there any other opinions in regards to the naming scheme? (local/global, promote)
> >> >>>>
> >> >>>> On 06/09/2019 15:16, Chesnay Schepler wrote:
> >> >>>>> Hello,
> >> >>>>>
> >> >>>>> FLIP-36 (interactive programming) <https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink> proposes a new programming paradigm where jobs are built incrementally by the user.
> >> > >>>>> > >> > > >> >>>>> To support this in an efficient manner I propose to extend > partition > >> > > >> >>>>> life-cycle to support the notion of /global partitions/, which > are > >> > >>>>> partitions that can exist beyond the life-time of a job. > >> > >>>>> > >> > > >> >>>>> These partitions could then be re-used by subsequent jobs in a > fairly > >> > > >> >>>>> efficient manner, as they don't have to persisted to an external > >> > > >> >>>>> storage first and consuming tasks could be scheduled to exploit > >> > >>>>> data-locality. > >> > >>>>> > >> > > >> >>>>> The FLIP outlines the required changes on the JobMaster, > TaskExecutor > >> > > >> >>>>> and ResourceManager to support this from a life-cycle > perspective. > >> > >>>>> > >> > >>>>> This FLIP does /not/ concern itself with the /usage/ of > global > >> > > >> >>>>> partitions, including client-side APIs, job-submission, > scheduling and > >> > > >> >>>>> reading said partitions; these are all follow-ups that will > either be > >> > >>>>> part of FLIP-36 or spliced out into separate FLIPs. > >> > >>>>> > >> > >>>>> > >> > >> > >> > > >> > > >> > >> >