Thanks for the feedback, Xintong and Zhu Zhu. I've added a bit more detail on the intended interface extensions, potential follow-ups (removing the AllocationIDs) and the question about whether to reuse or return a slot if the profiles don't fully match.
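To make the interface extension a bit more concrete, here is a rough sketch of what the declared requirements could look like (names and signatures are only illustrative here; the FLIP page is authoritative):

    import java.util.Collection;

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.runtime.clusterframework.types.ResourceProfile;

    // A single requirement: "I need <numberOfRequiredSlots> slots of <resourceProfile>".
    public final class ResourceRequirement {
        private final ResourceProfile resourceProfile;
        private final int numberOfRequiredSlots;

        public ResourceRequirement(ResourceProfile resourceProfile, int numberOfRequiredSlots) {
            this.resourceProfile = resourceProfile;
            this.numberOfRequiredSlots = numberOfRequiredSlots;
        }
        // getters omitted for brevity
    }

    // The total requirements of one job. A repeated declaration replaces the
    // previous one instead of adding individual slot requests.
    public final class ResourceRequirements {
        private final JobID jobId;
        private final Collection<ResourceRequirement> resourceRequirements;

        public ResourceRequirements(JobID jobId, Collection<ResourceRequirement> resourceRequirements) {
            this.jobId = jobId;
            this.resourceRequirements = resourceRequirements;
        }
        // getters omitted for brevity
    }

The JobMaster would then declare these to the RM via something like declareResourceRequirements(ResourceRequirements), replacing today's individual slot requests.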
If nobody objects, then I would start a vote for this FLIP soon. Cheers, Till On Mon, Aug 31, 2020 at 11:53 AM Zhu Zhu <reed...@gmail.com> wrote: > Thanks for the clarification @Till Rohrmann <trohrm...@apache.org> > > >> # Implications for the scheduling > Agreed that it turned out to be different execution strategies for batch > jobs. > We can have a simple one first and improve it later. > > Thanks, > Zhu > > Xintong Song <tonysong...@gmail.com> 于2020年8月31日周一 下午3:05写道: > >> Thanks for the clarification, @Till. >> >> - For FLIP-56, sounds good to me. I think there should be no problem >> before >> removing AllocationID. And even after replacing AllocationID, it should >> only require limited effort to make FLIP-56 work with SlotID. I was just >> trying to understand when the effort will be needed. >> >> - For offer/release slots between JM/TM, I think you are right. >> Waiting on the confirmation for resource requirement decrease before >> freeing the slot is quite equivalent to releasing slots through RM, in >> terms of it practically preventing JM from releasing slots when the RM is >> absent. But this approach obviously requires less change to the current >> mechanism. >> Since the first problem can be solved by the declarative protocol, and the >> second problem can be addressed by this confirmation based approach, ATM I >> don't see any strong reason for changing to offering and releasing slots >> through RM, especially considering the significant changes it requires. >> >> Thank you~ >> >> Xintong Song >> >> >> >> On Fri, Aug 28, 2020 at 10:07 PM Till Rohrmann <trohrm...@apache.org> >> wrote: >> >> > Thanks for creating this FLIP @Chesnay and the good input @Xintong and >> @Zhu >> > Zhu. >> > >> > Let me try to add some comments concerning your questions: >> > >> > # FLIP-56 >> > >> > I think there is nothing fundamentally contradicting FLIP-56 in the FLIP >> > for declarative resource management. As Chesnay said, we have to keep >> the >> > AllocationID around as long as we have the old scheduler implementation. >> > Once it is replaced, we can think about using the SlotID instead of >> > AllocationIDs for identifying allocated slots. For dynamic slots we can >> > keep the special meaning of a SlotID with a negative index. In the >> future >> > we might think about making this encoding a bit more explicit by >> sending a >> > richer slot request object and reporting the actual SlotID back to the >> RM. >> > >> > For the question of resource utilization vs. deployment latency I >> believe >> > that this will be a question of requirements and preferences as you've >> said >> > Xintong. I can see that we will have different strategies to fulfill the >> > different needs. >> > >> > # Offer/free slots between JM/TM >> > >> > You are right Xintong that the existing slot protocol was developed with >> > the assumption in mind that the RM and JM can run in separate processes >> and >> > that a failure of the RM should only affect the JM in the sense that it >> > cannot ask for more resources. I believe that one could simplify things >> a >> > bit under the assumption that the RM and JM are always colocated in the >> > same process. However, the discussion whether to change it or not should >> > indeed be a separate one. 
>> >
>> > Changing the slot protocol to declarative resource management should already solve the first problem you have described, because we won't ask for new slots in case of a failover but simply keep the same resource requirements declared and let the RM make sure that we will receive at least this amount of slots.
>> >
>> > If releasing a slot should lead to allocating new resources because decreasing the resource requirement declaration takes longer than releasing the slot on the TM, then we could apply what Chesnay said. Waiting on the confirmation of the resource requirement decrease and then freeing the slot on the TM gives you effectively the same behaviour as if the freeing of the slot were done by the RM.
>> >
>> > I am not entirely sure whether allocating the slots and receiving the slot offers through the RM will allow us to get rid of the pending slot state on the RM side. If the RM needs to communicate with the TM and we want to have a reconciliation protocol between these components, then I think we would have to solve the exact same problem of currently waiting on the TM to confirm that a slot has been allocated.
>> >
>> > # Implications for the scheduling
>> >
>> > The FLIP does not fully cover the changes for the scheduler and mainly drafts the rough idea. For the batch scheduling, I believe that we have a couple of degrees of freedom in how to do things. In the scenario you described, one could choose a simple strategy where we wait for all producers to stop before deciding on the parallelism of the consumer and scheduling the respective tasks (even though they have POINTWISE BLOCKING edges). Or we can try to be smart and say that if we get at least one slot, we can run the consumers with the same parallelism as the producers; it just might be that we have to run them one after another in a single slot. One advantage of not directly scheduling the first consumer when the first producer is finished is that one might schedule the consumer stage with a higher parallelism, because one might acquire more resources a bit later. But I would see these as different execution strategies which have different properties.
>> >
>> > Cheers,
>> > Till
>> >
>> > On Fri, Aug 28, 2020 at 11:21 AM Zhu Zhu <reed...@gmail.com> wrote:
>> >
>> > > Thanks for the explanation @Chesnay Schepler <ches...@apache.org> .
>> > >
>> > > Yes, for batch jobs it can be safe to schedule downstream vertices if there are enough slots in the pool, even if these slots are still in use at that moment. And the job can still progress even if the vertices stick to the original parallelism.
>> > >
>> > > Looks to me that several decisions can be made differently for streaming and batch jobs. Looking forward to the follow-up FLIP on the lazy ExecutionGraph construction!
>> > >
>> > > Thanks,
>> > > Zhu
>> > >
>> > > Chesnay Schepler <ches...@apache.org> wrote on Fri, Aug 28, 2020 at 4:35 PM:
>> > >
>> > >> Maybe :)
>> > >>
>> > >> Imagine a case where the producer and consumer have the same ResourceProfile, or at least one where the consumer requirements are less than the producer ones. In this case, the scheduler can happily schedule consumers, because it knows it will get enough slots.
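>> > >>
>> > >> (Toy illustration of the kind of check I mean — not the actual ResourceProfile API:)
>> > >>
>> > >>     // Consumers can be scheduled eagerly if they need no more of any resource than
>> > >>     // the producers, since every slot freed by a finished producer can then run one.
>> > >>     static boolean consumerFitsIntoProducerSlot(double consumerCpu, long consumerMemBytes,
>> > >>                                                 double producerCpu, long producerMemBytes) {
>> > >>         return consumerCpu <= producerCpu && consumerMemBytes <= producerMemBytes;
>> > >>     }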
>> > >>
>> > >> If the profiles are different, then the Scheduler _may_ wait for numberOf(producer) slots; it _may_ also stick with the parallelism and schedule right away, in the worst case running the consumers in sequence. In fact, for batch jobs there is probably(?) never a reason for the scheduler to _reduce_ the parallelism; it can always try to run things in sequence if it doesn't get enough slots. Reducing the parallelism would just mean that you'd have to wait for more producers to finish.
>> > >>
>> > >> The scope of this FLIP is just the protocol, without changes to the scheduler; in other words, just changing how slots are acquired, but changing nothing about the scheduling. That is tackled in a follow-up FLIP.
>> > >>
>> > >> On 28/08/2020 07:34, Zhu Zhu wrote:
>> > >>
>> > >> Thanks for the response!
>> > >>
>> > >> >> The scheduler doesn't have to wait for one stage to finish
>> > >> Does it mean we will declare resources and decide the parallelism for a stage which is partially schedulable, i.e. when input data are ready just for part of the execution vertices?
>> > >>
>> > >> >> This will get more complicated once we allow the scheduler to change the parallelism while the job is running
>> > >> Agreed. Looks to me it's a problem for batch jobs only and can be avoided for streaming jobs. Will this FLIP limit its scope to streaming jobs, with improvements for batch jobs to be done later?
>> > >>
>> > >> Thanks,
>> > >> Zhu
>> > >>
>> > >> Chesnay Schepler <ches...@apache.org> wrote on Fri, Aug 28, 2020 at 2:27 AM:
>> > >>
>> > >>> The scheduler doesn't have to wait for one stage to finish. It is still aware that the upstream execution vertex has finished, and can request/use slots accordingly to schedule the consumer.
>> > >>>
>> > >>> This will get more complicated once we allow the scheduler to change the parallelism while the job is running, for which we will need some enhancements to the network stack to allow the producer to run without knowing the consumer parallelism ahead of time. I'm not too clear on the details, but we'll need some form of keygroup-like approach for sub partitions (maxParallelism and all that).
>> > >>>
>> > >>> On 27/08/2020 20:05, Zhu Zhu wrote:
>> > >>>
>> > >>> Thanks Chesnay & Till for proposing this improvement. It's of good value to allow jobs to make best use of available resources adaptively, not to mention it further supports reactive mode. So big +1 for it.
>> > >>>
>> > >>> I have a minor concern about a possible regression in certain cases due to the proposed JobVertex-wise scheduling which replaces the current ExecutionVertex-wise scheduling. In the proposal, it looks to me like it requires a stage to finish before its consumer stage can be scheduled. This limitation, however, does not exist in the current scheduler. In the case that there exists a POINTWISE BLOCKING edge, the downstream execution region can be scheduled right after its connected upstream execution vertices finish, even before the whole upstream stage finishes. This allows the region to be launched earlier and make use of available resources.
>> > >>> Do we need to let the new scheduler retain this property?
>> > >>>
>> > >>> Thanks,
>> > >>> Zhu
>> > >>>
>> > >>> Xintong Song <tonysong...@gmail.com> wrote on Wed, Aug 26, 2020 at 6:59 PM:
>> > >>>
>> > >>>> Thanks for the quick response.
>> > >>>>
>> > >>>> *Job prioritization, Allocation IDs, Minimum resource requirements, SlotManager Implementation Plan:* Sounds good to me.
>> > >>>>
>> > >>>> *FLIP-56*
>> > >>>> Good point about the trade-off. I believe maximum resource utilization and quick deployment are desired in different scenarios. E.g., a long-running streaming job can tolerate some deployment latency to improve resource utilization, which benefits the entire lifecycle of the job. On the other hand, short batch queries may prefer quick deployment, otherwise the time for resource allocation might significantly increase the response time. It would be good enough for me to bring these questions to attention. Nothing that I'm aware of should block this FLIP.
>> > >>>>
>> > >>>> Thank you~
>> > >>>>
>> > >>>> Xintong Song
>> > >>>>
>> > >>>> On Wed, Aug 26, 2020 at 5:14 PM Chesnay Schepler <ches...@apache.org> wrote:
>> > >>>>
>> > >>>> > Thank you Xintong for your questions!
>> > >>>> >
>> > >>>> > Job prioritization
>> > >>>> > Yes, the job which declares its initial requirements first is prioritized. This is very much for simplicity; for example, this avoids the nasty case where all jobs get some resources, but none get enough to actually run the job.
>> > >>>> >
>> > >>>> > Minimum resource requirements
>> > >>>> > My bad; at some point we want to allow the JobMaster to declare a range of resources it could use to run a job, for example min=1, target=10, max=+inf.
>> > >>>> > With this model, the RM would then try to balance the resources such that as many jobs as possible are as close to the target state as possible.
>> > >>>> > Currently, the minimum/target/maximum resources are all the same. So the notification is sent whenever the current requirements cannot be met.
>> > >>>> >
>> > >>>> > Allocation IDs
>> > >>>> > We do intend to, at the very least, remove AllocationIDs on the SlotManager side, as they are just not required there.
>> > >>>> > On the slotpool side we have to keep them around at least until the existing Slotpool implementations are removed (not sure whether we'll fully commit to this in 1.12), since the interfaces use AllocationIDs, which also bleed into the JobMaster. The TaskExecutor is in a similar position. But in the long term, yes, they will be removed, and most usages will probably be replaced by the SlotID.
>> > >>>> >
>> > >>>> > FLIP-56
>> > >>>> > Dynamic slot allocations are indeed quite interesting and raise a few questions; for example, the main purpose of it is to ensure maximum resource utilization. In that case, should the JobMaster be allowed to re-use a slot if the task requires fewer resources than the slot provides, or should it always request a new slot that exactly matches?
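>> > >>>> >
>> > >>>> > (To spell the two options out in toy code — made-up names, not actual JobMaster/slot pool classes:)
>> > >>>> >
>> > >>>> >     // Toy profile with just two dimensions; the real profiles have more.
>> > >>>> >     final class SlotSize {
>> > >>>> >         final double cpuCores;
>> > >>>> >         final long memoryBytes;
>> > >>>> >
>> > >>>> >         SlotSize(double cpuCores, long memoryBytes) {
>> > >>>> >             this.cpuCores = cpuCores;
>> > >>>> >             this.memoryBytes = memoryBytes;
>> > >>>> >         }
>> > >>>> >
>> > >>>> >         // Quicker deployment: re-use any slot for which this holds,
>> > >>>> >         // accepting that part of its resources may stay idle.
>> > >>>> >         boolean isAtLeast(SlotSize needed) {
>> > >>>> >             return cpuCores >= needed.cpuCores && memoryBytes >= needed.memoryBytes;
>> > >>>> >         }
>> > >>>> >
>> > >>>> >         // Maximum utilization: only re-use on an exact match; otherwise return the
>> > >>>> >         // slot and request a precisely sized one from the RM (extra round-trip).
>> > >>>> >         boolean isExactly(SlotSize needed) {
>> > >>>> >             return cpuCores == needed.cpuCores && memoryBytes == needed.memoryBytes;
>> > >>>> >         }
>> > >>>> >     }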
>> > >>>> >
>> > >>>> > There is a trade-off to be made between maximum resource utilization (request exactly matching slots, and only re-use exact matches) and quicker job deployment (re-use slots even if they don't exactly match, skip the round-trip to the RM).
>> > >>>> >
>> > >>>> > As for how to handle the lack of preemptively known SlotIDs, that should be fine in and of itself; we already handle a similar case when we request a new TaskExecutor to be started. So long as there is some way to know how many resources the TaskExecutor has in total, I do not see a problem at the moment. We will get the SlotID eventually by virtue of the heartbeat SlotReport.
>> > >>>> >
>> > >>>> > Implementation plan (SlotManager)
>> > >>>> > You are on the right track. The SlotManager tracks the declared resource requirements, and if the requirements increased it creates a SlotRequest, which then goes through similar code paths as we have at the moment (try to find a free slot, if found tell the TM, otherwise try to request a new TM). The SlotManager changes are not that substantial to get a working version; we have a PoC, and most of the work went into refactoring the SlotManager into a more manageable state (split into several components, stricter and simplified Slot life-cycle, ...).
>> > >>>> >
>> > >>>> > Offer/free slots between JM/TM
>> > >>>> > Gotta run, but that's a good question and I'll think about it. But I think it comes down to making fewer changes, and being able to leverage existing reconciliation protocols.
>> > >>>> > Do note that TaskExecutors also explicitly inform the RM about freed slots; the heartbeat slot report is just a safety net.
>> > >>>> > I'm not sure whether slot requests are able to overtake a slot release; @till do you have thoughts on that?
>> > >>>> > As for the race condition between the requirements reduction and slot release, if we run into problems we have the backup plan of only releasing the slot after the requirement reduction has been acknowledged.
>> > >>>> >
>> > >>>> > On 26/08/2020 10:31, Xintong Song wrote:
>> > >>>> >
>> > >>>> > Thanks for preparing the FLIP and driving this discussion, @Chesnay & @Till.
>> > >>>> >
>> > >>>> > I really like the idea. I see great value in the proposed declarative resource management, in terms of flexibility, usability and efficiency.
>> > >>>> >
>> > >>>> > I have a few comments and questions regarding the FLIP design. In general, the protocol design makes good sense to me. My main concern is that it is not very clear to me what changes are required on the Resource/SlotManager side to adapt to the new protocol.
>> > >>>> >
>> > >>>> > *1. Distributed slots across different jobs*
>> > >>>> >
>> > >>>> > Jobs which register their requirements first, will have precedence over other jobs also if the requirements change during the runtime.
>> > >>>> >
>> > >>>> > Just trying to understand: does this mean jobs are prioritized by the order of their first resource declaration?
>> > >>>> >
>> > >>>> > *2. AllocationID*
>> > >>>> >
>> > >>>> > Is this FLIP suggesting to completely remove AllocationID?
>> > >>>> >
>> > >>>> > I'm fine with this change. It seems the places where AllocationID is used can either be removed or be replaced by JobID. This reflects the concept that slots are now assigned to a job instead of to its individual slot requests.
>> > >>>> >
>> > >>>> > I would like to bring to attention that this also requires changes on the TM side, with respect to FLIP-56[1].
>> > >>>> >
>> > >>>> > In the context of dynamic slot allocation introduced by FLIP-56, slots do not pre-exist on the TM and are dynamically created when the RM calls TaskExecutorGateway.requestSlot. Since the slots do not pre-exist, nor do their SlotIDs, the RM requests slots from the TM with a special SlotID (negative slot index). The semantic changes from "requesting the slot identified by the given SlotID" to "requesting a slot with the given resource profile". The AllocationID is used for identifying the dynamic slots in such cases.
>> > >>>> >
>> > >>>> > From the perspective of FLIP-56 and fine-grained resource management, I'm fine with removing AllocationID. In the meantime, we would need the TM to recognize the special negative-indexed SlotID and generate a new unique SlotID for identifying the slot.
>> > >>>> >
>> > >>>> > *3. Minimum resource requirement*
>> > >>>> >
>> > >>>> > However, we can let the JobMaster know if we cannot fulfill the minimum resource requirement for a job after resourcemanager.standalone.start-up-time has passed.
>> > >>>> >
>> > >>>> > What is the "minimum resource requirement for a job"? Did I overlook anything?
>> > >>>> >
>> > >>>> > *4. Offer/free slots between JM/TM*
>> > >>>> >
>> > >>>> > This probably deserves a separate discussion thread. Just want to bring it up.
>> > >>>> >
>> > >>>> > The idea has been coming to me for quite some time. Is this design, that the JM requests resources from the RM while accepting/releasing resources from/to the TM, the right thing?
>> > >>>> >
>> > >>>> > The pain point is that events of the JM's activities (requesting/releasing resources) arrive at the RM out of order. This leads to several problems.
>> > >>>> >
>> > >>>> >    - When a job fails and task cancelation takes long, some of the slots might be released from the slot pool due to being unused for a while. Then the job restarts and requests these slots again. At this time, the RM may receive slot requests before noticing from TM heartbeats that the previous slots were released, thus requesting new resources.
I've seen >> many >> > >>>> times >> > >>>> > that the Yarn cluster has a heavy load and is not allocating >> > >>>> resources >> > >>>> > quickly enough, which leads to slot request timeout and job >> > >>>> failover, and >> > >>>> > during the failover more resources are requested which adds >> more >> > >>>> load to >> > >>>> > the Yarn cluster. Happily, this should be improved with the >> > >>>> declarative >> > >>>> > resource management. :) >> > >>>> > - As described in this FLIP, it is possible that RM learns the >> > >>>> releasing >> > >>>> > of slots from TM heartbeat before noticing the resource >> > requirement >> > >>>> > decreasing, it may allocate more resources which need to be >> > >>>> released soon. >> > >>>> > - It complicates the ResourceManager/SlotManager, by >> requiring an >> > >>>> > additional slot state PENDING, which means the slot is >> assigned >> > by >> > >>>> RM but >> > >>>> > is not confirmed successfully ordered by TM. >> > >>>> > >> > >>>> > Why not just make RM offer the allocated resources (TM address, >> > >>>> SlotID, >> > >>>> > etc.) to JM, and JM release resources to RM? So that for all the >> > >>>> resource >> > >>>> > management JM talks to RM, and for the task deployment and >> execution >> > >>>> it >> > >>>> > talks to TM? >> > >>>> > >> > >>>> > I tried to understand the benefits for having the current design, >> > and >> > >>>> found >> > >>>> > the following in FLIP-6[2]. >> > >>>> > >> > >>>> > >> > >>>> > All that the ResourceManager does is negotiate between the >> > >>>> > cluster-manager, the JobManager, and the TaskManagers. Its state >> can >> > >>>> hence >> > >>>> > be reconstructed from re-acquiring containers and re-registration >> > from >> > >>>> > JobManagers and TaskManagers >> > >>>> > >> > >>>> > Correct me if I'm wrong, it seems the original purpose is to make >> > >>>> sure the >> > >>>> > assignment between jobs and slots are confirmed between JM and >> TMs, >> > >>>> so that >> > >>>> > failures of RM will not lead to any inconsistency. However, this >> > only >> > >>>> > benefits scenarios where RM fails while JM and TMs live. >> Currently, >> > >>>> JM and >> > >>>> > RM are in the same process. We do not really have any scenario >> where >> > >>>> RM >> > >>>> > fails alone. We might separate JM and RM to different processes >> in >> > >>>> future, >> > >>>> > but as far as I can see we don't have such requirements at the >> > >>>> moment. It >> > >>>> > seems to me that we are suffering the current problems, >> complying to >> > >>>> > potential future benefits. >> > >>>> > >> > >>>> > Maybe I overlooked something. >> > >>>> > >> > >>>> > *5. Implementation Plan* >> > >>>> > >> > >>>> > For SlotPool, it sounds quite straightforward to "aggregate >> > >>>> individual slot >> > >>>> > requests". >> > >>>> > >> > >>>> > For Resource/SlotManager, it seems there are quite a lot changes >> > >>>> needed, >> > >>>> > with the removal of individual slot requests and AllocationID. >> It's >> > >>>> not >> > >>>> > clear to me what is the first step plan for RM/SM? Do we >> internally >> > >>>> treat >> > >>>> > the resource requirements as individual slot requests as the >> first >> > >>>> step, so >> > >>>> > only the interfaces are changed? Or do we actually change >> > (practically >> > >>>> > re-write) the slot allocation logics? 
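>> > >>>> >
>> > >>>> > (To make the first option concrete for myself — a purely made-up sketch, not actual SlotManager code — I picture the SlotManager diffing the declared requirements against what it already has for the job, and only turning the missing part into internal slot requests:)
>> > >>>> >
>> > >>>> >     // Illustrative only: how many internal slot requests a declared requirement
>> > >>>> >     // would translate into, given what is already allocated or pending for the job.
>> > >>>> >     static int slotsStillNeeded(int declaredSlots, int allocatedSlots, int pendingSlots) {
>> > >>>> >         return Math.max(0, declaredSlots - allocatedSlots - pendingSlots);
>> > >>>> >     }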
>> > >>>> >
>> > >>>> > Thank you~
>> > >>>> >
>> > >>>> > Xintong Song
>> > >>>> >
>> > >>>> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
>> > >>>> > [2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
>> > >>>> >
>> > >>>> > On Tue, Aug 25, 2020 at 4:48 PM Chesnay Schepler <ches...@apache.org> wrote:
>> > >>>> >
>> > >>>> > Hello,
>> > >>>> >
>> > >>>> > in FLIP-138 we want to rework the way the JobMaster acquires slots, such that required resources are declared before a job is scheduled and the job execution is adjusted according to the provided resources (e.g., reducing parallelism), instead of asking for a fixed number of resources during scheduling and failing midway through if not enough resources are available.
>> > >>>> >
>> > >>>> > This is a stepping stone towards reactive mode, where Flink will automatically make use of new TaskExecutors being started.
>> > >>>> >
>> > >>>> > More details can be found here <https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management>.