Thanks Chesnay & Till for proposing this improvement.
It's valuable to allow jobs to adaptively make the best use of available
resources, not to mention that it further supports reactive mode.
So a big +1 for it.
I have a minor concern about a possible regression in certain cases due
to the proposed JobVertex-wise scheduling, which replaces the current
ExecutionVertex-wise scheduling.
As I read the proposal, it requires a stage to finish before its
consumer stage can be scheduled. This limitation, however, does not
exist in the current scheduler. In the case that there exists a
POINTWISE BLOCKING edge, the downstream execution region can be
scheduled right after its connected upstream execution vertices finish,
even before the whole upstream stage finishes. This allows the region
to be launched earlier and to make use of available resources, as the
sketch below shows.
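For concreteness, here is a minimal sketch of such a topology, using the
runtime JobGraph classes (org.apache.flink.runtime.jobgraph); the
parallelism values are made up for illustration:

    // A -> B, connected by a POINTWISE BLOCKING edge.
    JobVertex a = new JobVertex("A");
    a.setParallelism(2);
    JobVertex b = new JobVertex("B");
    b.setParallelism(2);

    // With POINTWISE + BLOCKING, subtask B_1 consumes only A_1's result,
    // so the current scheduler can deploy B_1 as soon as A_1 finishes,
    // without waiting for A_2.
    b.connectNewDataSetAsInput(
        a, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);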
Do we need to let the new scheduler retain this property?
Thanks,
Zhu
Xintong Song <tonysong...@gmail.com> wrote on Wed, Aug 26, 2020 at 6:59 PM:
Thanks for the quick response.
*Job prioritization, Allocation IDs, Minimum resource
requirements, SlotManager Implementation Plan:* Sounds good to me.
*FLIP-56*
Good point about the trade-off. I believe maximum resource
utilization and
quick deployment are desired in different scenarios. E.g., a long-running
streaming job can afford some deployment latency to improve resource
utilization, which benefits the entire lifecycle of the job. On the other
hand, short batch queries may prefer quick deployment, otherwise
the time
for resource allocation might significantly increase the response
time.
For me, it is enough to have brought these questions to attention;
nothing that I'm aware of should block this FLIP.
Thank you~
Xintong Song
On Wed, Aug 26, 2020 at 5:14 PM Chesnay Schepler <ches...@apache.org> wrote:
> Thank you Xintong for your questions!
> Job prioritization
> Yes, the job which declares its initial requirements first is prioritized.
> This is very much for simplicity; for example, this avoids the nasty case
> where all jobs get some resources, but none gets enough to actually run
> the job.
> Minimum resource requirements
>
> My bad; at some point we want to allow the JobMaster to declare
a range of
> resources it could use to run a job, for example min=1, target=10,
> max=+inf.
>
> With this model, the RM would then try to balance the resources
such that
> as many jobs as possible are as close to the target state as
possible.
>
> Currently, the minimum/target/maximum resources are all the
same. So the
> notification is sent whenever the current requirements cannot be
met.
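> To illustrate the intended shape (the names are purely hypothetical,
> not the actual FLIP interfaces):
>
>     // A ranged requirement the JobMaster could declare to the RM.
>     final class ResourceRequirementRange {
>         final int min;    // below this the job cannot run at all
>         final int target; // the RM tries to get every job close to this
>         final int max;    // the job cannot make use of more than this
>
>         ResourceRequirementRange(int min, int target, int max) {
>             this.min = min;
>             this.target = target;
>             this.max = max;
>         }
>     }
>
>     // Today: min == target == max, e.g. new ResourceRequirementRange(10, 10, 10).
>     // Later: e.g. new ResourceRequirementRange(1, 10, Integer.MAX_VALUE).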
> Allocation IDs
> We do intend to, at the very least, remove AllocationIDs on the
> SlotManager side, as they are just not required there.
>
> On the slotpool side we have to keep them around at least until the
> existing Slotpool implementations are removed (not sure whether
we'll fully
> commit to this in 1.12), since the interfaces use AllocationIDs,
which also
> bleed into the JobMaster.
> The TaskExecutor is in a similar position.
> But in the long-term, yes they will be removed, and most usages will
> probably be replaced by the SlotID.
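> For reference, a SlotID is essentially the pair (TaskExecutor, slot
> index); a sketch of its shape (not the actual class):
>
>     final class SlotID {
>         final ResourceID resourceId; // which TaskExecutor the slot lives on
>         final int slotNumber;        // index of the slot on that TaskExecutor
>     }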
> FLIP-56
>
> Dynamic slot allocations are indeed quite interesting and raise
a few
> questions; for example, the main purpose of it is to ensure maximum
> resource utilization. In that case, should the JobMaster be
allowed to
> re-use a slot if the task requires fewer resources than the slot
> provides, or should it always request a new slot that exactly
matches?
>
> There is a trade-off to be made between maximum resource utilization
> (request exactly matching slots, and only re-use exact matches)
and quicker
> job deployment (re-use slots even if they don't exactly match, skip the
> round-trip to RM).
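> As a sketch, the two matching strategies could look like this (the
> method names are assumptions, not the actual ResourceProfile API):
>
>     // Maximum utilization: only re-use a slot that matches exactly,
>     // otherwise round-trip to the RM for an exactly matching slot.
>     boolean exactMatch(ResourceProfile slot, ResourceProfile request) {
>         return slot.equals(request);
>     }
>
>     // Quicker deployment: re-use any slot big enough for the request,
>     // accepting that some of its resources may go unused.
>     boolean fulfillingMatch(ResourceProfile slot, ResourceProfile request) {
>         return slot.covers(request); // hypothetical "no field smaller" check
>     }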
>
> As for how to handle the lack of preemptively known SlotIDs, that should
that should
> be fine in and of itself; we already handle a similar case when
we request
> a new TaskExecutor to be started. So long as there is some way
to know how
> many resources the TaskExecutor has in total I do not see a
problem at the
> moment. We will get the SlotID eventually by virtue of the heartbeat
> SlotReport.
> Implementation plan (SlotManager)
> You are on the right track. The SlotManager tracks the declared resource
> requirements, and if the requirements increase it creates a SlotRequest,
> which then goes through similar code paths as we have at the moment (try
> to find a free slot; if found, tell the TM, otherwise try to request a
> new TM).
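> A rough sketch of that code path (all names are illustrative, not the
> actual SlotManager code):
>
>     void onRequirementsDeclared(JobID jobId, ResourceRequirements declared) {
>         int missing = missingSlots(jobId, declared);
>         for (int i = 0; i < missing; i++) {
>             Optional<TaskManagerSlot> free = findMatchingFreeSlot(declared);
>             if (free.isPresent()) {
>                 allocateSlot(free.get(), jobId);  // tell the TM
>             } else {
>                 requestNewTaskExecutor(declared); // e.g. ask Yarn/K8s for a new TM
>             }
>         }
>     }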
> The SlotManager changes needed to get a working version are not that
> substantial; we have a PoC, and most of the work went into refactoring
> the SlotManager into a more manageable state (splitting it into several
> components, a stricter and simplified slot life-cycle, ...).
> Offer/free slots between JM/TM
> Gotta run, but that's a good question and I'll think about it. But I
> think it comes down to making fewer changes, and being able to leverage
> existing reconciliation protocols.
> Do note that TaskExecutors also explicitly inform the RM about freed
> slots;
> the heartbeat slot report is just a safety net.
> I'm not sure whether slot requests are able to overtake a slot
release;
> @till do you have thoughts on that?
> As for the race condition between the requirements reduction and
slot
> release, if we run into problems we have the backup plan of only
releasing
> the slot after the requirement reduction has been acknowledged.
>
> On 26/08/2020 10:31, Xintong Song wrote:
>
> Thanks for preparing the FLIP and driving this discussion,
@Chesnay & @Till.
>
> I really like the idea. I see great value in the proposed
declarative
> resource management, in terms of flexibility, usability and
efficiency.
>
> I have a few comments and questions regarding the FLIP design.
In general,
> the protocol design makes good sense to me. My main concern is
that it is
> not very clear to me what changes are required from the
> Resource/SlotManager side to adapt to the new protocol.
>
> *1. Distributing slots across different jobs*
>
> Jobs which register their requirements first, will have
precedence over
>
> other jobs also if the requirements change during the runtime.
>
> Just trying to understand: does this mean jobs are prioritized by the
> order in which they first declare their resource requirements?
>
> *2. AllocationID*
>
> Is this FLIP suggesting to completely remove AllocationID?
>
> I'm fine with this change. It seems each usage of AllocationID can
> either be removed or be replaced by JobID. This reflects the concept
> that slots
> are now assigned to a job instead of its individual slot requests.
>
> I would like to bring to attention that this also requires
changes on the
> TM side, with respect to FLIP-56[1].
>
> In the context of dynamic slot allocation introduced by FLIP-56,
slots do
> not pre-exist on TM and are dynamically created when RM calls
> TaskExecutorGateway.requestSlot. Since the slots do not
pre-exist, nor
> their SlotIDs, RM requests slots from TM with a special SlotID
(negative
> slot index). The semantic changes from "requesting the slot
identified by
> the given SlotID" to "requesting a slot with the given resource
profile".
> The AllocationID is used for identifying the dynamic slots in
such cases.
>
> From the perspective of FLIP-56 and fine-grained resource management,
> I'm fine with removing AllocationID. In the meantime, we would need the
> TM to recognize the special negative-indexed SlotID and generate a new
> unique
> SlotID for identifying the slot.
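> A sketch of that TM-side handling (field and method names are
> illustrative, not the actual TaskExecutor code):
>
>     ResourceID taskExecutorResourceId; // this TaskExecutor's ID
>     int nextDynamicSlotIndex;          // next unused dynamic slot index
>
>     SlotID resolveSlotId(SlotID requested) {
>         if (requested.getSlotNumber() < 0) {
>             // Dynamic slot: nothing pre-exists, create a fresh unique index.
>             return new SlotID(taskExecutorResourceId, nextDynamicSlotIndex++);
>         }
>         return requested; // static slot, identified up front
>     }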
>
> *3. Minimum resource requirement*
>
> However, we can let the JobMaster know if we cannot fulfill the
minimum
>
> resource requirement for a job after
> resourcemanager.standalone.start-up-time has passed.
>
> What is the "minimum resource requirement for a job"? Did I overlook
> anything?
>
> *4. Offer/free slots between JM/TM*
>
> This probably deserves a separate discussion thread. Just want
to bring it
> up.
>
> The idea has been coming to me for quite some time: is this design, in
> which the JM requests resources from the RM while accepting/releasing
> slots from/to the TM, the right one?
>
> The pain point is that events triggered by the JM's activities
> (requesting/releasing resources) arrive at the RM out of order. This
> leads to several problems.
>
> - When a job fails and task cancellation takes long, some of the slots
> might be released from the slot pool due to being unused for a while.
> Then the job restarts and requests these slots again. At this time, the
> RM may receive slot requests before noticing from TM heartbeats that the
> previous slots were released, and thus requests new resources. I've seen
> many times that the Yarn cluster has a heavy load and does not allocate
> resources quickly enough, which leads to slot request timeouts and job
> failover, and during the failover more resources are requested, which
> adds even more load to the Yarn cluster. Fortunately, this should be
> improved with declarative resource management. :)
> - As described in this FLIP, it is possible that the RM learns about
> the release of slots from TM heartbeats before noticing the decreased
> resource requirements; it may then allocate more resources, which will
> need to be released soon after.
> - It complicates the ResourceManager/SlotManager by requiring an
> additional slot state PENDING, meaning the slot is assigned by the RM
> but the assignment has not yet been confirmed by the TM.
>
> Why not just make the RM offer the allocated resources (TM address,
> SlotID, etc.) to the JM, and have the JM release resources back to the
> RM? That way, for all resource management the JM talks to the RM, and
> for task deployment and execution it talks to the TMs. See the sketch
> below.
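> A sketch of what that alternative protocol could look like (the
> interfaces are purely illustrative, not the existing gateways):
>
>     interface JobMasterResourceProtocol {
>         // RM -> JM: all slot grants flow through the RM ...
>         void offerSlots(ResourceID taskExecutorId, Collection<SlotID> slots);
>     }
>
>     interface ResourceManagerResourceProtocol {
>         // ... and so do all releases, keeping the event order consistent.
>         void releaseSlots(JobID jobId, Collection<SlotID> slots);
>     }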
>
> I tried to understand the benefits of the current design, and found the
> following in FLIP-6 [2]:
>
>
> All that the ResourceManager does is negotiate between the
> cluster-manager, the JobManager, and the TaskManagers. Its state
can hence
> be reconstructed from re-acquiring containers and
re-registration from
> JobManagers and TaskManagers
>
> Correct me if I'm wrong, but it seems the original purpose was to make
> sure the assignments between jobs and slots are confirmed between the JM
> and TMs, so that failures of the RM will not lead to any inconsistency.
> However, this only benefits scenarios where the RM fails while the JM
> and TMs live. Currently, the JM and RM are in the same process, and we
> do not really have any scenario where the RM fails alone. We might
> separate the JM and RM into different processes in the future, but as
> far as I can see we don't have such requirements at the moment. It seems
> to me that we are suffering the current problems in exchange for
> potential future benefits.
>
> Maybe I overlooked something.
>
> *5. Implementation Plan*
>
> For SlotPool, it sounds quite straightforward to "aggregate
individual slot
> requests".
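> For instance, a minimal sketch of such aggregation (names are
> illustrative, not the actual SlotPool code):
>
>     // Instead of forwarding each slot request, keep a running total per
>     // resource profile and declare the aggregate to the RM.
>     Map<ResourceProfile, Integer> required = new HashMap<>();
>
>     void onSlotRequest(ResourceProfile profile) {
>         required.merge(profile, 1, Integer::sum);
>         declareResourceRequirements(required);
>     }
>
>     void onSlotRequestCancelled(ResourceProfile profile) {
>         required.merge(profile, -1, Integer::sum);
>         declareResourceRequirements(required);
>     }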
>
> For the Resource/SlotManager, it seems quite a lot of changes are
> needed, with the removal of individual slot requests and AllocationID.
> It's not clear to me what the first-step plan for the RM/SM is. Do we
> internally treat the resource requirements as individual slot requests
> as the first step, so that only the interfaces change? Or do we actually
> change (practically re-write) the slot allocation logic?
>
> Thank you~
>
> Xintong Song
>
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
> [2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
>
> On Tue, Aug 25, 2020 at 4:48 PM Chesnay Schepler <ches...@apache.org>
> wrote:
>
>
> Hello,
>
> in FLIP-138 we want to rework the way the JobMaster acquires slots,
> such that required resources are declared before a job is scheduled and
> the
> job execution is adjusted according to the provided resources (e.g.,
> reducing parallelism), instead of asking for a fixed number of
resources
> during scheduling and failing midway through if not enough
resources are
> available.
>
> This is a stepping stone towards reactive mode, where Flink will
> automatically make use of new TaskExecutors being started.
>
> More details can be found here:
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management>
>