The scheduler doesn't have to wait for one stage to finish. It is still aware of individual upstream execution vertices finishing, and can request/use slots accordingly to schedule the consumer.

This will get more complicated once we allow the scheduler to change the parallelism while the job is running, for which we will need some enhancements to the network stack so the producer can run without knowing the consumer parallelism ahead of time. I'm not too clear on the details, but we'll need some form of keygroup-like approach for subpartitions (maxParallelism and all that).
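To illustrate the direction (purely a sketch with hypothetical names; as said, the details are not settled): the producer could always write maxParallelism subpartitions, and each consumer subtask would read a contiguous range of them, analogous to how key groups are mapped to operator subtasks:

    // Sketch only: a key-group-style assignment applied to subpartitions.
    // The producer writes 'maxParallelism' subpartitions up front, so the
    // consumer parallelism can be chosen (or changed) after the producer ran.
    static int[] subpartitionRange(int maxParallelism, int consumerParallelism, int consumerIndex) {
        int start = consumerIndex * maxParallelism / consumerParallelism;
        int end = (consumerIndex + 1) * maxParallelism / consumerParallelism - 1;
        return new int[] {start, end}; // inclusive range read by this subtask
    }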


On 27/08/2020 20:05, Zhu Zhu wrote:
Thanks Chesnay & Till for proposing this improvement.
It's of good value to allow jobs to make the best use of available resources adaptively, not to mention that it further supports reactive mode.
So big +1 for it.

I have a minor concern about a possible regression in certain cases due to the proposed JobVertex-wise scheduling, which replaces the current ExecutionVertex-wise scheduling. In the proposal, it looks to me like a stage must finish before its consumer stage can be scheduled. This limitation, however, does not exist in the current scheduler. In the case that there exists a POINTWISE BLOCKING edge, the downstream execution region can be scheduled right after its connected upstream execution vertices finish, even before the whole upstream stage finishes. This allows the region to be launched earlier and make use of available resources.
Do we need to let the new scheduler retain this property?
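To make the property concrete, roughly (a sketch with hypothetical names, not the actual scheduler code):

    // Current behavior: a region consuming POINTWISE BLOCKING partitions
    // becomes schedulable once its directly connected producer partitions
    // are finished, even while other vertices of the upstream stage run.
    boolean regionIsSchedulable(Region region) {
        for (Partition consumed : region.getConsumedPartitions()) {
            if (!consumed.isFinished()) {
                return false;
            }
        }
        return true;
    }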

Thanks,
Zhu

Xintong Song <tonysong...@gmail.com> wrote on Wed, Aug 26, 2020 at 6:59 PM:

    Thanks for the quick response.

    *Job prioritization, Allocation IDs, Minimum resource requirements, SlotManager Implementation Plan:* Sounds good to me.

    *FLIP-56*
    Good point about the trade-off. I believe maximum resource utilization and quick deployment are desired in different scenarios. E.g., a long-running streaming job can afford some deployment latency to improve resource utilization, which benefits the entire lifecycle of the job. On the other hand, short batch queries may prefer quick deployment, since otherwise the time for resource allocation might significantly increase the response time.
    It would be good enough for me to bring these questions to attention. Nothing that I'm aware of should block this FLIP.

    Thank you~

    Xintong Song



    On Wed, Aug 26, 2020 at 5:14 PM Chesnay Schepler <ches...@apache.org> wrote:

    > Thank you Xintong for your questions!
    >
    > Job prioritization
    > Yes, the job which declares its initial requirements first is prioritized. This is very much for simplicity; for example, it avoids the nasty case where all jobs get some resources, but none gets enough to actually run the job.
    > Minimum resource requirements
    >
    > My bad; at some point we want to allow the JobMaster to declare a range of resources it could use to run a job, for example min=1, target=10, max=+inf.
    >
    > With this model, the RM would then try to balance the resources such that as many jobs as possible are as close to the target state as possible.
    >
    > Currently, the minimum/target/maximum resources are all the same, so the notification is sent whenever the current requirements cannot be met.
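    > As a sketch (hypothetical names, not a final API), such a declaration could look like:
    >
    >     // min: below this the job cannot run at all
    >     // target: the parallelism the job would ideally like to run at
    >     // max: upper bound; MAX_VALUE models "+inf" (take whatever is available)
    >     ResourceRequirements requirements = ResourceRequirements.create(
    >             jobId,
    >             ResourceProfile.UNKNOWN, // per-slot resource spec
    >             1,                       // min
    >             10,                      // target
    >             Integer.MAX_VALUE);      // max
    >     resourceManagerGateway.declareResourceRequirements(requirements);
    >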
    > Allocation IDs
    >
    > We do intend to, at the very least, remove AllocationIDs on the SlotManager side, as they are just not required there.
    >
    > On the slotpool side we have to keep them around at least until the existing Slotpool implementations are removed (not sure whether we'll fully commit to this in 1.12), since the interfaces use AllocationIDs, which also bleed into the JobMaster. The TaskExecutor is in a similar position.
    >
    > But in the long term, yes, they will be removed, and most usages will probably be replaced by the SlotID.
    > FLIP-56
    >
    > Dynamic slot allocations are indeed quite interesting and raise a few questions; for example, the main purpose of it is to ensure maximum resource utilization. In that case, should the JobMaster be allowed to re-use a slot if the task requires fewer resources than the slot provides, or should it always request a new slot that exactly matches?
    >
    > There is a trade-off to be made between maximum resource utilization (request exactly matching slots, and only re-use exact matches) and quicker job deployment (re-use slots even if they don't exactly match, skipping the round-trip to the RM).
    >
    > As for how to handle the lack of preemptively known SlotIDs, that should be fine in and of itself; we already handle a similar case when we request a new TaskExecutor to be started. So long as there is some way to know how many resources the TaskExecutor has in total, I do not see a problem at the moment. We will get the SlotID eventually by virtue of the heartbeat SlotReport.
    > Implementation plan (SlotManager)
    >
    > You are on the right track. The SlotManager tracks the declared resource requirements, and if the requirements increased it creates a SlotRequest, which then goes through similar code paths as we have at the moment (try to find a free slot; if found, tell the TM; otherwise try to request a new TM). A rough sketch follows below.
    >
    > The SlotManager changes needed to get a working version are not that substantial; we have a PoC, and most of the work went into refactoring the SlotManager into a more manageable state (split into several components, stricter and simplified slot life-cycle, ...).
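    > Roughly, the new entry point could look like this (a sketch with hypothetical names, based on the PoC direction):
    >
    >     void processResourceRequirements(JobID jobId, ResourceRequirements requirements) {
    >         // compare the declared requirements against what we already track for this job
    >         int missing = requirements.getRequiredSlots() - numTrackedSlotsFor(jobId);
    >         for (int i = 0; i < missing; i++) {
    >             // from here on it is the familiar path: find a free slot,
    >             // tell the TM if found, otherwise try to request a new TM
    >             internalRequestSlot(new SlotRequest(jobId, requirements.getSlotProfile()));
    >         }
    >     }
    >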
    > Offer/free slots between JM/TM
    >
    > Gotta run, but that's a good question and I'll think about it. I think it comes down to making fewer changes, and being able to leverage existing reconciliation protocols.
    >
    > Do note that TaskExecutors also explicitly inform the RM about freed slots; the heartbeat slot report is just a safety net. I'm not sure whether slot requests are able to overtake a slot release; @till do you have thoughts on that?
    >
    > As for the race condition between the requirements reduction and the slot release, if we run into problems we have the backup plan of only releasing the slot after the requirement reduction has been acknowledged.
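    > I.e., something like this sketch (assuming the declare call returns a future that completes on the RM's acknowledgement):
    >
    >     // only free the slot once the RM has acknowledged the reduced
    >     // requirements, so the two events cannot be observed out of order
    >     resourceManagerGateway
    >             .declareResourceRequirements(reducedRequirements)
    >             .thenRun(() -> taskExecutorGateway.freeSlot(slotId));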
    >
    > On 26/08/2020 10:31, Xintong Song wrote:
    >
    > Thanks for preparing the FLIP and driving this discussion, @Chesnay & @Till.
    >
    > I really like the idea. I see a great value in the proposed declarative resource management, in terms of flexibility, usability and efficiency.
    >
    > I have a few comments and questions regarding the FLIP design. In general, the protocol design makes good sense to me. My main concern is that it is not very clear to me what changes are required from the Resource/SlotManager side to adapt to the new protocol.
    >
    > *1. Distributed slots across different jobs*
    >
    > > Jobs which register their requirements first, will have precedence over other jobs also if the requirements change during the runtime.
    >
    > Just trying to understand: does this mean jobs are prioritized by the order of their first resource declaration?
    >
    > *2. AllocationID*
    >
    > Is this FLIP suggesting to completely remove AllocationID?
    >
    > I'm fine with this change. It seems the places where AllocationID is used can either be removed or have it replaced by JobID. This reflects the concept that slots are now assigned to a job instead of to its individual slot requests.
    >
    > I would like to bring to attention that this also requires changes on the TM side, with respect to FLIP-56 [1].
    >
    > In the context of dynamic slot allocation introduced by FLIP-56, slots do not pre-exist on the TM and are dynamically created when the RM calls TaskExecutorGateway.requestSlot. Since the slots do not pre-exist, nor do their SlotIDs, the RM requests slots from the TM with a special SlotID (negative slot index). The semantics change from "requesting the slot identified by the given SlotID" to "requesting a slot with the given resource profile". The AllocationID is used for identifying the dynamic slots in such cases.
    >
    > From the perspective of FLIP-56 and fine-grained resource management, I'm fine with removing AllocationID. In the meantime, we would need the TM to recognize the special negative-indexed SlotID and generate a new unique SlotID for identifying the slot.
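    > For illustration, the TM-side handling could look roughly like this (a sketch, hypothetical names):
    >
    >     // FLIP-56: a negative slot index marks a dynamic slot request; the
    >     // TM would then mint a fresh unique SlotID itself, rather than
    >     // relying on the AllocationID to identify the dynamic slot
    >     if (requestedSlotId.getSlotNumber() < 0) {
    >         requestedSlotId = new SlotID(taskExecutorResourceId, nextDynamicSlotIndex++);
    >     }
    >     allocateDynamicSlot(requestedSlotId, jobId, resourceProfile);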
    >
    > *3. Minimum resource requirement*
    >
    > > However, we can let the JobMaster know if we cannot fulfill the minimum resource requirement for a job after resourcemanager.standalone.start-up-time has passed.
    >
    > What is the "minimum resource requirement for a job"? Did I overlook anything?
    >
    > *4. Offer/free slots between JM/TM*
    >
    > This probably deserves a separate discussion thread. Just want to bring it up.
    >
    > The idea has been coming to me for quite some time. Is this design, that JM requests resources from RM while accepting/releasing resources from/to TM, the right thing?
    >
    > The pain point is that events of JM's activities (requesting/releasing resources) arrive at RM out of order. This leads to several problems.
    >
    >    - When a job fails and task cancelation takes long, some of the slots might be released from the slot pool due to being unused for a while. Then the job restarts and requests these slots again. At this time, the RM may receive slot requests before noticing from TM heartbeats that the previous slots have been released, and thus requests new resources. I've seen many times that the Yarn cluster is under heavy load and not allocating resources quickly enough, which leads to slot request timeouts and job failover, and during the failover more resources are requested, which adds more load to the Yarn cluster. Happily, this should be improved with the declarative resource management. :)
    >    - As described in this FLIP, it is possible that the RM learns about the release of slots from a TM heartbeat before noticing the decrease in resource requirements; it may then allocate more resources, which need to be released soon after.
    >    - It complicates the ResourceManager/SlotManager by requiring an additional slot state PENDING, which means the slot is assigned by the RM but the assignment is not yet confirmed by the TM.
    >
    > Why not just have the RM offer the allocated resources (TM address, SlotID, etc.) to the JM, and have the JM release resources to the RM? That way, for all resource management the JM talks to the RM, and for task deployment and execution it talks to the TMs.
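    > Hypothetically, the protocol would then shrink to something like (a sketch, not actual interfaces):
    >
    >     // all resource management goes through the RM ...
    >     interface ResourceManagerGateway {
    >         void declareResourceRequirements(JobID jobId, ResourceRequirements requirements);
    >         void releaseResource(JobID jobId, SlotID slotId); // JM hands slots back to the RM
    >     }
    >     // ... and the RM actively offers what it allocated to the JM
    >     interface JobMasterGateway {
    >         void offerResource(TaskManagerLocation location, SlotID slotId);
    >     }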
    >
    > I tried to understand the benefits of the current design, and found the following in FLIP-6 [2]:
    >
    > > All that the ResourceManager does is negotiate between the cluster-manager, the JobManager, and the TaskManagers. Its state can hence be reconstructed from re-acquiring containers and re-registration from JobManagers and TaskManagers.
    >
    > Correct me if I'm wrong, but it seems the original purpose is to make sure the assignment between jobs and slots is confirmed between the JM and TMs, so that failures of the RM do not lead to any inconsistency. However, this only benefits scenarios where the RM fails while the JM and TMs live. Currently, JM and RM run in the same process, so we do not really have any scenario where the RM fails alone. We might separate JM and RM into different processes in the future, but as far as I can see we don't have such requirements at the moment. It seems to me that we are suffering from the current problems in exchange for potential future benefits.
    >
    > Maybe I overlooked something.
    >
    > *5. Implementation Plan*
    >
    > For the SlotPool, it sounds quite straightforward to "aggregate individual slot requests".
    >
    > For the Resource/SlotManager, it seems quite a lot of changes are needed, with the removal of individual slot requests and AllocationID. It's not clear to me what the first-step plan for the RM/SM is. Do we internally treat the resource requirements as individual slot requests as a first step, so that only the interfaces change? Or do we actually change (practically re-write) the slot allocation logic?
    >
    > Thank you~
    >
    > Xintong Song
    >
    >
    >
    
    > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
    > [2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
    >
    > On Tue, Aug 25, 2020 at 4:48 PM Chesnay Schepler <ches...@apache.org> wrote:
    >
    >
    > Hello,
    >
    > in FLIP-138 we want to rework the way the JobMaster acquires slots, such that required resources are declared before a job is scheduled and the job execution is adjusted according to the provided resources (e.g., reducing parallelism), instead of asking for a fixed number of resources during scheduling and failing midway through if not enough resources are available.
    >
    > This is a stepping stone towards reactive mode, where Flink will automatically make use of new TaskExecutors being started.
    >
    > More details can be found here:
    > https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management
    >

