The new edits look good to me.
Looking forward to the vote.

Thanks,
Zhu

Xintong Song <tonysong...@gmail.com> wrote on Fri, Sep 4, 2020 at 9:49 AM:

> Thanks Till, the changes look good to me. Looking forward to the vote.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Sep 4, 2020 at 12:31 AM Till Rohrmann <trohrm...@apache.org>
> wrote:
>
> > Thanks for the feedback Xintong and Zhu Zhu. I've added a bit more
> details
> > for the intended interface extensions, potential follow ups (removing the
> > AllocationIDs) and the question about whether to reuse or return a slot
> if
> > the profiles don't fully match.
> >
> > If nobody objects, then I would start a vote for this FLIP soon.
> >
> > Cheers,
> > Till
> >
> > On Mon, Aug 31, 2020 at 11:53 AM Zhu Zhu <reed...@gmail.com> wrote:
> >
> > > Thanks for the clarification @Till Rohrmann <trohrm...@apache.org>
> > >
> > > >> # Implications for the scheduling
> > > Agreed that it turned out to be different execution strategies for
> batch
> > > jobs.
> > > We can have a simple one first and improve it later.
> > >
> > > Thanks,
> > > Zhu
> > >
> > > Xintong Song <tonysong...@gmail.com> wrote on Mon, Aug 31, 2020 at 3:05 PM:
> > >
> > >> Thanks for the clarification, @Till.
> > >>
> > >> - For FLIP-56, sounds good to me. I think there should be no problem
> > >> before
> > >> removing AllocationID. And even after replacing AllocationID, it
> should
> > >> only require limited effort to make FLIP-56 work with SlotID. I was
> just
> > >> trying to understand when the effort will be needed.
> > >>
> > >> - For offer/release slots between JM/TM, I think you are right.
> > >> Waiting on the confirmation for resource requirement decrease before
> > >> freeing the slot is quite equivalent to releasing slots through RM, in
> > >> terms of it practically preventing JM from releasing slots when the RM
> > is
> > >> absent. But this approach obviously requires less change to the
> current
> > >> mechanism.
> > >> Since the first problem can be solved by the declarative protocol, and
> > the
> > >> second problem can be addressed by this confirmation based approach,
> > ATM I
> > >> don't see any strong reason for changing to offering and releasing
> slots
> > >> through RM, especially considering the significant changes it
> requires.
> > >>
> > >> Thank you~
> > >>
> > >> Xintong Song
> > >>
> > >>
> > >>
> > >> On Fri, Aug 28, 2020 at 10:07 PM Till Rohrmann <trohrm...@apache.org>
> > >> wrote:
> > >>
> > >> > Thanks for creating this FLIP @Chesnay and the good input @Xintong
> and
> > >> @Zhu
> > >> > Zhu.
> > >> >
> > >> > Let me try to add some comments concerning your questions:
> > >> >
> > >> > # FLIP-56
> > >> >
> > >> > I think there is nothing fundamentally contradicting FLIP-56 in the
> > FLIP
> > >> > for declarative resource management. As Chesnay said, we have to
> keep
> > >> the
> > >> > AllocationID around as long as we have the old scheduler
> > implementation.
> > >> > Once it is replaced, we can think about using the SlotID instead of
> > >> > AllocationIDs for identifying allocated slots. For dynamic slots we
> > can
> > >> > keep the special meaning of a SlotID with a negative index. In the
> > >> future
> > >> > we might think about making this encoding a bit more explicit by
> > >> sending a
> > >> > richer slot request object and reporting the actual SlotID back to
> the
> > >> RM.
> > >> >
> > >> > For the question of resource utilization vs. deployment latency I
> > >> believe
> > >> > that this will be a question of requirements and preferences as
> you've
> > >> said
> > >> > Xintong. I can see that we will have different strategies to fulfill
> > the
> > >> > different needs.
> > >> >
> > >> > # Offer/free slots between JM/TM
> > >> >
> > >> > You are right Xintong that the existing slot protocol was developed
> > with
> > >> > the assumption in mind that the RM and JM can run in separate
> > processes
> > >> and
> > >> > that a failure of the RM should only affect the JM in the sense that
> > it
> > >> > cannot ask for more resources. I believe that one could simplify
> > things
> > >> a
> > >> > bit under the assumption that the RM and JM are always colocated in
> > the
> > >> > same process. However, the discussion whether to change it or not
> > should
> > >> > indeed be a separate one.
> > >> >
> > >> > Changing the slot protocol to a declarative resource management
> should
> > >> > already solve the first problem you have described because we won't
> > ask
> > >> for
> > >> > new slots in case of a failover but simply keep the same resource
> > >> > requirements declared and let the RM make sure that we will receive
> at
> > >> > least this amount of slots.
> > >> >
> > >> > If releasing a slot should lead to allocating new resources because
> > >> > decreasing the resource requirement declaration takes longer than
> > >> releasing
> > >> > the slot on the TM, then we could apply what Chesnay said.
> Waiting
> > on
> > >> > the confirmation of the resource requirement decrease and then
> freeing
> > >> the
> > >> > slot on the TM gives you effectively the same behaviour as if the
> > >> freeing
> > >> > of the slot would be done by the RM.
> > >> >
> > >> > I am not entirely sure whether allocating the slots and receiving
> the
> > >> slot
> > >> > offers through the RM will allow us to get rid of the pending slot
> > >> state on
> > >> > the RM side. If the RM needs to communicate with the TM and we want
> to
> > >> have
> > >> > a reconciliation protocol between these components, then I think we
> > >> would
> > >> > have to solve the exact same problem of currently waiting on the TM
> > for
> > >> > confirming that a slot has been allocated.
> > >> >
> > >> > # Implications for the scheduling
> > >> >
> > >> > The FLIP does not fully cover the changes for the scheduler and
> mainly
> > >> > drafts the rough idea. For the batch scheduling, I believe that we
> > have
> > >> a
> > >> > couple degrees of freedom in how to do things. In the scenario you
> > >> > described, one could choose a simple strategy where we wait for all
> > >> > producers to stop before deciding on the parallelism of the consumer
> > and
> > >> > scheduling the respective tasks (even though they have POINTWISE
> > >> BLOCKING
> > >> > edges). Or we can try to be smart and say that if we get at least
> > >> > one slot, we can run the consumers with the same parallelism as the
> > >> > producers; it just might be that we have to run them one after
> > >> > another in a single slot.
> > >> One
> > >> > advantage of not directly scheduling the first consumer when the first
> > >> > producer is finished is that one might schedule the consumer stage
> > with
> > >> a
> > >> > higher parallelism because one might acquire more resources a bit
> > later.
> > >> > But I would see this as different execution strategies which have
> > >> different
> > >> > properties.
> > >> >
> > >> > Cheers,
> > >> > Till
> > >> >
> > >> > On Fri, Aug 28, 2020 at 11:21 AM Zhu Zhu <reed...@gmail.com> wrote:
> > >> >
> > >> > > Thanks for the explanation @Chesnay Schepler <ches...@apache.org>.
> > >> > >
> > >> > > Yes, for batch jobs it can be safe to schedule downstream vertices
> > if
> > >> > > there
> > >> > > are enough slots in the pool, even if these slots are still in use
> > at
> > >> > that
> > >> > > moment.
> > >> > > And the job can still progress even if the vertices stick to the
> > >> original
> > >> > > parallelism.
> > >> > >
> > >> > > Looks to me several decision makings can be different for
> streaming
> > >> and
> > >> > > batch jobs.
> > >> > > Looking forward to the follow-up FLIP on the lazy ExecutionGraph
> > >> > > construction!
> > >> > >
> > >> > > Thanks,
> > >> > > Zhu
> > >> > >
> > >> > > Chesnay Schepler <ches...@apache.org> wrote on Fri, Aug 28, 2020 at 4:35 PM:
> > >> > >
> > >> > >> Maybe :)
> > >> > >>
> > >> > >> Imagine a case where the producer and consumer have the same
> > >> > >> ResourceProfile, or at least one where the consumer requirements
> > are
> > >> > less
> > >> > >> than the producer ones.
> > >> > >> In this case, the scheduler can happily schedule consumers,
> because
> > >> it
> > >> > >> knows it will get enough slots.
> > >> > >>
> > >> > >> If the profiles are different, then the Scheduler _may_ wait for
> > >> > >> numberOf(producer) slots; it _may_ also stick with the
> parallelism
> > >> and
> > >> > >> schedule right away, in the worst case running the consumers in
> > >> > sequence.
> > >> > >> In fact, for batch jobs there is probably(?) never a reason for
> the
> > >> > >> scheduler to _reduce_ the parallelism; it can always try to run
> > >> things
> > >> > in
> > >> > >> sequence if it doesn't get enough slots.
> > >> > >> Reducing the parallelism would just mean that you'd have to wait
> > for
> > >> > more
> > >> > >> producers to finish.
> > >> > >>
> > >> > >> The scope of this FLIP is just the protocol, without changes to
> the
> > >> > >> scheduler; in other words just changing how slots are acquired,
> but
> > >> > change
> > >> > >> nothing about the scheduling. That is tackled in a follow-up
> FLIP.
> > >> > >>
> > >> > >> On 28/08/2020 07:34, Zhu Zhu wrote:
> > >> > >>
> > >> > >> Thanks for the response!
> > >> > >>
> > >> > >> >> The scheduler doesn't have to wait for one stage to finish
> > >> > >> Does it mean we will declare resources and decide the parallelism
> > >> for a
> > >> > >> stage which is partially
> > >> > >> schedulable, i.e. when input data are ready just for part of the
> > >> > >> execution vertices?
> > >> > >>
> > >> > >> >> This will get more complicated once we allow the scheduler to
> > >> change
> > >> > >> the parallelism while the job is running
> > >> > >> Agreed. Looks to me it's a problem for batch jobs only and can be
> > >> > avoided
> > >> > >> for streaming jobs.
> > >> > >> Will this FLIP limit its scope to streaming jobs, and
> improvements
> > >> for
> > >> > >> batch jobs are to be done later?
> > >> > >>
> > >> > >> Thanks,
> > >> > >> Zhu
> > >> > >>
> > >> > >> Chesnay Schepler <ches...@apache.org> wrote on Fri, Aug 28, 2020 at 2:27 AM:
> > >> > >>
> > >> > >>> The scheduler doesn't have to wait for one stage to finish. It
> is
> > >> still
> > >> > >>> aware that the upstream execution vertex has finished, and can
> > >> > request/use
> > >> > >>> slots accordingly to schedule the consumer.
> > >> > >>>
> > >> > >>> This will get more complicated once we allow the scheduler to
> > change
> > >> > the
> > >> > >>> parallelism while the job is running, for which we will need
> some
> > >> > >>> enhancements to the network stack to allow the producer to run
> > >> without
> > >> > >>> knowing the consumer parallelism ahead of time. I'm not too
> clear
> > on
> > >> > the
> > >> > >>> details, but we'll need some form of keygroup-like approach for sub
> > >> > partitions
> > >> > >>> (maxParallelism and all that).
> > >> > >>>
> > >> > >>> On 27/08/2020 20:05, Zhu Zhu wrote:
> > >> > >>>
> > >> > >>> Thanks Chesnay&Till for proposing this improvement.
> > >> > >>> It's of good value to allow jobs to make best use of available
> > >> > resources
> > >> > >>> adaptively. Not
> > >> > >>> to mention it further supports reactive mode.
> > >> > >>> So big +1 for it.
> > >> > >>>
> > >> > >>> I have a minor concern about possible regression in certain
> cases
> > >> due
> > >> > to
> > >> > >>> the proposed
> > >> > >>> JobVertex-wise scheduling which replaces current
> > >> ExecutionVertex-wise
> > >> > >>> scheduling.
> > >> > >>> In the proposal, looks to me it requires a stage to finish
> before
> > >> its
> > >> > >>> consumer stage can be
> > >> > >>> scheduled. This limitation, however, does not exist in current
> > >> > >>> scheduler. In the case that there
> > >> > >>> exists a POINTWISE BLOCKING edge, the downstream execution
> region
> > >> can
> > >> > be
> > >> > >>> scheduled
> > >> > >>> right after its connected upstream execution vertices finish,
> > even
> > >> > >>> before the whole upstream
> > >> > >>> stage finishes. This allows the region to be launched earlier
> and
> > >> make
> > >> > >>> use of available resources.
> > >> > >>> Do we need to let the new scheduler retain this property?
> > >> > >>>
> > >> > >>> Thanks,
> > >> > >>> Zhu
> > >> > >>>
> > >> > >>> Xintong Song <tonysong...@gmail.com> wrote on Wed, Aug 26, 2020 at 6:59 PM:
> > >> > >>>
> > >> > >>>> Thanks for the quick response.
> > >> > >>>>
> > >> > >>>> *Job prioritization, Allocation IDs, Minimum resource
> > >> > >>>> requirements, SlotManager Implementation Plan:* Sounds good to
> > me.
> > >> > >>>>
> > >> > >>>> *FLIP-56*
> > >> > >>>> Good point about the trade-off. I believe maximum resource
> > >> utilization
> > >> > >>>> and
> > >> > >>>> quick deployment are desired in different scenarios. E.g., a
> long
> > >> > >>>> running
> > >> > >>>> streaming job deserves some deployment latency to improve the
> > >> resource
> > >> > >>>> utilization, which benefits the entire lifecycle of the job. On
> > the
> > >> > >>>> other
> > >> > >>>> hand, short batch queries may prefer quick deployment,
> otherwise
> > >> the
> > >> > >>>> time
> > >> > >>>> for resource allocation might significantly increase the
> response
> > >> > time.
> > >> > >>>> It would be good enough for me to bring these questions to
> > >> attention.
> > >> > >>>> Nothing that I'm aware of should block this FLIP.
> > >> > >>>>
> > >> > >>>> Thank you~
> > >> > >>>>
> > >> > >>>> Xintong Song
> > >> > >>>>
> > >> > >>>>
> > >> > >>>>
> > >> > >>>> On Wed, Aug 26, 2020 at 5:14 PM Chesnay Schepler <
> > >> ches...@apache.org>
> > >> > >>>> wrote:
> > >> > >>>>
> > >> > >>>> > Thank you Xintong for your questions!
> > >> > >>>> > Job prioritization
> > >> > >>>> > Yes, the job which declares its initial requirements first
> is
> > >> > >>>> prioritized.
> > >> > >>>> > This is very much for simplicity; for example this avoids the
> > >> nasty
> > >> > >>>> case
> > >> > >>>> > where all jobs get some resources, but none get enough to
> > >> actually
> > >> > >>>> run the
> > >> > >>>> > job.
> > >> > >>>> > Minimum resource requirements
> > >> > >>>> >
> > >> > >>>> > My bad; at some point we want to allow the JobMaster to
> > declare a
> > >> > >>>> range of
> > >> > >>>> > resources it could use to run a job, for example min=1,
> > >> target=10,
> > >> > >>>> > max=+inf.
> > >> > >>>> >
> > >> > >>>> > With this model, the RM would then try to balance the
> resources
> > >> such
> > >> > >>>> that
> > >> > >>>> > as many jobs as possible are as close to the target state as
> > >> > possible.
> > >> > >>>> >
> > >> > >>>> > Currently, the minimum/target/maximum resources are all the
> > >> same. So
> > >> > >>>> the
> > >> > >>>> > notification is sent whenever the current requirements cannot
> > be
> > >> > met.
> > >> > >>>> > Allocation IDs
> > >> > >>>> > We do intend to, at the very least, remove AllocationIDs on
> the
> > >> > >>>> > SlotManager side, as they are just not required there.
> > >> > >>>> >
> > >> > >>>> > On the slotpool side we have to keep them around at least
> until
> > >> the
> > >> > >>>> > existing Slotpool implementations are removed (not sure
> whether
> > >> > we'll
> > >> > >>>> fully
> > >> > >>>> > commit to this in 1.12), since the interfaces use
> > AllocationIDs,
> > >> > >>>> which also
> > >> > >>>> > bleed into the JobMaster.
> > >> > >>>> > The TaskExecutor is in a similar position.
> > >> > >>>> > But in the long-term, yes they will be removed, and most
> usages
> > >> will
> > >> > >>>> > probably be replaced by the SlotID.
> > >> > >>>> > FLIP-56
> > >> > >>>> >
> > >> > >>>> > Dynamic slot allocations are indeed quite interesting and
> > raise a
> > >> > few
> > >> > >>>> > questions; for example, the main purpose of it is to ensure
> > >> maximum
> > >> > >>>> > resource utilization. In that case, should the JobMaster be
> > >> allowed
> > >> > to
> > >> > >>>> > re-use a slot if the task requires fewer resources than the
> > >> slot
> > >> > >>>> > provides, or should it always request a new slot that exactly
> > >> > matches?
> > >> > >>>> >
> > >> > >>>> > There is a trade-off to be made between maximum resource
> > >> utilization
> > >> > >>>> > (request exactly matching slots, and only re-use exact
> matches)
> > >> and
> > >> > >>>> quicker
> > >> > >>>> > job deployment (re-use slot even if they don't exactly match,
> > >> skip
> > >> > >>>> > round-trip to RM).
> > >> > >>>> >
> > >> > >>>> > As for how to handle the lack of preemptively known
> SlotIDs,
> > >> that
> > >> > >>>> should
> > >> > >>>> > be fine in and of itself; we already handle a similar case
> when
> > >> we
> > >> > >>>> request
> > >> > >>>> > a new TaskExecutor to be started. So long as there is some
> way
> > to
> > >> > >>>> know how
> > >> > >>>> > many resources the TaskExecutor has in total I do not see a
> > >> problem
> > >> > >>>> at the
> > >> > >>>> > moment. We will get the SlotID eventually by virtue of the
> > >> heartbeat
> > >> > >>>> > SlotReport.
> > >> > >>>> > Implementation plan (SlotManager)
> > >> > >>>> > You are on the right track. The SlotManager tracks the
> declared
> > >> > >>>> resource
> > >> > >>>> > requirements, and if the requirements increased it creates a
> > >> > >>>> SlotRequest,
> > >> > >>>> > which then goes through similar code paths as we have at the
> > >> moment
> > >> > >>>> (try to
> > >> > >>>> > find a free slot, if found tell the TM, otherwise try to
> > request
> > >> new
> > >> > >>>> TM).
> > >> > >>>> > The SlotManager changes are not that substantial to get a
> > working
> > >> > >>>> version;
> > >> > >>>> > we have a PoC and most of the work went into refactoring the
> > >> > >>>> SlotManager
> > >> > >>>> > into a more manageable state. (split into several components,
> > >> > >>>> stricter and
> > >> > >>>> > simplified Slot life-cycle, ...).
> > >> > >>>> > Offer/free slots between JM/TM
> > >> > >>>> > Gotta run, but that's a good question and I'll think about it.
> > But I
> > >> > >>>> think it
> > >> > >>>> > comes down to making fewer changes, and being able to leverage
> > >> > existing
> > >> > >>>> > reconciliation protocols.
> > >> > >>>> > Do note that TaskExecutors also explicitly inform the RM about
> > >> freed
> > >> > >>>> slots;
> > >> > >>>> > the heartbeat slot report is just a safety net.
> > >> > >>>> > I'm not sure whether slot requests are able to overtake a
> slot
> > >> > >>>> release;
> > >> > >>>> > @till do you have thoughts on that?
> > >> > >>>> > As for the race condition between the requirements reduction
> > and
> > >> > slot
> > >> > >>>> > release, if we run into problems we have the backup plan of
> > only
> > >> > >>>> releasing
> > >> > >>>> > the slot after the requirement reduction has been
> acknowledged.
> > >> > >>>> >
> > >> > >>>> > On 26/08/2020 10:31, Xintong Song wrote:
> > >> > >>>> >
> > >> > >>>> > Thanks for preparing the FLIP and driving this discussion,
> > >> @Chesnay
> > >> > &
> > >> > >>>> @Till.
> > >> > >>>> >
> > >> > >>>> > I really like the idea. I see a great value in the proposed
> > >> > >>>> declarative
> > >> > >>>> > resource management, in terms of flexibility, usability and
> > >> > >>>> efficiency.
> > >> > >>>> >
> > >> > >>>> > I have a few comments and questions regarding the FLIP
> design.
> > In
> > >> > >>>> general,
> > >> > >>>> > the protocol design makes good sense to me. My main concern
> is
> > >> that
> > >> > >>>> it is
> > >> > >>>> > not very clear to me what changes are required from the
> > >> > >>>> > Resource/SlotManager side to adapt to the new protocol.
> > >> > >>>> >
> > >> > >>>> > *1. Distributed slots across different jobs*
> > >> > >>>> >
> > >> > >>>> > Jobs which register their requirements first, will have
> > >> precedence
> > >> > >>>> over
> > >> > >>>> >
> > >> > >>>> > other jobs also if the requirements change during the
> runtime.
> > >> > >>>> >
> > >> > >>>> > Just trying to understand, does this mean jobs are
> prioritized
> > by
> > >> > the
> > >> > >>>> order
> > >> > >>>> > of their first resource declaring?
> > >> > >>>> >
> > >> > >>>> > *2. AllocationID*
> > >> > >>>> >
> > >> > >>>> > Is this FLIP suggesting to completely remove AllocationID?
> > >> > >>>> >
> > >> > >>>> > I'm fine with this change. It seems where AllocationID is
> used
> > >> can
> > >> > >>>> either
> > >> > >>>> > be removed or be replaced by JobID. This reflects the concept
> > >> that
> > >> > >>>> slots
> > >> > >>>> > are now assigned to a job instead of its individual slot
> > >> requests.
> > >> > >>>> >
> > >> > >>>> > I would like to bring to attention that this also requires
> > >> changes
> > >> > on
> > >> > >>>> the
> > >> > >>>> > TM side, with respect to FLIP-56[1].
> > >> > >>>> >
> > >> > >>>> > In the context of dynamic slot allocation introduced by
> > FLIP-56,
> > >> > >>>> slots do
> > >> > >>>> > not pre-exist on TM and are dynamically created when RM calls
> > >> > >>>> > TaskExecutorGateway.requestSlot. Since the slots do not
> > >> pre-exist,
> > >> > nor
> > >> > >>>> > their SlotIDs, RM requests slots from TM with a special
> SlotID
> > >> > >>>> (negative
> > >> > >>>> > slot index). The semantic changes from "requesting the slot
> > >> > >>>> identified by
> > >> > >>>> > the given SlotID" to "requesting a slot with the given
> resource
> > >> > >>>> profile".
> > >> > >>>> > The AllocationID is used for identifying the dynamic slots in
> > >> such
> > >> > >>>> cases.
> > >> > >>>> >
> > >> > >>>> > From the perspective of FLIP-56 and fine grained resource
> > >> > >>>> management, I'm
> > >> > >>>> > fine with removing AllocationID. In the meantime, we would
> need
> > >> TM
> > >> > to
> > >> > >>>> > recognize the special negative indexed SlotID and generate a
> > new
> > >> > >>>> unique
> > >> > >>>> > SlotID for identifying the slot.
> > >> > >>>> >
> > >> > >>>> > *3. Minimum resource requirement*
> > >> > >>>> >
> > >> > >>>> > However, we can let the JobMaster know if we cannot fulfill
> the
> > >> > >>>> minimum
> > >> > >>>> >
> > >> > >>>> > resource requirement for a job after
> > >> > >>>> > resourcemanager.standalone.start-up-time has passed.
> > >> > >>>> >
> > >> > >>>> > What is the "minimum resource requirement for a job"? Did I
> > >> overlook
> > >> > >>>> > anything?
> > >> > >>>> >
> > >> > >>>> > *4. Offer/free slots between JM/TM*
> > >> > >>>> >
> > >> > >>>> > This probably deserves a separate discussion thread. Just
> want
> > to
> > >> > >>>> bring it
> > >> > >>>> > up.
> > >> > >>>> >
> > >> > >>>> > The idea has been coming to me for quite some time. Is this
> > >> design,
> > >> > >>>> that JM
> > >> > >>>> > requests resources from RM while accepting/releasing
> resources
> > >> > >>>> from/to TM,
> > >> > >>>> > the right thing?
> > >> > >>>> >
> > >> > >>>> > The pain point is that events of JM's activities
> > >> > (requesting/releasing
> > >> > >>>> > resources) arrive at RM out of order. This leads to several
> > >> > problems.
> > >> > >>>> >
> > >> > >>>> >    - When a job fails and task cancelation takes long, some
> of
> > >> the
> > >> > >>>> slots
> > >> > >>>> >    might be released from the slot pool due to being unused
> > for a
> > >> > >>>> while. Then
> > >> > >>>> >    the job restarts and requests these slots again. At this
> > >> time, RM
> > >> > >>>> may
> > >> > >>>> >    receive slot requests before noticing from TM heartbeats
> > that
> > >> > >>>> previous
> > >> > >>>> >    slots are released, thus requesting new resources. I've
> seen
> > >> many
> > >> > >>>> times
> > >> > >>>> >    that the Yarn cluster has a heavy load and is not
> allocating
> > >> > >>>> resources
> > >> > >>>> >    quickly enough, which leads to slot request timeout and
> job
> > >> > >>>> failover, and
> > >> > >>>> >    during the failover more resources are requested which
> adds
> > >> more
> > >> > >>>> load to
> > >> > >>>> >    the Yarn cluster. Happily, this should be improved with
> the
> > >> > >>>> declarative
> > >> > >>>> >    resource management. :)
> > >> > >>>> >    - As described in this FLIP, it is possible that RM learns
> > the
> > >> > >>>> releasing
> > >> > >>>> >    of slots from TM heartbeat before noticing the resource
> > >> > requirement
> > >> > >>>> >    decreasing, it may allocate more resources which need to
> be
> > >> > >>>> released soon.
> > >> > >>>> >    - It complicates the ResourceManager/SlotManager, by
> > >> requiring an
> > >> > >>>> >    additional slot state PENDING, which means the slot is
> > >> assigned
> > >> > by
> > >> > >>>> RM but
> > >> > >>>> >    is not confirmed successfully ordered by TM.
> > >> > >>>> >
> > >> > >>>> > Why not just make RM offer the allocated resources (TM
> address,
> > >> > >>>> SlotID,
> > >> > >>>> > etc.) to JM, and JM release resources to RM? So that for all
> > the
> > >> > >>>> resource
> > >> > >>>> > management JM talks to RM, and for the task deployment and
> > >> execution
> > >> > >>>> it
> > >> > >>>> > talks to TM?
> > >> > >>>> >
> > >> > >>>> > I tried to understand the benefits for having the current
> > design,
> > >> > and
> > >> > >>>> found
> > >> > >>>> > the following in FLIP-6[2].
> > >> > >>>> >
> > >> > >>>> >
> > >> > >>>> > All that the ResourceManager does is negotiate between the
> > >> > >>>> > cluster-manager, the JobManager, and the TaskManagers. Its
> > state
> > >> can
> > >> > >>>> hence
> > >> > >>>> > be reconstructed from re-acquiring containers and
> > re-registration
> > >> > from
> > >> > >>>> > JobManagers and TaskManagers
> > >> > >>>> >
> > >> > >>>> > Correct me if I'm wrong, it seems the original purpose is to
> > make
> > >> > >>>> sure the
> > >> > >>>> > assignment between jobs and slots is confirmed between JM
> and
> > >> TMs,
> > >> > >>>> so that
> > >> > >>>> > failures of RM will not lead to any inconsistency. However,
> > this
> > >> > only
> > >> > >>>> > benefits scenarios where RM fails while JM and TMs live.
> > >> Currently,
> > >> > >>>> JM and
> > >> > >>>> > RM are in the same process. We do not really have any
> scenario
> > >> where
> > >> > >>>> RM
> > >> > >>>> > fails alone. We might separate JM and RM to different
> processes
> > >> in
> > >> > >>>> future,
> > >> > >>>> > but as far as I can see we don't have such requirements at
> the
> > >> > >>>> moment. It
> > >> > >>>> > seems to me that we are suffering from the current problems
> > >> > >>>> > for the sake of potential future benefits.
> > >> > >>>> >
> > >> > >>>> > Maybe I overlooked something.
> > >> > >>>> >
> > >> > >>>> > *5. Implementation Plan*
> > >> > >>>> >
> > >> > >>>> > For SlotPool, it sounds quite straightforward to "aggregate
> > >> > >>>> individual slot
> > >> > >>>> > requests".
> > >> > >>>> >
> > >> > >>>> > For Resource/SlotManager, it seems there are quite a lot of
> > changes
> > >> > >>>> needed,
> > >> > >>>> > with the removal of individual slot requests and
> AllocationID.
> > >> It's
> > >> > >>>> not
> > >> > >>>> > clear to me what is the first step plan for RM/SM? Do we
> > >> internally
> > >> > >>>> treat
> > >> > >>>> > the resource requirements as individual slot requests as the
> > >> first
> > >> > >>>> step, so
> > >> > >>>> > only the interfaces are changed? Or do we actually change
> > >> > (practically
> > >> > >>>> > re-write) the slot allocation logic?
> > >> > >>>> >
> > >> > >>>> > Thank you~
> > >> > >>>> >
> > >> > >>>> > Xintong Song
> > >> > >>>> >
> > >> > >>>> >
> > >> > >>>> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
> > >> > >>>> > [2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
> > >> > >>>> >
> > >> > >>>> > On Tue, Aug 25, 2020 at 4:48 PM Chesnay Schepler <ches...@apache.org> wrote:
> > >> > >>>> >
> > >> > >>>> >
> > >> > >>>> > Hello,
> > >> > >>>> >
> > >> > >>>> > in FLIP-138 we want to rework the way the JobMaster acquires
> > >> slots,
> > >> > >>>> such
> > >> > >>>> > that required resources are declared before a job is
> scheduled
> > >> and
> > >> > the
> > >> > >>>> > job execution is adjusted according to the provided resources
> > >> (e.g.,
> > >> > >>>> > reducing parallelism), instead of asking for a fixed number
> of
> > >> > >>>> resources
> > >> > >>>> > during scheduling and failing midway through if not enough
> > >> resources
> > >> > >>>> are
> > >> > >>>> > available.
> > >> > >>>> >
> > >> > >>>> > This is a stepping stone towards reactive mode, where Flink
> > will
> > >> > >>>> > automatically make use of new TaskExecutors being started.
> > >> > >>>> >
> > >> > >>>> > More details can be found here:
> > >> > >>>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management
> > >> > >>>> >
> > >> > >>>> >
> > >> > >>>> >
> > >> > >>>>
> > >> > >>>
> > >> > >>>
> > >> > >>
> > >> >
> > >>
> > >
> >
>
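
For readers following the race discussed in the thread (slot release vs. requirement decrease), here is a minimal sketch of why waiting for the acknowledgement before freeing the slot avoids a spurious allocation. It is a toy model only: ToyResourceManager and both helper functions are hypothetical names made up for illustration and are not part of Flink's API.

```python
from dataclasses import dataclass


@dataclass
class ToyResourceManager:
    """Hypothetical stand-in for the RM; not Flink's ResourceManager API."""
    declared: int = 0         # slots the job has declared it needs
    assigned: int = 0         # slots currently assigned to the job
    new_allocations: int = 0  # extra resources requested from the cluster

    def declare_requirements(self, slots: int) -> bool:
        """Record the new requirement, reconcile, and acknowledge."""
        self.declared = slots
        self._reconcile()
        return True  # acknowledgement sent back to the JM

    def notify_slot_freed(self) -> None:
        """The TM reports that one slot of the job was freed."""
        self.assigned -= 1
        self._reconcile()

    def _reconcile(self) -> None:
        """Allocate new resources until the declared requirement is covered."""
        missing = self.declared - self.assigned
        if missing > 0:
            self.new_allocations += missing
            self.assigned += missing


def release_then_declare(rm: ToyResourceManager) -> None:
    """Racy order: the RM learns about the freed slot before the decrease."""
    rm.notify_slot_freed()
    rm.declare_requirements(rm.declared - 1)


def declare_then_release(rm: ToyResourceManager) -> None:
    """Confirmation-based order: free the slot only after the ack."""
    if rm.declare_requirements(rm.declared - 1):
        rm.notify_slot_freed()


racy = ToyResourceManager(declared=4, assigned=4)
release_then_declare(racy)

safe = ToyResourceManager(declared=4, assigned=4)
declare_then_release(safe)

print(racy.new_allocations, safe.new_allocations)
```

In the racy order the toy RM requests one resource it has to give back right away; in the confirmation-based order it requests none, which is the behaviour the thread describes as equivalent to releasing slots through the RM.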
