Thank you Xintong for your questions!
Job prioritization
Yes, the job which declares its initial requirements first is prioritized.
This is very much for simplicity; for example, it avoids the nasty case
where all jobs get some resources, but none gets enough to actually run.
Minimum resource requirements
My bad; at some point we want to allow the JobMaster to declare a range
of resources it could use to run a job, for example min=1, target=10,
max=+inf.
With this model, the RM would then try to balance the resources such
that as many jobs as possible are as close to the target state as possible.
Currently, the minimum/target/maximum resources are all the same. So the
notification is sent whenever the current requirements cannot be met.
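To make the range model concrete, here's a rough sketch (all names are made up for illustration; this is not actual Flink API):

```java
// Hypothetical sketch of a min/target/max resource declaration.
// None of these types exist in Flink; names are illustrative only.
public class ResourceRange {
    final int min;    // the job cannot run below this
    final int target; // the desired number of slots
    final int max;    // upper bound (Integer.MAX_VALUE for +inf)

    ResourceRange(int min, int target, int max) {
        this.min = min;
        this.target = target;
        this.max = max;
    }

    /** The RM would notify the JobMaster only when even the minimum cannot be met. */
    boolean requirementsUnfulfillable(int availableSlots) {
        return availableSlots < min;
    }

    public static void main(String[] args) {
        // min=1, target=10, max=+inf, as in the example above
        ResourceRange range = new ResourceRange(1, 10, Integer.MAX_VALUE);
        System.out.println(range.requirementsUnfulfillable(0)); // true
        System.out.println(range.requirementsUnfulfillable(3)); // false
    }
}
```

With min == target == max, as is currently the case, this degenerates to "notify whenever the declared requirements cannot be met".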
Allocation IDs
We do intend to, at the very least, remove AllocationIDs on the
SlotManager side, as they are just not required there.
On the slotpool side we have to keep them around at least until the
existing Slotpool implementations are removed (not sure whether we'll
fully commit to this in 1.12), since the interfaces use AllocationIDs,
which also bleed into the JobMaster.
The TaskExecutor is in a similar position.
But in the long-term, yes they will be removed, and most usages will
probably be replaced by the SlotID.
FLIP-56
Dynamic slot allocation is indeed quite interesting and raises a few
questions. For example, its main purpose is to ensure maximum
resource utilization; in that case, should the JobMaster be allowed to
re-use a slot if the task requires fewer resources than the slot
provides, or should it always request a new slot that exactly matches?
There is a trade-off to be made between maximum resource utilization
(request exactly matching slots, and only re-use exact matches) and
quicker job deployment (re-use slots even if they don't exactly match,
skipping the round-trip to the RM).
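The two ends of that trade-off could be sketched roughly like this (hypothetical code with made-up names, not the actual SlotPool matching logic):

```java
// Sketch of the two slot re-use policies discussed above. Resources are
// simplified to a single integer; real slots have multi-dimensional profiles.
public class SlotMatching {
    /** Exact matching: maximises utilization, may need a round-trip to the RM. */
    static boolean exactMatch(int slotResources, int taskResources) {
        return slotResources == taskResources;
    }

    /** Fits-within matching: faster deployment, may waste slot capacity. */
    static boolean fitsWithin(int slotResources, int taskResources) {
        return taskResources <= slotResources;
    }

    public static void main(String[] args) {
        // A free slot with 4 resource units, a task needing only 2:
        System.out.println(exactMatch(4, 2));  // false -> request a new slot
        System.out.println(fitsWithin(4, 2));  // true  -> re-use, skip the RM
    }
}
```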
As for how to handle the lack of preemptively known SlotIDs, that
should be fine in and of itself; we already handle a similar case when
we request a new TaskExecutor to be started. So long as there is some
way to know how many resources the TaskExecutor has in total I do not
see a problem at the moment. We will get the SlotID eventually by virtue
of the heartbeat SlotReport.
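For illustration, here is a rough sketch of how a TM could mint SlotIDs for dynamically created slots, following the negative-slot-index convention from FLIP-56 (names are made up; this is not Flink's actual TaskExecutor code):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: a negative requested slot index means "create a new dynamic slot",
// so the TM generates a fresh, unique index; a non-negative index identifies
// a pre-existing static slot. Hypothetical code for illustration only.
public class DynamicSlotIds {
    private final AtomicInteger nextDynamicIndex = new AtomicInteger(0);

    /** Resolve the slot index a request should actually use. */
    int resolveSlotIndex(int requestedIndex) {
        if (requestedIndex < 0) {
            // Dynamic slot: no pre-existing SlotID, so mint a unique one.
            // The RM learns the real SlotID later via the heartbeat SlotReport.
            return nextDynamicIndex.getAndIncrement();
        }
        // Static slot: the request already identifies an existing slot.
        return requestedIndex;
    }

    public static void main(String[] args) {
        DynamicSlotIds tm = new DynamicSlotIds();
        System.out.println(tm.resolveSlotIndex(-1)); // 0 (newly generated)
        System.out.println(tm.resolveSlotIndex(-1)); // 1 (newly generated)
        System.out.println(tm.resolveSlotIndex(2));  // 2 (pre-existing slot)
    }
}
```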
Implementation plan (SlotManager)
You are on the right track. The SlotManager tracks the declared resource
requirements, and if the requirements increased it creates a
SlotRequest, which then goes through similar code paths as we have at
the moment (try to find a free slot, if found tell the TM, otherwise try
to request new TM).
The SlotManager changes required to get a working version are not that
substantial; we have a PoC, and most of the work went into refactoring the
SlotManager into a more manageable state (split into several components, a
stricter and simplified slot life-cycle, ...).
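Roughly, the requirement-tracking part could look like this (a heavily simplified sketch with made-up names, not the actual SlotManager code):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the logic described above: the SlotManager tracks the declared
// requirement per job and turns any increase into slot requests, which then
// go through the existing code paths (find a free slot / request a new TM).
public class DeclarativeSlotManagerSketch {
    private final Map<String, Integer> declared = new HashMap<>();
    private final Map<String, Integer> acquired = new HashMap<>();

    /** Returns how many new SlotRequests to issue for this declaration. */
    int processResourceRequirements(String jobId, int requiredSlots) {
        declared.put(jobId, requiredSlots);
        int missing = requiredSlots - acquired.getOrDefault(jobId, 0);
        // For each missing slot: try to find a free slot and tell the TM,
        // otherwise try to request a new TaskManager.
        return Math.max(missing, 0);
    }

    /** Called when a slot has been assigned to the job. */
    void slotAllocated(String jobId) {
        acquired.merge(jobId, 1, Integer::sum);
    }

    public static void main(String[] args) {
        DeclarativeSlotManagerSketch sm = new DeclarativeSlotManagerSketch();
        System.out.println(sm.processResourceRequirements("job-1", 3)); // 3
        sm.slotAllocated("job-1");
        System.out.println(sm.processResourceRequirements("job-1", 5)); // 4
    }
}
```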
Offer/free slots between JM/TM
Gotta run, but that's a good question and I'll think about it. I believe
it comes down to making fewer changes, and being able to leverage
existing reconciliation protocols.
Do note that TaskExecutors also explicitly inform the RM about freed
slots; the heartbeat slot report is just a safety net.
I'm not sure whether slot requests are able to overtake a slot release;
@till do you have thoughts on that?
As for the race condition between the requirements reduction and slot
release, if we run into problems we have the backup plan of only
releasing the slot after the requirement reduction has been acknowledged.
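That backup plan could be sketched like this (hypothetical code, just to illustrate the ordering; no such classes exist in Flink):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Sketch of the "backup plan": hold back slot releases until the RM has
// acknowledged the reduced requirements, ruling out the race where the RM
// sees the freed slot before the lower requirement.
public class AckedRelease {
    private final Queue<Runnable> pendingReleases = new ArrayDeque<>();
    private boolean reductionAcknowledged = false;

    /** Queue the release until the RM has acknowledged the new requirements. */
    void freeSlot(Runnable releaseToTaskManager) {
        if (reductionAcknowledged) {
            releaseToTaskManager.run();
        } else {
            pendingReleases.add(releaseToTaskManager);
        }
    }

    /** Called when the RM confirms it has seen the reduced requirements. */
    void onRequirementReductionAck() {
        reductionAcknowledged = true;
        while (!pendingReleases.isEmpty()) {
            pendingReleases.poll().run();
        }
    }

    public static void main(String[] args) {
        AckedRelease jm = new AckedRelease();
        StringBuilder log = new StringBuilder();
        jm.freeSlot(() -> log.append("released;"));
        System.out.println(log); // still empty: the release is held back
        jm.onRequirementReductionAck();
        System.out.println(log); // released;
    }
}
```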
On 26/08/2020 10:31, Xintong Song wrote:
Thanks for preparing the FLIP and driving this discussion, @Chesnay & @Till.
I really like the idea. I see a great value in the proposed declarative
resource management, in terms of flexibility, usability and efficiency.
I have a few comments and questions regarding the FLIP design. In general,
the protocol design makes good sense to me. My main concern is that it is
not very clear to me what changes are required from the
Resource/SlotManager side to adapt to the new protocol.
*1. Distributed slots across different jobs*
Jobs which register their requirements first, will have precedence over
other jobs also if the requirements change during the runtime.
Just trying to understand, does this mean jobs are prioritized by the order
of their first resource declaration?
*2. AllocationID*
Is this FLIP suggesting to completely remove AllocationID?
I'm fine with this change. It seems the places where AllocationID is used can
either be removed or replaced by JobID. This reflects the concept that slots
are now assigned to a job instead of its individual slot requests.
I would like to bring to attention that this also requires changes on the
TM side, with respect to FLIP-56[1].
In the context of dynamic slot allocation introduced by FLIP-56, slots do
not pre-exist on TM and are dynamically created when RM calls
TaskExecutorGateway.requestSlot. Since the slots do not pre-exist, nor
their SlotIDs, RM requests slots from TM with a special SlotID (negative
slot index). The semantic changes from "requesting the slot identified by
the given SlotID" to "requesting a slot with the given resource profile".
The AllocationID is used for identifying the dynamic slots in such cases.
From the perspective of FLIP-56 and fine grained resource management, I'm
fine with removing AllocationID. In the meantime, we would need TM to
recognize the special negative indexed SlotID and generate a new unique
SlotID for identifying the slot.
*3. Minimum resource requirement*
However, we can let the JobMaster know if we cannot fulfill the minimum
resource requirement for a job after
resourcemanager.standalone.start-up-time has passed.
What is the "minimum resource requirement for a job"? Did I overlook
anything?
*4. Offer/free slots between JM/TM*
This probably deserves a separate discussion thread. Just want to bring it
up.
The idea has been coming to me for quite some time. Is this design, that JM
requests resources from RM while accepting/releasing resources from/to TM,
the right thing?
The pain point is that events of JM's activities (requesting/releasing
resources) arrive at RM out of order. This leads to several problems.
- When a job fails and task cancelation takes long, some of the slots
might be released from the slot pool due to being unused for a while. Then
the job restarts and requests these slots again. At this time, RM may
receive slot requests before noticing from TM heartbeats that previous
slots are released, thus requesting new resources. I've seen many times
that the Yarn cluster has a heavy load and is not allocating resources
quickly enough, which leads to slot request timeout and job failover, and
during the failover more resources are requested which adds more load to
the Yarn cluster. Happily, this should be improved with the declarative
resource management. :)
- As described in this FLIP, it is possible that RM learns about the release
of slots from TM heartbeats before noticing the resource requirement
decrease; it may then allocate more resources which need to be released soon.
- It complicates the ResourceManager/SlotManager by requiring an
additional slot state PENDING, which means the slot is assigned by RM but
the assignment is not yet confirmed by the TM.
Why not just make RM offer the allocated resources (TM address, SlotID,
etc.) to JM, and JM release resources to RM? So that for all the resource
management JM talks to RM, and for the task deployment and execution it
talks to TM?
I tried to understand the benefits of the current design, and found
the following in FLIP-6[2].
All that the ResourceManager does is negotiate between the
cluster-manager, the JobManager, and the TaskManagers. Its state can hence
be reconstructed from re-acquiring containers and re-registration from
JobManagers and TaskManagers
Correct me if I'm wrong, but it seems the original purpose is to make sure the
assignment between jobs and slots is confirmed between JM and TMs, so that
failures of RM will not lead to any inconsistency. However, this only
benefits scenarios where RM fails while JM and TMs live. Currently, JM and
RM are in the same process. We do not really have any scenario where RM
fails alone. We might separate JM and RM to different processes in future,
but as far as I can see we don't have such a requirement at the moment. It
seems to me that we are suffering the current problems in exchange for
potential future benefits.
Maybe I overlooked something.
*5. Implementation Plan*
For SlotPool, it sounds quite straightforward to "aggregate individual slot
requests".
For Resource/SlotManager, it seems there are quite a lot of changes needed,
with the removal of individual slot requests and AllocationID. It's not
clear to me what is the first step plan for RM/SM? Do we internally treat
the resource requirements as individual slot requests as the first step, so
only the interfaces are changed? Or do we actually change (practically
re-write) the slot allocation logic?
Thank you~
Xintong Song
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
[2]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
On Tue, Aug 25, 2020 at 4:48 PM Chesnay Schepler <ches...@apache.org> wrote:
Hello,
in FLIP-138 we want to rework the way the JobMaster acquires slots, such
that required resources are declared before a job is scheduled and the
job execution is adjusted according to the provided resources (e.g.,
reducing parallelism), instead of asking for a fixed number of resources
during scheduling and failing midway through if not enough resources are
available.
This is a stepping stone towards reactive mode, where Flink will
automatically make use of new TaskExecutors being started.
More details can be found here:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management