Thanks for the clarification @Till Rohrmann <trohrm...@apache.org>

> # Implications for the scheduling

Agreed that it turned out to be different execution strategies for batch jobs. We can have a simple one first and improve it later.

Thanks,
Zhu
Xintong Song <tonysong...@gmail.com> wrote on Mon, Aug 31, 2020 at 3:05 PM:

Thanks for the clarification, @Till.

- For FLIP-56, sounds good to me. I think there should be no problem before removing AllocationID. And even after replacing AllocationID, it should only require limited effort to make FLIP-56 work with SlotID. I was just trying to understand when the effort will be needed.

- For offer/release slots between JM/TM, I think you are right. Waiting on the confirmation of the resource requirement decrease before freeing the slot is practically equivalent to releasing slots through the RM, in the sense that both prevent the JM from releasing slots while the RM is absent. But this approach obviously requires less change to the current mechanism. Since the first problem can be solved by the declarative protocol, and the second problem can be addressed by this confirmation-based approach, ATM I don't see any strong reason for changing to offering and releasing slots through the RM, especially considering the significant changes it requires.

Thank you~

Xintong Song

On Fri, Aug 28, 2020 at 10:07 PM Till Rohrmann <trohrm...@apache.org> wrote:

Thanks for creating this FLIP @Chesnay, and the good input @Xintong and @Zhu Zhu.

Let me try to add some comments concerning your questions:

# FLIP-56

I think there is nothing fundamentally contradicting FLIP-56 in the FLIP for declarative resource management. As Chesnay said, we have to keep the AllocationID around as long as we have the old scheduler implementation. Once it is replaced, we can think about using the SlotID instead of AllocationIDs for identifying allocated slots. For dynamic slots we can keep the special meaning of a SlotID with a negative index. In the future we might think about making this encoding a bit more explicit by sending a richer slot request object and reporting the actual SlotID back to the RM.

For the question of resource utilization vs. deployment latency, I believe this will be a question of requirements and preferences, as you said, Xintong. I can see that we will have different strategies to fulfill the different needs.

# Offer/free slots between JM/TM

You are right, Xintong, that the existing slot protocol was developed with the assumption in mind that the RM and JM can run in separate processes, and that a failure of the RM should only affect the JM in the sense that it cannot ask for more resources. I believe that one could simplify things a bit under the assumption that the RM and JM are always colocated in the same process. However, the discussion whether to change it or not should indeed be a separate one.

Changing the slot protocol to declarative resource management should already solve the first problem you have described, because we won't ask for new slots in case of a failover but simply keep the same resource requirements declared and let the RM make sure that we will receive at least this amount of slots.
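A declaration in this protocol is absolute ("this job needs N slots of profile P") rather than incremental, which is what makes re-declaring after a failover harmless. A minimal Java sketch of that idea, with hypothetical names (not the FLIP's actual interfaces):

    import java.util.List;

    // Hypothetical sketch: a requirement states the total demand of a job,
    // not a delta, so declaring it again (e.g. after a failover) is a no-op.
    final class ResourceRequirement {
        final String resourceProfile; // simplified stand-in for Flink's ResourceProfile
        final int numberOfSlots;

        ResourceRequirement(String resourceProfile, int numberOfSlots) {
            this.resourceProfile = resourceProfile;
            this.numberOfSlots = numberOfSlots;
        }
    }

    interface ResourceManagerGateway {
        // Declaring the same requirements twice must not allocate anything new;
        // the RM compares the declaration against what the job already holds.
        void declareResourceRequirements(String jobId, List<ResourceRequirement> requirements);
    }

After a failover the JobMaster would call declareResourceRequirements with the unchanged list; since the RM diffs the declaration against the slots the job already holds, no additional resources are requested.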
If releasing a slot should lead to allocating new resources, because decreasing the resource requirement declaration takes longer than releasing the slot on the TM, then we could apply what Chesnay said: waiting on the confirmation of the resource requirement decrease and only then freeing the slot on the TM gives you effectively the same behaviour as if the freeing of the slot were done by the RM.

I am not entirely sure whether allocating the slots and receiving the slot offers through the RM would allow us to get rid of the pending slot state on the RM side. If the RM needs to communicate with the TM and we want a reconciliation protocol between these components, then I think we would have to solve the exact same problem we currently solve by waiting on the TM to confirm that a slot has been allocated.

# Implications for the scheduling

The FLIP does not fully cover the changes for the scheduler and mainly drafts the rough idea. For the batch scheduling, I believe we have a couple of degrees of freedom in how to do things. In the scenario you described, one could choose a simple strategy where we wait for all producers to stop before deciding on the parallelism of the consumer and scheduling the respective tasks (even though they have POINTWISE BLOCKING edges). Or we can try to be smart and say that if we get at least one slot, we can run the consumers with the same parallelism as the producers; it just might be that we have to run them one after another in a single slot. One advantage of not directly scheduling the first consumer when the first producer is finished is that one might be able to schedule the consumer stage with a higher parallelism, because one might acquire more resources a bit later. But I would see these as different execution strategies which have different properties.

Cheers,
Till
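The strategies sketched here could sit behind a common interface. A hedged sketch, with invented names, of the two options for a POINTWISE BLOCKING consumer stage:

    // Invented names; a strategy decides the consumer parallelism given the
    // producer parallelism and the slots obtainable at decision time.
    interface ConsumerParallelismStrategy {
        int decideParallelism(int producerParallelism, int availableSlots);
    }

    // Simple strategy: wait for all producers to finish, then size the
    // consumer stage to what is available (never above the producers).
    // Assumes at least one slot is obtainable once the producers are done.
    class WaitForAllProducers implements ConsumerParallelismStrategy {
        @Override
        public int decideParallelism(int producerParallelism, int availableSlots) {
            return Math.max(1, Math.min(producerParallelism, availableSlots));
        }
    }

    // "Smart" strategy: as soon as a single slot is available, keep the
    // producer parallelism and accept running consumers one after another.
    class KeepProducerParallelism implements ConsumerParallelismStrategy {
        @Override
        public int decideParallelism(int producerParallelism, int availableSlots) {
            return availableSlots >= 1 ? producerParallelism : 0;
        }
    }

Waiting longer may allow a higher parallelism, since more resources may arrive in the meantime; scheduling eagerly trades that for earlier progress. This is exactly why these read as different execution strategies with different properties.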
On Fri, Aug 28, 2020 at 11:21 AM Zhu Zhu <reed...@gmail.com> wrote:

Thanks for the explanation @Chesnay Schepler <ches...@apache.org>.

Yes, for batch jobs it can be safe to schedule downstream vertices if there are enough slots in the pool, even if these slots are still in use at that moment. And the job can still progress even if the vertices stick to the original parallelism.

It looks to me like several decisions can be made differently for streaming and batch jobs. Looking forward to the follow-up FLIP on the lazy ExecutionGraph construction!

Thanks,
Zhu

Chesnay Schepler <ches...@apache.org> wrote on Fri, Aug 28, 2020 at 4:35 PM:

Maybe :)

Imagine a case where the producer and consumer have the same ResourceProfile, or at least one where the consumer requirements are less than the producer ones. In this case, the scheduler can happily schedule consumers, because it knows it will get enough slots.

If the profiles are different, then the Scheduler _may_ wait for numberOf(producer) slots; it _may_ also stick with the parallelism and schedule right away, in the worst case running the consumers in sequence. In fact, for batch jobs there is probably(?) never a reason for the scheduler to _reduce_ the parallelism; it can always try to run things in sequence if it doesn't get enough slots. Reducing the parallelism would just mean that you'd have to wait for more producers to finish.

The scope of this FLIP is just the protocol, without changes to the scheduler; in other words, just changing how slots are acquired, but changing nothing about the scheduling. That is tackled in a follow-up FLIP.

On 28/08/2020 07:34, Zhu Zhu wrote:

Thanks for the response!

>> The scheduler doesn't have to wait for one stage to finish

Does it mean we will declare resources and decide the parallelism for a stage which is partially schedulable, i.e. when input data are ready just for part of the execution vertices?

>> This will get more complicated once we allow the scheduler to change the parallelism while the job is running

Agreed. It looks to me like it's a problem for batch jobs only and can be avoided for streaming jobs. Will this FLIP limit its scope to streaming jobs, with improvements for batch jobs to be done later?

Thanks,
Zhu

Chesnay Schepler <ches...@apache.org> wrote on Fri, Aug 28, 2020 at 2:27 AM:

The scheduler doesn't have to wait for one stage to finish. It is still aware that the upstream execution vertex has finished, and can request/use slots accordingly to schedule the consumer.

This will get more complicated once we allow the scheduler to change the parallelism while the job is running, for which we will need some enhancements to the network stack to allow the producer to run without knowing the consumer parallelism ahead of time. I'm not too clear on the details, but we'll need some form of key-group-like approach for sub-partitions (maxParallelism and all that).
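To make the key-group analogy concrete: if the producer always writes maxParallelism sub-partitions, its output no longer depends on the consumer parallelism, and each consumer instance can read a contiguous range, much like key groups map to operator instances. A rough sketch under that assumption (not the actual network-stack design):

    // Sketch: producer writes `maxParallelism` sub-partitions regardless of
    // the eventual consumer parallelism; consumer i reads a contiguous range.
    final class SubPartitionRanges {

        /** First sub-partition (inclusive) read by the given consumer. */
        static int rangeStart(int maxParallelism, int consumerParallelism, int consumerIndex) {
            return consumerIndex * maxParallelism / consumerParallelism;
        }

        /** First sub-partition past the range (exclusive) of the given consumer. */
        static int rangeEnd(int maxParallelism, int consumerParallelism, int consumerIndex) {
            return (consumerIndex + 1) * maxParallelism / consumerParallelism;
        }

        public static void main(String[] args) {
            // 128 sub-partitions over 3 consumers -> [0,42), [42,85), [85,128)
            int max = 128, consumers = 3;
            for (int i = 0; i < consumers; i++) {
                System.out.printf("consumer %d reads [%d, %d)%n",
                        i, rangeStart(max, consumers, i), rangeEnd(max, consumers, i));
            }
        }
    }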
On 27/08/2020 20:05, Zhu Zhu wrote:

Thanks Chesnay & Till for proposing this improvement. It's of good value to allow jobs to make the best use of available resources adaptively, not to mention that it further supports reactive mode. So big +1 for it.

I have a minor concern about possible regression in certain cases due to the proposed JobVertex-wise scheduling which replaces the current ExecutionVertex-wise scheduling. In the proposal, it looks to me like a stage is required to finish before its consumer stage can be scheduled. This limitation, however, does not exist in the current scheduler. In the case that there exists a POINTWISE BLOCKING edge, the downstream execution region can be scheduled right after its connected upstream execution vertices finish, even before the whole upstream stage finishes. This allows the region to be launched earlier and to make use of available resources. Do we need to let the new scheduler retain this property?

Thanks,
Zhu

Xintong Song <tonysong...@gmail.com> wrote on Wed, Aug 26, 2020 at 6:59 PM:

Thanks for the quick response.

*Job prioritization, Allocation IDs, Minimum resource requirements, SlotManager Implementation Plan:* Sounds good to me.

*FLIP-56*
Good point about the trade-off. I believe maximum resource utilization and quick deployment are desired in different scenarios. E.g., a long-running streaming job can afford some deployment latency to improve resource utilization, which benefits the entire lifecycle of the job. On the other hand, short batch queries may prefer quick deployment, since otherwise the time for resource allocation might significantly increase the response time. It would be good enough for me to bring these questions to attention; nothing that I'm aware of should block this FLIP.

Thank you~

Xintong Song

On Wed, Aug 26, 2020 at 5:14 PM Chesnay Schepler <ches...@apache.org> wrote:

Thank you Xintong for your questions!

Job prioritization
Yes, the job which declares its initial requirements first is prioritized. This is very much for simplicity; for example, it avoids the nasty case where all jobs get some resources, but none get enough to actually run the job.

Minimum resource requirements
My bad; at some point we want to allow the JobMaster to declare a range of resources it could use to run a job, for example min=1, target=10, max=+inf. With this model, the RM would then try to balance the resources such that as many jobs as possible are as close to the target state as possible. Currently, the minimum/target/maximum resources are all the same, so the notification is sent whenever the current requirements cannot be met.
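The min/target/max model could be captured by a simple range type; a sketch with hypothetical names, using Integer.MAX_VALUE as a stand-in for +inf:

    // Hypothetical sketch of the declared resource range. Today min, target
    // and max coincide, which is why a notification is sent as soon as the
    // current requirements (== min) cannot be met.
    final class ResourceRequirementRange {
        final int min;    // below this the job cannot run -> notify the JobMaster
        final int target; // the RM balances jobs as close to this as possible
        final int max;    // never assign more; Integer.MAX_VALUE means unbounded

        ResourceRequirementRange(int min, int target, int max) {
            if (min > target || target > max) {
                throw new IllegalArgumentException("require min <= target <= max");
            }
            this.min = min;
            this.target = target;
            this.max = max;
        }

        /** Current behaviour: all three values are the same. */
        static ResourceRequirementRange exact(int slots) {
            return new ResourceRequirementRange(slots, slots, slots);
        }

        /** The example from the discussion: min=1, target=10, max=+inf. */
        static ResourceRequirementRange ranged() {
            return new ResourceRequirementRange(1, 10, Integer.MAX_VALUE);
        }
    }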
Allocation IDs
We do intend to, at the very least, remove AllocationIDs on the SlotManager side, as they are just not required there. On the slotpool side we have to keep them around at least until the existing Slotpool implementations are removed (not sure whether we'll fully commit to this in 1.12), since the interfaces use AllocationIDs, which also bleed into the JobMaster. The TaskExecutor is in a similar position. But in the long term, yes, they will be removed, and most usages will probably be replaced by the SlotID.

FLIP-56
Dynamic slot allocations are indeed quite interesting and raise a few questions; for example, the main purpose of them is to ensure maximum resource utilization. In that case, should the JobMaster be allowed to re-use a slot if the task requires fewer resources than the slot provides, or should it always request a new slot that exactly matches? There is a trade-off to be made between maximum resource utilization (request exactly matching slots, and only re-use exact matches) and quicker job deployment (re-use slots even if they don't exactly match, skip the round-trip to the RM).

As for how to handle the lack of preemptively known SlotIDs, that should be fine in and of itself; we already handle a similar case when we request a new TaskExecutor to be started. So long as there is some way to know how many resources the TaskExecutor has in total, I do not see a problem at the moment. We will get the SlotID eventually by virtue of the heartbeat SlotReport.

Implementation plan (SlotManager)
You are on the right track. The SlotManager tracks the declared resource requirements, and if the requirements increased it creates a SlotRequest, which then goes through similar code paths as we have at the moment (try to find a free slot; if found, tell the TM; otherwise try to request a new TM). The SlotManager changes are not that substantial to get a working version; we have a PoC, and most of the work went into refactoring the SlotManager into a more manageable state (split into several components, stricter and simplified slot life-cycle, ...).
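A rough sketch of that SlotManager behaviour, with invented names: the declared requirements are tracked per job, and only an increase triggers internal slot requests that then follow the existing code path:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch (invented names): the SlotManager reacts to the delta between
    // the old and the newly declared requirements of a job.
    class DeclarativeSlotManagerSketch {
        private final Map<String, Integer> declaredSlots = new HashMap<>();

        void processResourceRequirements(String jobId, int requiredSlots) {
            int previous = declaredSlots.getOrDefault(jobId, 0);
            declaredSlots.put(jobId, requiredSlots);

            // Increase: one internal slot request per missing slot, going
            // through the existing "free slot, else new TaskExecutor" path.
            for (int i = previous; i < requiredSlots; i++) {
                if (!tryAssignFreeSlot(jobId)) {
                    requestNewTaskExecutor();
                }
            }
            // Decrease: nothing to do here; excess slots are freed by the
            // JobMaster and show up via free-slot messages / slot reports.
        }

        private boolean tryAssignFreeSlot(String jobId) { return false; } // stub
        private void requestNewTaskExecutor() { /* ask the cluster manager */ }
    }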
Offer/free slots between JM/TM
Gotta run, but that's a good question and I'll think about it. I think it comes down to making fewer changes and being able to leverage existing reconciliation protocols. Do note that the TaskExecutor also explicitly informs the RM about freed slots; the heartbeat slot report is just a safety net. I'm not sure whether slot requests are able to overtake a slot release; @till do you have thoughts on that? As for the race condition between the requirements reduction and slot release, if we run into problems we have the backup plan of only releasing the slot after the requirement reduction has been acknowledged.

On 26/08/2020 10:31, Xintong Song wrote:

Thanks for preparing the FLIP and driving this discussion, @Chesnay & @Till.

I really like the idea. I see great value in the proposed declarative resource management, in terms of flexibility, usability and efficiency.

I have a few comments and questions regarding the FLIP design. In general, the protocol design makes good sense to me. My main concern is that it is not very clear to me what changes are required on the Resource/SlotManager side to adapt to the new protocol.

*1. Distributed slots across different jobs*

> Jobs which register their requirements first will have precedence over other jobs, also if the requirements change during runtime.

Just trying to understand: does this mean jobs are prioritized by the order of their first resource declaration?

*2. AllocationID*

Is this FLIP suggesting to completely remove AllocationID?

I'm fine with this change. It seems the places where AllocationID is used can either be removed or replaced by JobID. This reflects the concept that slots are now assigned to a job instead of to its individual slot requests.

I would like to bring to attention that this also requires changes on the TM side, with respect to FLIP-56[1]. In the context of dynamic slot allocation introduced by FLIP-56, slots do not pre-exist on the TM and are dynamically created when the RM calls TaskExecutorGateway.requestSlot. Since the slots do not pre-exist, nor do their SlotIDs, the RM requests slots from the TM with a special SlotID (negative slot index). The semantics change from "requesting the slot identified by the given SlotID" to "requesting a slot with the given resource profile". The AllocationID is used for identifying the dynamic slots in such cases.

From the perspective of FLIP-56 and fine-grained resource management, I'm fine with removing AllocationID. In the meantime, we would need the TM to recognize the special negative-indexed SlotID and generate a new unique SlotID for identifying the slot.
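What that TM-side change could look like, as a hedged sketch (illustrative names, not FLIP-56's actual code): a negative requested index means "create a dynamic slot", and the TM mints a fresh index that it later reports back through the heartbeat SlotReport:

    import java.util.concurrent.atomic.AtomicInteger;

    // Illustrative sketch of dynamic slot creation on the TaskExecutor.
    class DynamicSlotTable {
        // Fresh indices for dynamic slots; in practice these would be offset
        // past any statically configured slot indices to avoid collisions.
        private final AtomicInteger nextDynamicIndex = new AtomicInteger(0);

        int requestSlot(int requestedSlotIndex, String resourceProfile) {
            if (requestedSlotIndex < 0) {
                // "Requesting a slot with the given resource profile": no
                // SlotID exists yet, so generate a unique one for the new slot.
                int slotIndex = nextDynamicIndex.getAndIncrement();
                createSlot(slotIndex, resourceProfile);
                return slotIndex;
            }
            // "Requesting the slot identified by the given SlotID".
            createSlot(requestedSlotIndex, resourceProfile);
            return requestedSlotIndex;
        }

        private void createSlot(int slotIndex, String resourceProfile) {
            // reserve memory and other resources for the slot (omitted)
        }
    }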
*3. Minimum resource requirement*

> However, we can let the JobMaster know if we cannot fulfill the minimum resource requirement for a job after resourcemanager.standalone.start-up-time has passed.

What is the "minimum resource requirement for a job"? Did I overlook anything?

*4. Offer/free slots between JM/TM*

This probably deserves a separate discussion thread; I just want to bring it up.

The idea has been coming to me for quite some time. Is this design, where the JM requests resources from the RM while accepting/releasing resources from/to the TM, the right thing?

The pain point is that events of the JM's activities (requesting/releasing resources) arrive at the RM out of order. This leads to several problems.

- When a job fails and task cancelation takes long, some of the slots might be released from the slot pool due to being unused for a while. Then the job restarts and requests these slots again. At this time, the RM may receive the slot requests before noticing from TM heartbeats that the previous slots were released, and thus request new resources. I've seen many times that the Yarn cluster has a heavy load and is not allocating resources quickly enough, which leads to slot request timeouts and job failover, and during the failover more resources are requested, which adds more load to the Yarn cluster. Happily, this should be improved by the declarative resource management. :)
- As described in this FLIP, it is possible that the RM learns of the release of slots from the TM heartbeat before noticing the resource requirement decrease; it may then allocate more resources, which need to be released soon after.
- It complicates the ResourceManager/SlotManager by requiring an additional slot state PENDING, meaning the slot is assigned by the RM but the assignment has not yet been confirmed by the TM.

Why not just make the RM offer the allocated resources (TM address, SlotID, etc.) to the JM, and have the JM release resources to the RM? Then for all resource management the JM talks to the RM, and for task deployment and execution it talks to the TMs.

I tried to understand the benefits of the current design, and found the following in FLIP-6[2]:

> All that the ResourceManager does is negotiate between the cluster-manager, the JobManager, and the TaskManagers. Its state can hence be reconstructed from re-acquiring containers and re-registration from JobManagers and TaskManagers.

Correct me if I'm wrong, but it seems the original purpose is to make sure the assignment between jobs and slots is confirmed between the JM and TMs, so that failures of the RM will not lead to any inconsistency. However, this only benefits scenarios where the RM fails while the JM and TMs live. Currently, the JM and RM are in the same process. We do not really have any scenario where the RM fails alone. We might separate the JM and RM into different processes in the future, but as far as I can see we don't have such requirements at the moment. It seems to me that we are suffering from the current problems in exchange for potential future benefits.

Maybe I overlooked something.

*5. Implementation Plan*

For the SlotPool, it sounds quite straightforward to "aggregate individual slot requests" (a sketch of such an aggregation follows at the end of this thread).

For the Resource/SlotManager, it seems quite a lot of changes are needed, given the removal of individual slot requests and AllocationID. It's not clear to me what the first-step plan for the RM/SM is. Do we internally treat the resource requirements as individual slot requests as a first step, so that only the interfaces change? Or do we actually change (practically re-write) the slot allocation logic?

Thank you~

Xintong Song

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
[2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077

On Tue, Aug 25, 2020 at 4:48 PM Chesnay Schepler <ches...@apache.org> wrote:

Hello,

in FLIP-138 we want to rework the way the JobMaster acquires slots, such that required resources are declared before a job is scheduled and the job execution is adjusted according to the provided resources (e.g., reducing parallelism), instead of asking for a fixed number of resources during scheduling and failing midway through if not enough resources are available.

This is a stepping stone towards reactive mode, where Flink will automatically make use of new TaskExecutors being started.

More details can be found here <https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management>.
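The SlotPool-side aggregation mentioned in point 5 above could be as simple as keeping a counter of outstanding slot requests per resource profile and re-declaring the absolute totals; a hedged sketch with invented names:

    import java.util.HashMap;
    import java.util.Map;

    // Invented names: the SlotPool no longer forwards individual slot
    // requests but folds them into one absolute declaration per profile.
    class DeclarativeSlotPoolSketch {
        private final Map<String, Integer> requiredSlotsPerProfile = new HashMap<>();

        void onSlotRequested(String resourceProfile) {
            requiredSlotsPerProfile.merge(resourceProfile, 1, Integer::sum);
            declareToResourceManager();
        }

        void onSlotRequestCancelled(String resourceProfile) {
            requiredSlotsPerProfile.merge(resourceProfile, -1, Integer::sum);
            declareToResourceManager();
        }

        private void declareToResourceManager() {
            // Send the full requirements; the RM computes the delta itself.
            System.out.println("declare " + requiredSlotsPerProfile);
        }
    }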