Thanks for the clarification @Till Rohrmann <trohrm...@apache.org>

>> # Implications for the scheduling
Agreed that it turned out to be different execution strategies for batch
jobs.
We can have a simple one first and improve it later.

Thanks,
Zhu

Xintong Song <tonysong...@gmail.com> 于2020年8月31日周一 下午3:05写道:

> Thanks for the clarification, @Till.
>
> - For FLIP-56, sounds good to me. I think there should be no problem before
> removing AllocationID. And even after replacing AllocationID, it should
> only require limited effort to make FLIP-56 work with SlotID. I was just
> trying to understand when the effort will be needed.
>
> - For offer/release slots between JM/TM, I think you are right.
> Waiting on the confirmation for resource requirement decrease before
> freeing the slot is quite equivalent to releasing slots through RM, in
> terms of it practically preventing JM from releasing slots when the RM is
> absent. But this approach obviously requires less change to the current
> mechanism.
> Since the first problem can be solved by the declarative protocol, and the
> second problem can be addressed by this confirmation based approach, ATM I
> don't see any strong reason for changing to offering and releasing slots
> through RM, especially considering the significant changes it requires.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Aug 28, 2020 at 10:07 PM Till Rohrmann <trohrm...@apache.org>
> wrote:
>
> > Thanks for creating this FLIP @Chesnay and the good input @Xintong and
> @Zhu
> > Zhu.
> >
> > Let me try to add some comments concerning your questions:
> >
> > # FLIP-56
> >
> > I think there is nothing fundamentally contradicting FLIP-56 in the FLIP
> > for declarative resource management. As Chesnay said, we have to keep the
> > AllocationID around as long as we have the old scheduler implementation.
> > Once it is replaced, we can think about using the SlotID instead of
> > AllocationIDs for identifying allocated slots. For dynamic slots we can
> > keep the special meaning of a SlotID with a negative index. In the future
> > we might think about making this encoding a bit more explicit by sending
> a
> > richer slot request object and reporting the actual SlotID back to the
> RM.
> >
> > For the question of resource utilization vs. deployment latency I believe
> > that this will be a question of requirements and preferences as you've
> said
> > Xintong. I can see that we will have different strategies to fulfill the
> > different needs.
> >
> > # Offer/free slots between JM/TM
> >
> > You are right Xintong that the existing slot protocol was developed with
> > the assumption in mind that the RM and JM can run in separate processes
> and
> > that a failure of the RM should only affect the JM in the sense that it
> > cannot ask for more resources. I believe that one could simplify things a
> > bit under the assumption that the RM and JM are always colocated in the
> > same process. However, the discussion whether to change it or not should
> > indeed be a separate one.
> >
> > Changing the slot protocol to a declarative resource management should
> > already solve the first problem you have described because we won't ask
> for
> > new slots in case of a failover but simply keep the same resource
> > requirements declared and let the RM make sure that we will receive at
> > least this amount of slots.
> >
> > If releasing a slot should lead to allocating new resources because
> > decreasing the resource requirement declaration takes longer than
> releasing
> > the slot on the TM, then we could apply what Chesnay said. By waiting on
> > the confirmation of the resource requirement decrease and then freeing
> the
> > slot on the TM gives you effectively the same behaviour as if the freeing
> > of the slot would be done by the RM.
> >
> > I am not entirely sure whether allocating the slots and receiving the
> slot
> > offers through the RM will allow us to get rid of the pending slot state
> on
> > the RM side. If the RM needs to communicate with the TM and we want to
> have
> > a reconciliation protocol between these components, then I think we would
> > have to solve the exact same problem of currently waiting on the TM for
> > confirming that a slot has been allocated.
> >
> > # Implications for the scheduling
> >
> > The FLIP does not fully cover the changes for the scheduler and mainly
> > drafts the rough idea. For the batch scheduling, I believe that we have a
> > couple degrees of freedom in how to do things. In the scenario you
> > described, one could choose a simple strategy where we wait for all
> > producers to stop before deciding on the parallelism of the consumer and
> > scheduling the respective tasks (even though they have POINTWISE BLOCKING
> > edges). Or we can try to be smart and say if we get at least one slot
> that
> > we can run the consumers with the same parallelism as the producers it
> just
> > might be that we have to run them one after another in a single slot. One
> > advantage of not directly schedule the first consumer when the first
> > producer is finished is that one might schedule the consumer stage with a
> > higher parallelism because one might acquire more resources a bit later.
> > But I would see this as different execution strategies which have
> different
> > properties.
> >
> > Cheers,
> > Till
> >
> > On Fri, Aug 28, 2020 at 11:21 AM Zhu Zhu <reed...@gmail.com> wrote:
> >
> > > Thanks for the explanation @Chesnay Schepler <ches...@apache.org> .
> > >
> > > Yes, for batch jobs it can be safe to schedule downstream vertices if
> > > there
> > > are enough slots in the pool, even if these slots are still in use at
> > that
> > > moment.
> > > And the job can still progress even if the vertices stick to the
> original
> > > parallelism.
> > >
> > > Looks to me several decision makings can be different for streaming and
> > > batch jobs.
> > > Looking forward to the follow-up FLIP on the lazy ExecutionGraph
> > > construction!
> > >
> > > Thanks,
> > > Zhu
> > >
> > > Chesnay Schepler <ches...@apache.org> 于2020年8月28日周五 下午4:35写道:
> > >
> > >> Maybe :)
> > >>
> > >> Imagine a case where the producer and consumer have the same
> > >> ResourceProfile, or at least one where the consumer requirements are
> > less
> > >> than the producer ones.
> > >> In this case, the scheduler can happily schedule consumers, because it
> > >> knows it will get enough slots.
> > >>
> > >> If the profiles are different, then the Scheduler _may_ wait
> > >> numberOf(producer) slots; it _may_ also stick with the parallelism and
> > >> schedule right away, in the worst case running the consumers in
> > sequence.
> > >> In fact, for batch jobs there is probably(?) never a reason for the
> > >> scheduler to _reduce_ the parallelism; it can always try to run things
> > in
> > >> sequence if it doesn't get enough slots.
> > >> Reducing the parallelism would just mean that you'd have to wait for
> > more
> > >> producers to finish.
> > >>
> > >> The scope of this FLIP is just the protocol, without changes to the
> > >> scheduler; in other words just changing how slots are acquired, but
> > change
> > >> nothing about the scheduling. That is tackled in a follow-up FLIP.
> > >>
> > >> On 28/08/2020 07:34, Zhu Zhu wrote:
> > >>
> > >> Thanks for the response!
> > >>
> > >> >> The scheduler doesn't have to wait for one stage to finish
> > >> Does it mean we will declare resources and decide the parallelism for
> a
> > >> stage which is partially
> > >> schedulable, i.e. when input data are ready just for part of the
> > >> execution vertices?
> > >>
> > >> >> This will get more complicated once we allow the scheduler to
> change
> > >> the parallelism while the job is running
> > >> Agreed. Looks to me it's a problem for batch jobs only and can be
> > avoided
> > >> for streaming jobs.
> > >> Will this FLIP limit its scope to streaming jobs, and improvements for
> > >> batch jobs are to be done later?
> > >>
> > >> Thanks,
> > >> Zhu
> > >>
> > >> Chesnay Schepler <ches...@apache.org> 于2020年8月28日周五 上午2:27写道:
> > >>
> > >>> The scheduler doesn't have to wait for one stage to finish. It is
> still
> > >>> aware that the upstream execution vertex has finished, and can
> > request/use
> > >>> slots accordingly to schedule the consumer.
> > >>>
> > >>> This will get more complicated once we allow the scheduler to change
> > the
> > >>> parallelism while the job is running, for which we will need some
> > >>> enhancements to the network stack to allow the producer to run
> without
> > >>> knowing the consumer parallelism ahead of time. I'm not too clear on
> > the
> > >>> details, but we'll some form of keygroup-like approach for sub
> > partitions
> > >>> (maxParallelism and all that).
> > >>>
> > >>> On 27/08/2020 20:05, Zhu Zhu wrote:
> > >>>
> > >>> Thanks Chesnay&Till for proposing this improvement.
> > >>> It's of good value to allow jobs to make best use of available
> > resources
> > >>> adaptively. Not
> > >>> to mention it further supports reactive mode.
> > >>> So big +1 for it.
> > >>>
> > >>> I have a minor concern about possible regression in certain cases due
> > to
> > >>> the proposed
> > >>> JobVertex-wise scheduling which replaces current ExecutionVertex-wise
> > >>> scheduling.
> > >>> In the proposal, looks to me it requires a stage to finish before its
> > >>> consumer stage can be
> > >>> scheduled. This limitation, however, does not exist in current
> > >>> scheduler. In the case that there
> > >>> exists a POINTWISE BLOCKING edge, the downstream execution region can
> > be
> > >>> scheduled
> > >>> right after its connected upstream execution vertices finishes, even
> > >>> before the whole upstream
> > >>> stage finishes. This allows the region to be launched earlier and
> make
> > >>> use of available resources.
> > >>> Do we need to let the new scheduler retain this property?
> > >>>
> > >>> Thanks,
> > >>> Zhu
> > >>>
> > >>> Xintong Song <tonysong...@gmail.com> 于2020年8月26日周三 下午6:59写道:
> > >>>
> > >>>> Thanks for the quick response.
> > >>>>
> > >>>> *Job prioritization, Allocation IDs, Minimum resource
> > >>>> requirements, SlotManager Implementation Plan:* Sounds good to me.
> > >>>>
> > >>>> *FLIP-56*
> > >>>> Good point about the trade-off. I believe maximum resource
> utilization
> > >>>> and
> > >>>> quick deployment are desired in different scenarios. E.g., a long
> > >>>> running
> > >>>> streaming job deserves some deployment latency to improve the
> resource
> > >>>> utilization, which benefits the entire lifecycle of the job. On the
> > >>>> other
> > >>>> hand, short batch queries may prefer quick deployment, otherwise the
> > >>>> time
> > >>>> for resource allocation might significantly increase the response
> > time.
> > >>>> It would be good enough for me to bring these questions to
> attention.
> > >>>> Nothing that I'm aware of should block this FLIP.
> > >>>>
> > >>>> Thank you~
> > >>>>
> > >>>> Xintong Song
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Wed, Aug 26, 2020 at 5:14 PM Chesnay Schepler <
> ches...@apache.org>
> > >>>> wrote:
> > >>>>
> > >>>> > Thank you Xintong for your questions!
> > >>>> > Job prioritization
> > >>>> > Yes, the job which declares it's initial requirements first is
> > >>>> prioritized.
> > >>>> > This is very much for simplicity; for example this avoids the
> nasty
> > >>>> case
> > >>>> > where all jobs get some resources, but none get enough to actually
> > >>>> run the
> > >>>> > job.
> > >>>> > Minimum resource requirements
> > >>>> >
> > >>>> > My bad; at some point we want to allow the JobMaster to declare a
> > >>>> range of
> > >>>> > resources it could use to run a job, for example min=1, target=10,
> > >>>> > max=+inf.
> > >>>> >
> > >>>> > With this model, the RM would then try to balance the resources
> such
> > >>>> that
> > >>>> > as many jobs as possible are as close to the target state as
> > possible.
> > >>>> >
> > >>>> > Currently, the minimum/target/maximum resources are all the same.
> So
> > >>>> the
> > >>>> > notification is sent whenever the current requirements cannot be
> > met.
> > >>>> > Allocation IDs
> > >>>> > We do intend to, at the very least, remove AllocationIDs on the
> > >>>> > SlotManager side, as they are just not required there.
> > >>>> >
> > >>>> > On the slotpool side we have to keep them around at least until
> the
> > >>>> > existing Slotpool implementations are removed (not sure whether
> > we'll
> > >>>> fully
> > >>>> > commit to this in 1.12), since the interfaces use AllocationIDs,
> > >>>> which also
> > >>>> > bleed into the JobMaster.
> > >>>> > The TaskExecutor is in a similar position.
> > >>>> > But in the long-term, yes they will be removed, and most usages
> will
> > >>>> > probably be replaced by the SlotID.
> > >>>> > FLIP-56
> > >>>> >
> > >>>> > Dynamic slot allocations are indeed quite interesting and raise a
> > few
> > >>>> > questions; for example, the main purpose of it is to ensure
> maximum
> > >>>> > resource utilization. In that case, should the JobMaster be
> allowed
> > to
> > >>>> > re-use a slot it if the task requires less resources than the slot
> > >>>> > provides, or should it always request a new slot that exactly
> > matches?
> > >>>> >
> > >>>> > There is a trade-off to be made between maximum resource
> utilization
> > >>>> > (request exactly matching slots, and only re-use exact matches)
> and
> > >>>> quicker
> > >>>> > job deployment (re-use slot even if they don't exactly match, skip
> > >>>> > round-trip to RM).
> > >>>> >
> > >>>> > As for how to handle the lack of a preemptively known SlotIDs,
> that
> > >>>> should
> > >>>> > be fine in and of itself; we already handle a similar case when we
> > >>>> request
> > >>>> > a new TaskExecutor to be started. So long as there is some way to
> > >>>> know how
> > >>>> > many resources the TaskExecutor has in total I do not see a
> problem
> > >>>> at the
> > >>>> > moment. We will get the SlotID eventually by virtue of the
> heartbeat
> > >>>> > SlotReport.
> > >>>> > Implementation plan (SlotManager)
> > >>>> > You are on the right track. The SlotManager tracks the declared
> > >>>> resource
> > >>>> > requirements, and if the requirements increased it creates a
> > >>>> SlotRequest,
> > >>>> > which then goes through similar code paths as we have at the
> moment
> > >>>> (try to
> > >>>> > find a free slot, if found tell the TM, otherwise try to request
> new
> > >>>> TM).
> > >>>> > The SlotManager changes are not that substantial to get a working
> > >>>> version;
> > >>>> > we have a PoC and most of the work went into refactoring the
> > >>>> SlotManager
> > >>>> > into a more manageable state. (split into several components,
> > >>>> stricter and
> > >>>> > simplified Slot life-cycle, ...).
> > >>>> > Offer/free slots between JM/TM
> > >>>> > Gotta run, but that's a good question and I'll think about. But I
> > >>>> think it
> > >>>> > comes down to making less changes, and being able to leverage
> > existing
> > >>>> > reconciliation protocols.
> > >>>> > Do note that TaskExecutor also explicitly inform the RM about
> freed
> > >>>> slots;
> > >>>> > the heartbeat slot report is just a safety net.
> > >>>> > I'm not sure whether slot requests are able to overtake a slot
> > >>>> release;
> > >>>> > @till do you have thoughts on that?
> > >>>> > As for the race condition between the requirements reduction and
> > slot
> > >>>> > release, if we run into problems we have the backup plan of only
> > >>>> releasing
> > >>>> > the slot after the requirement reduction has been acknowledged.
> > >>>> >
> > >>>> > On 26/08/2020 10:31, Xintong Song wrote:
> > >>>> >
> > >>>> > Thanks for preparing the FLIP and driving this discussion,
> @Chesnay
> > &
> > >>>> @Till.
> > >>>> >
> > >>>> > I really like the idea. I see a great value in the proposed
> > >>>> declarative
> > >>>> > resource management, in terms of flexibility, usability and
> > >>>> efficiency.
> > >>>> >
> > >>>> > I have a few comments and questions regarding the FLIP design. In
> > >>>> general,
> > >>>> > the protocol design makes good sense to me. My main concern is
> that
> > >>>> it is
> > >>>> > not very clear to me what changes are required from the
> > >>>> > Resource/SlotManager side to adapt to the new protocol.
> > >>>> >
> > >>>> > *1. Distributed slots across different jobs*
> > >>>> >
> > >>>> > Jobs which register their requirements first, will have precedence
> > >>>> over
> > >>>> >
> > >>>> > other jobs also if the requirements change during the runtime.
> > >>>> >
> > >>>> > Just trying to understand, does this mean jobs are prioritized by
> > the
> > >>>> order
> > >>>> > of their first resource declaring?
> > >>>> >
> > >>>> > *2. AllocationID*
> > >>>> >
> > >>>> > Is this FLIP suggesting to completely remove AllocationID?
> > >>>> >
> > >>>> > I'm fine with this change. It seems where AllocationID is used can
> > >>>> either
> > >>>> > be removed or be replaced by JobID. This reflects the concept that
> > >>>> slots
> > >>>> > are now assigned to a job instead of its individual slot requests.
> > >>>> >
> > >>>> > I would like to bring to attention that this also requires changes
> > on
> > >>>> the
> > >>>> > TM side, with respect to FLIP-56[1].
> > >>>> >
> > >>>> > In the context of dynamic slot allocation introduced by FLIP-56,
> > >>>> slots do
> > >>>> > not pre-exist on TM and are dynamically created when RM calls
> > >>>> > TaskExecutorGateway.requestSlot. Since the slots do not pre-exist,
> > nor
> > >>>> > their SlotIDs, RM requests slots from TM with a special SlotID
> > >>>> (negative
> > >>>> > slot index). The semantic changes from "requesting the slot
> > >>>> identified by
> > >>>> > the given SlotID" to "requesting a slot with the given resource
> > >>>> profile".
> > >>>> > The AllocationID is used for identifying the dynamic slots in such
> > >>>> cases.
> > >>>> >
> > >>>> > >From the perspective of FLIP-56 and fine grained resource
> > >>>> management, I'm
> > >>>> > fine with removing AllocationID. In the meantime, we would need TM
> > to
> > >>>> > recognize the special negative indexed SlotID and generate a new
> > >>>> unique
> > >>>> > SlotID for identifying the slot.
> > >>>> >
> > >>>> > *3. Minimum resource requirement*
> > >>>> >
> > >>>> > However, we can let the JobMaster know if we cannot fulfill the
> > >>>> minimum
> > >>>> >
> > >>>> > resource requirement for a job after
> > >>>> > resourcemanager.standalone.start-up-time has passed.
> > >>>> >
> > >>>> > What is the "minimum resource requirement for a job"? Did I
> overlook
> > >>>> > anything?
> > >>>> >
> > >>>> > *4. Offer/free slots between JM/TM*
> > >>>> >
> > >>>> > This probably deserves a separate discussion thread. Just want to
> > >>>> bring it
> > >>>> > up.
> > >>>> >
> > >>>> > The idea has been coming to me for quite some time. Is this
> design,
> > >>>> that JM
> > >>>> > requests resources from RM while accepting/releasing resources
> > >>>> from/to TM,
> > >>>> > the right thing?
> > >>>> >
> > >>>> > The pain point is that events of JM's activities
> > (requesting/releasing
> > >>>> > resources) arrive at RM out of order. This leads to several
> > problems.
> > >>>> >
> > >>>> >    - When a job fails and task cancelation takes long, some of the
> > >>>> slots
> > >>>> >    might be released from the slot pool due to being unused for a
> > >>>> while. Then
> > >>>> >    the job restarts and requests these slots again. At this time,
> RM
> > >>>> may
> > >>>> >    receive slot requests before noticing from TM heartbeats that
> > >>>> previous
> > >>>> >    slots are released, thus requesting new resources. I've seen
> many
> > >>>> times
> > >>>> >    that the Yarn cluster has a heavy load and is not allocating
> > >>>> resources
> > >>>> >    quickly enough, which leads to slot request timeout and job
> > >>>> failover, and
> > >>>> >    during the failover more resources are requested which adds
> more
> > >>>> load to
> > >>>> >    the Yarn cluster. Happily, this should be improved with the
> > >>>> declarative
> > >>>> >    resource management. :)
> > >>>> >    - As described in this FLIP, it is possible that RM learns the
> > >>>> releasing
> > >>>> >    of slots from TM heartbeat before noticing the resource
> > requirement
> > >>>> >    decreasing, it may allocate more resources which need to be
> > >>>> released soon.
> > >>>> >    - It complicates the ResourceManager/SlotManager, by requiring
> an
> > >>>> >    additional slot state PENDING, which means the slot is assigned
> > by
> > >>>> RM but
> > >>>> >    is not confirmed successfully ordered by TM.
> > >>>> >
> > >>>> > Why not just make RM offer the allocated resources (TM address,
> > >>>> SlotID,
> > >>>> > etc.) to JM, and JM release resources to RM? So that for all the
> > >>>> resource
> > >>>> > management JM talks to RM, and for the task deployment and
> execution
> > >>>> it
> > >>>> > talks to TM?
> > >>>> >
> > >>>> > I tried to understand the benefits for having the current design,
> > and
> > >>>> found
> > >>>> > the following in FLIP-6[2].
> > >>>> >
> > >>>> >
> > >>>> > All that the ResourceManager does is negotiate between the
> > >>>> > cluster-manager, the JobManager, and the TaskManagers. Its state
> can
> > >>>> hence
> > >>>> > be reconstructed from re-acquiring containers and re-registration
> > from
> > >>>> > JobManagers and TaskManagers
> > >>>> >
> > >>>> > Correct me if I'm wrong, it seems the original purpose is to make
> > >>>> sure the
> > >>>> > assignment between jobs and slots are confirmed between JM and
> TMs,
> > >>>> so that
> > >>>> > failures of RM will not lead to any inconsistency. However, this
> > only
> > >>>> > benefits scenarios where RM fails while JM and TMs live.
> Currently,
> > >>>> JM and
> > >>>> > RM are in the same process. We do not really have any scenario
> where
> > >>>> RM
> > >>>> > fails alone. We might separate JM and RM to different processes in
> > >>>> future,
> > >>>> > but as far as I can see we don't have such requirements at the
> > >>>> moment. It
> > >>>> > seems to me that we are suffering the current problems, complying
> to
> > >>>> > potential future benefits.
> > >>>> >
> > >>>> > Maybe I overlooked something.
> > >>>> >
> > >>>> > *5. Implementation Plan*
> > >>>> >
> > >>>> > For SlotPool, it sounds quite straightforward to "aggregate
> > >>>> individual slot
> > >>>> > requests".
> > >>>> >
> > >>>> > For Resource/SlotManager, it seems there are quite a lot changes
> > >>>> needed,
> > >>>> > with the removal of individual slot requests and AllocationID.
> It's
> > >>>> not
> > >>>> > clear to me what is the first step plan for RM/SM? Do we
> internally
> > >>>> treat
> > >>>> > the resource requirements as individual slot requests as the first
> > >>>> step, so
> > >>>> > only the interfaces are changed? Or do we actually change
> > (practically
> > >>>> > re-write) the slot allocation logics?
> > >>>> >
> > >>>> > Thank you~
> > >>>> >
> > >>>> > Xintong Song
> > >>>> >
> > >>>> >
> > >>>> > [1]
> > >>>>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
> > >>>> > [2]
> > >>>>
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
> > >>>> >
> > >>>> > On Tue, Aug 25, 2020 at 4:48 PM Chesnay Schepler <
> > ches...@apache.org>
> > >>>> <ches...@apache.org> wrote:
> > >>>> >
> > >>>> >
> > >>>> > Hello,
> > >>>> >
> > >>>> > in FLIP-138 we want to rework the way the JobMaster acquires
> slots,
> > >>>> such
> > >>>> > that required resources are declared before a job is scheduled and
> > th
> > >>>> > job execution is adjusted according to the provided resources
> (e.g.,
> > >>>> > reducing parallelism), instead of asking for a fixed number of
> > >>>> resources
> > >>>> > during scheduling and failing midway through if not enough
> resources
> > >>>> are
> > >>>> > available.
> > >>>> >
> > >>>> > This is a stepping stone towards reactive mode, where Flink will
> > >>>> > automatically make use of new TaskExecutors being started.
> > >>>> >
> > >>>> > More details can be found here
> > >>>> > <
> > >>>>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management
> > >>>> >
> > >>>> > .
> > >>>> >
> > >>>> >
> > >>>> >
> > >>>>
> > >>>
> > >>>
> > >>
> >
>

Reply via email to