Thanks Till for the explanation! That looks good to me.

Thanks,
Zhu Zhu

Till Rohrmann <trohrm...@apache.org> 于2019年10月21日周一 上午2:45写道:

> Hi Zhu Zhu,
>
> the cluster partition does not need to be registered at the RM before it
> can be used. The cluster partition descriptor will be reported to the
> client as part of the job execution result. This information is used to
> construct a JobGraph which can consume from a cluster partition. The
> cluster partition descriptor contains all the information necessary to read
> the partition. Hence, a job consuming this partition will simply deploy the
> consumer on a TM and then read the cluster partition described by the
> cluster partition descriptor. If the partition is no longer available, then
> the job will fail and the client needs to handle the situation. If the
> client knows how to reprocess the partition, then it would submit the
> producing job.
>
> Cheers,
> Till
>
> On Sun, Oct 20, 2019 at 12:23 PM Zhu Zhu <reed...@gmail.com> wrote:
>
> > Thanks Chesnay for proposing this FLIP! And sorry for the late response
> on
> > it.
> > The FLIP overall looks good to me, except for one question.
> >
> > - If a cluster partition does not exist in RM, how can users tell whether
> > it is not produced yet, or it is already released?
> > Users/InteractiveQuery may need this information to decide to whether to
> > wait or re-execute the producer job.
> > One way I can think of is to also check the producer job's state --
> > unavailable partition of a finished job means the partition is released.
> > But as the cluster partition is notified to RM via TM heartbeat, there
> can
> > be bad case if job is finished but the partition is not updated to RM
> yet.
> > One solution of the bad case might be that TM notifies RM instantly when
> > partitions are promoted, as a supplementary to the TM heartbeat way. It
> > also shortens the time that a consumer job waits for a cluster partition
> to
> > become available, especially for a sequence of short lived jobs. This
> > however introduces JM dependency on RM on job finishes, which is
> unwanted.
> >
> >
> > Thanks,
> > Zhu Zhu
> >
> > Chesnay Schepler <ches...@apache.org> 于2019年10月15日周二 下午6:48写道:
> >
> >> I have updated the FLIP.
> >>
> >> - adopted job-/cluster partitions naming scheme
> >> - out-lined interface for new component living in the RM (currently
> >> called ThinShuffleMaster, but I'm not a fan of the name. Suggestions
> >> would be appreciated)
> >> - added a note that the ShuffleService changes are only necessary for
> >> external shuffle services, which could be omitted in a first version
> >>
> >> Unless there are objections I'll start a vote thread later today.
> >>
> >> On 14/10/2019 06:28, Zhijiang wrote:
> >> > Thanks for these further considerations Chesnay!
> >> >
> >> > I guess we might have some misunderstanding. Actually I was not
> >> > against the previous proposal Till suggested before, and I think it is
> >> > a formal way to do that.
> >> >
> >> > And my previous proposal was not for excluding the ShuffleService
> >> > completely. The ShuffleService can be regarded as a factory for
> >> > creating ShuffleMaster on JM/RM side and creating ShuffleEnvironment
> >> > on TE side.
> >> >
> >> >  *
> >> >     For the ShuffleEnvironment on TE side: I do not have concerns
> >> >     always. The TE receives RPC call for deleting local/global
> >> >     partitions and then handle them via ShuffleEnvironment, just the
> >> >     similar way as local partitions now.
> >> >  *
> >> >     For the ShuffleMaster side: I saw some previous disuccsions on
> >> >     multiple ShuffleMaster instances run in different components. I
> >> >     was not against this way in essence, but only wonder it might
> >> >     bring this feature complex to consider that. So my proposal was
> >> >     only for excluding ShuffleMaster if possible to make
> >> >     implementation a bit easy. I thought there might have a somewhat
> >> >     PartitionTracker component in RM for tracking/deleting global
> >> >     partitions, just as we did the way now in JM. The partition state
> >> >     is reported from TE and maintained in PartitionTracker of RM, and
> >> >     the PartitionTracker could trigger global partition release with
> >> >     TE gateway directly, and not further via ShuffleMaster(it is also
> >> >     stateless now). And actually in existing PartitionTrackerImpl in
> >> >     JM, the PRC call on TE#releasePartitions is also triggered not via
> >> >     ShuffleMaster in some cases, and it can be regareded as a shortcut
> >> >     way. Of course I am also in favour of via ShuffleMaster to call
> >> >     the actual release partition always, and the form seems elegant.
> >> >
> >> > I do not expect my inconsequential thought would block this feature
> >> > ongoing and disturb your previous conclusion. Moreover, Till's recent
> >> > reply already dispels my previous concern. :)
> >> >
> >> > Best,
> >> > Zhijiang
> >> >
> >> >     ------------------------------------------------------------------
> >> >     From:Chesnay Schepler <ches...@apache.org>
> >> >     Send Time:2019年10月14日(星期一) 07:00
> >> >     To:dev <dev@flink.apache.org>; Till Rohrmann
> >> >     <trohrm...@apache.org>; zhijiang <wangzhijiang...@aliyun.com
> >> .invalid>
> >> >     Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle
> >> >
> >> >     I'm quite torn on whether to exclude the ShuffleServices from the
> >> >     proposal. I think I'm now on my third or fourth iteration for a
> >> >
> >>  response, so I'll just send both so I can stop thinking for a bit about
> >> >
> >> >     whether to push for one or the other:
> >> >
> >> >     Opinion A, aka "Nu Uh":
> >> >
> >> >
> >>      I'm not in favor of excluding the shuffle master from this
> proposal;
> >> >
> >>      I believe it raises interesting questions that should be discussed
> >> >
> >>      beforehand; otherwise we may just end up developing ourselves into
> a
> >> >         corner.
> >> >
> >>      Unless there are good reasons for doing so I'd prefer to keep the
> >> >         functionality across shuffle services consistent.
> >> >         And man, my last sentence is giving me headaches (how can you
> >> >
> >>      introduce inconsistencies across shuffle services if you don't even
> >> >         touch them?..)
> >> >
> >> >
> >>      Ultimately the RM only needs the ShuffleService for 2 things, which
> >> >         are fairly straight-forward:
> >> >
> >> >          1. list partitions
> >> >          2. delete partitions
> >> >
> >> >
> >>      Both of these are /exclusively /used via the REST APIs. In terms of
> >> >         scope I wanted this proposal to contain something that feels
> >> >         complete. If there is functionality to have a partition stick
> >> >
> >>      around, there needs to be a mechanism to delete it. Thus you also
> >> >
> >>      need a way to list them, simply for practical purposes. I do
> believe
> >> >
> >>      that without these this whole proposal is very much incomplete and
> >> >
> >>      would hate to see them excluded. It just /makes sense/ to have
> them.
> >> >         Yes, technically speak
> >> >
> >> >         Could we exclude the external shuffle services from this
> logic?
> >> >         Sure, but I'm quite worried that we will not tackle this
> problem
> >> >
> >>      again for 1.10, and if we don't we end up with really inconsistent
> >> >
> >>      behavior across versions. In 1.9 you can have local state in your
> >> >
> >>      master implementation, and, bar extraordinary circumstances, will
> >> >
> >>      get a release call for partition that was registered. In 1.10 that
> >> >
> >>      last part that goes down the drain, and in 1.X the last part is
> back
> >> >         in play but you can't have local state anymore since another
> >> >         instance is running on the RM.
> >> >
> >> >
> >>      Who is even supposed to keep up with that? It's still an interface
> >> >         that is exposed to every user. I don't think we should impose
> >> >         constraints in such a cut loose fashion.
> >> >
> >> >
> >>      At last, the fact that we can implement this in a way where it
> works
> >> >
> >>      for some shuffle services and not others should already be quite a
> >> >
> >>      red flag. The RM maybe shouldn't do any tracking and just forward
> >> >
> >>      the heartbeat payload to the ThinShuffleMaster present on the RM.
> >> >
> >> >     Opinion B, aka "technically it would be fine"
> >> >
> >> >
> >>      The counterpoint to the whole REST API completeness argument is
> that
> >> >
> >>      while the /runtime //supports /having partitions stick around,
> there
> >> >
> >>      is technically no way for anyone to enable such behavior at
> runtime.
> >> >         Hence, with no user-facing APIs to enable the feature, we
> don't
> >> >         necessarily need a user-facing API for management purposes,
> and
> >> >         could defer both to a later point where this feature is
> exposed
> >> >         fully to users.
> >> >
> >> >
> >>      But then it's hard to justify having any communication between the
> >> >
> >>      TE and RM at all; it literally serves no purpose. The TE could just
> >> >
> >>      keep cluster partitions around until the RM disconnects. Which
> would
> >> >
> >>      then also raise the question what exactly of substance is left in
> >> >         this proposal.
> >> >
> >> >     @Till yes, the RM should work against a different interface; I
> don't
> >> >
> >>  think anyone has argued against that. Let's put this point to rest. :)
> >> >
> >> >     On 13/10/2019 11:04, Till Rohrmann wrote:
> >> >
> >>  > I think we won't necessarily run multiple ShuffleMasters. I think it
> would
> >> >
> >>  > be better to pass in a leaner interface into the RM to only handle
> the
> >> >     > deletion of the global result partitions.
> >> >     >
> >> >
> >>  > Letting the TEs handle the deletion of the global result partitions
> might
> >> >
> >>  > work as long as we don't have an external shuffle service
> implementation.
> >> >
> >>  > Hence, it could be a first step to decrease complexity but in order
> to
> >> >     > complete this feature, I think we need to do it differently.
> >> >     >
> >> >     > Cheers,
> >> >     > Till
> >> >     >
> >> >     > On Sat, Oct 12, 2019 at 7:39 AM zhijiang <
> >> wangzhijiang...@aliyun.com.invalid>
> >> >     > wrote:
> >> >     >
> >> >
> >>  >> Sorry for delay catching up with the recent progress. Thanks for
> the FLIP
> >> >     >> update and valuable discussions!
> >> >     >>
> >> >
> >>  >> I also like the term of job/cluster partitions, and agree with most
> of the
> >> >     >> previous comments.
> >> >     >>
> >> >     >> Only left one concern of ShuffleMaster side:
> >> >
> >>  >>> However, if the separation of JM/RM into separate processes, as
> outlined
> >> >
> >>  >> in FLIP-6, is ever fully realized it necessarily implies that
> multiple
> >> >     >> shuffle master instances may exist for a given shuffle service.
> >> >     >>
> >> >
> >>  >> My previous thought was that one ShuffleService factory is for
> creating
> >> >
> >>  >> one shuffleMaster instance. If we have multiple ShuffleMaster
> instances, we
> >> >     >> might also need differentt ShuffleService factories.
> >> >
> >>  >> And it seems that different ShuffleMaster instances could run in
> different
> >> >     >> components based on demands, e.g. dispatcher, JM, RM.
> >> >     >>
> >> >
> >>  >> Is it also feasible to not touch the ShuffleMaster concept in this
> FLIP to
> >> >
> >>  >> make things a bit easy? I mean the ShuffleMaster is still running
> in JM
> >> >
> >>  >> component and is responsbile for job partitions. For the case of
> cluster
> >> >
> >>  >> partitions, the RM could interact with TE directly. TE would report
> global
> >> >
> >>  >> partitions as payloads via heartbeat with RM. And the RM could call
> >> >
> >>  >> TE#releaseGlobalPartitions directly not via ShuffleMaster.  Even
> the RM
> >> >
> >>  >> could also pass the global released partitions via payloads in
> heartbeat
> >> >
> >>  >> with TE to reduce additional explict RPC call, but this would bring
> some
> >> >     >> delays for releasing partition based on heartbeat interval.
> >> >     >>
> >> >     >> Best,
> >> >     >> Zhijiang
> >> >
> >>  >> ------------------------------------------------------------------
> >> >     >> From:Chesnay Schepler <ches...@apache.org>
> >> >     >> Send Time:2019年10月11日(星期五) 10:21
> >> >     >> To:dev <dev@flink.apache.org>; Till Rohrmann <
> >> trohrm...@apache.org>
> >> >     >> Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle
> >> >     >>
> >> >     >> ooooh I like job-/cluster partitions.
> >> >     >>
> >> >     >> On 10/10/2019 16:27, Till Rohrmann wrote:
> >> >
> >>  >>> I think we should introduce a separate interface for the
> ResourceManager
> >> >     >> so
> >> >
> >>  >>> that it can list and delete global result partitions from the
> shuffle
> >> >
> >>  >>> service implementation. As long as the JM and RM run in the same
> process,
> >> >
> >>  >>> this interface could be implemented by the ShuffleMaster
> implementations.
> >> >     >>> However, we should make sure that we don't introduce
> unnecessary
> >> >
> >>  >>> concurrency. If that should be the case, then it might be simpler
> to have
> >> >     >>> two separate components.
> >> >     >>>
> >> >     >>> Some ideas for the naming problem:
> >> >     >>>
> >> >     >>> local/global: job/cluster, intra/inter
> >> >     >>>
> >> >     >>> Cheers,
> >> >     >>> Till
> >> >     >>>
> >> >     >>> On Wed, Oct 9, 2019 at 1:35 PM Chesnay Schepler <
> >> ches...@apache.org>
> >> >     >> wrote:
> >> >     >>>> Are there any other opinions in regards to the naming scheme?
> >> >     >>>> (local/global, promote)
> >> >     >>>>
> >> >     >>>> On 06/09/2019 15:16, Chesnay Schepler wrote:
> >> >     >>>>> Hello,
> >> >     >>>>>
> >> >     >>>>> FLIP-36 (interactive programming)
> >> >     >>>>> <
> >> >     >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> >> >
> >>  >>>>> proposes a new programming paradigm where jobs are built
> incrementally
> >> >     >>>>> by the user.
> >> >     >>>>>
> >> >
> >>  >>>>> To support this in an efficient manner I propose to extend
> partition
> >> >
> >>  >>>>> life-cycle to support the notion of /global partitions/, which
> are
> >> >     >>>>> partitions that can exist beyond the life-time of a job.
> >> >     >>>>>
> >> >
> >>  >>>>> These partitions could then be re-used by subsequent jobs in a
> fairly
> >> >
> >>  >>>>> efficient manner, as they don't have to persisted to an external
> >> >
> >>  >>>>> storage first and consuming tasks could be scheduled to exploit
> >> >     >>>>> data-locality.
> >> >     >>>>>
> >> >
> >>  >>>>> The FLIP outlines the required changes on the JobMaster,
> TaskExecutor
> >> >
> >>  >>>>> and ResourceManager to support this from a life-cycle
> perspective.
> >> >     >>>>>
> >> >     >>>>> This FLIP does /not/ concern itself with the /usage/ of
> global
> >> >
> >>  >>>>> partitions, including client-side APIs, job-submission,
> scheduling and
> >> >
> >>  >>>>> reading said partitions; these are all follow-ups that will
> either be
> >> >     >>>>> part of FLIP-36 or spliced out into separate FLIPs.
> >> >     >>>>>
> >> >     >>>>>
> >> >     >>
> >> >
> >> >
> >>
> >>
>

Reply via email to