Thanks for preparing the FLIP and driving this discussion, @Chesnay & @Till.
I really like the idea. I see great value in the proposed declarative resource management, in terms of flexibility, usability and efficiency. I have a few comments and questions regarding the FLIP design. In general, the protocol design makes good sense to me. My main concern is that it is not very clear to me what changes are required on the Resource/SlotManager side to adapt to the new protocol.

*1. Distributing slots across different jobs*

> Jobs which register their requirements first, will have precedence over
> other jobs also if the requirements change during the runtime.

Just trying to understand: does this mean jobs are prioritized by the order of their first resource declaration?

*2. AllocationID*

Is this FLIP suggesting to completely remove AllocationID? I'm fine with this change. It seems every place where AllocationID is used can either be removed or be replaced by JobID. This reflects the concept that slots are now assigned to a job instead of to its individual slot requests.

I would like to bring to attention that this also requires changes on the TM side, with respect to FLIP-56 [1]. In the context of dynamic slot allocation introduced by FLIP-56, slots do not pre-exist on the TM and are dynamically created when the RM calls TaskExecutorGateway.requestSlot. Since the slots do not pre-exist, nor do their SlotIDs, the RM requests slots from the TM with a special SlotID (a negative slot index). The semantics change from "requesting the slot identified by the given SlotID" to "requesting a slot with the given resource profile". The AllocationID is used for identifying the dynamic slots in such cases.

From the perspective of FLIP-56 and fine-grained resource management, I'm fine with removing AllocationID. In the meantime, we would need the TM to recognize the special negative-indexed SlotID and generate a new unique SlotID for identifying the slot.

*3. Minimum resource requirement*

> However, we can let the JobMaster know if we cannot fulfill the minimum
> resource requirement for a job after
> resourcemanager.standalone.start-up-time has passed.

What is the "minimum resource requirement for a job"? Did I overlook anything?

*4. Offer/free slots between JM/TM*

This probably deserves a separate discussion thread; I just want to bring it up here, as the idea has been on my mind for quite some time. Is this design, where the JM requests resources from the RM while accepting/releasing resources from/to the TM, the right thing? The pain point is that events of the JM's activities (requesting/releasing resources) arrive at the RM out of order. This leads to several problems.

- When a job fails and task cancellation takes long, some of the slots might be released from the slot pool due to being unused for a while. Then the job restarts and requests these slots again. At this time, the RM may receive slot requests before noticing from TM heartbeats that the previous slots have been released, and thus request new resources. I've seen many times that the Yarn cluster is under heavy load and does not allocate resources quickly enough, which leads to slot request timeouts and job failover, and during the failover more resources are requested, adding even more load to the Yarn cluster. Happily, this should be improved with the declarative resource management. :)
- As described in this FLIP, if the RM learns about the releasing of slots from TM heartbeats before noticing the decrease in resource requirements, it may allocate more resources which then need to be released soon after.
- It complicates the ResourceManager/SlotManager by requiring an additional slot state PENDING, which means the slot has been assigned by the RM but the assignment has not yet been confirmed by the TM.

Why not just make the RM offer the allocated resources (TM address, SlotID, etc.) to the JM, and have the JM release resources back to the RM?
That way, the JM would talk to the RM for all resource management, and to the TM for task deployment and execution.

I tried to understand the benefits of the current design, and found the following in FLIP-6 [2]:

> All that the ResourceManager does is negotiate between the
> cluster-manager, the JobManager, and the TaskManagers. Its state can hence
> be reconstructed from re-acquiring containers and re-registration from
> JobManagers and TaskManagers

Correct me if I'm wrong, but it seems the original purpose was to make sure the assignment between jobs and slots is confirmed between the JM and the TMs, so that failures of the RM do not lead to any inconsistency. However, this only benefits scenarios where the RM fails while the JM and TMs live. Currently, the JM and RM are in the same process, so we do not really have any scenario where the RM fails alone. We might separate the JM and RM into different processes in the future, but as far as I can see we don't have such requirements at the moment. It seems to me that we are suffering the current problems in exchange for potential future benefits. Maybe I overlooked something.

*5. Implementation Plan*

For the SlotPool, it sounds quite straightforward to "aggregate individual slot requests". For the Resource/SlotManager, it seems quite a lot of changes are needed, given the removal of individual slot requests and AllocationID. It's not clear to me what the first-step plan for the RM/SM is. Do we internally treat the resource requirements as individual slot requests as a first step, so that only the interfaces change? Or do we actually change (practically re-write) the slot allocation logic?
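To make the last part of my point 2 more concrete, here is a rough sketch of how the TM could recognize the special negative-indexed SlotID and mint a fresh unique SlotID for a dynamically created slot. All names here (SlotIdSketch, TaskExecutorSketch, resolveSlotId) are made up for illustration and are not Flink's actual classes:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for Flink's SlotID: a TM resource id plus a slot index.
class SlotIdSketch {
    final String resourceId;
    final int slotIndex; // negative index = "dynamic" slot request (FLIP-56 style)

    SlotIdSketch(String resourceId, int slotIndex) {
        this.resourceId = resourceId;
        this.slotIndex = slotIndex;
    }

    boolean isDynamic() {
        return slotIndex < 0;
    }
}

// Hypothetical TM-side handling of requestSlot.
class TaskExecutorSketch {
    private final String resourceId;
    // Indices below the number of statically configured slots are reserved;
    // dynamically created slots get fresh indices counting upwards from there.
    private final AtomicInteger nextDynamicIndex;

    TaskExecutorSketch(String resourceId, int numStaticSlots) {
        this.resourceId = resourceId;
        this.nextDynamicIndex = new AtomicInteger(numStaticSlots);
    }

    // A negative index means "create a new slot for the given resource profile"
    // rather than "give me the slot identified by this exact SlotID".
    SlotIdSketch resolveSlotId(SlotIdSketch requested) {
        if (requested.isDynamic()) {
            return new SlotIdSketch(resourceId, nextDynamicIndex.getAndIncrement());
        }
        return requested;
    }
}
```

With AllocationID gone, something along these lines would let the generated SlotID take over the role of uniquely identifying a dynamic slot.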
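And to clarify what I mean in point 5 by "aggregate individual slot requests", a minimal sketch of the SlotPool side could look like the following. Again, these names (ResourceProfileSketch, DeclarativeSlotPoolSketch) are hypothetical, not the actual FLIP-138 interfaces; the idea is just that the pool keeps a count per resource profile and only the aggregate declaration is ever sent to the RM:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Flink's ResourceProfile.
class ResourceProfileSketch {
    final double cpuCores;
    final int memoryMb;

    ResourceProfileSketch(double cpuCores, int memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ResourceProfileSketch)) return false;
        ResourceProfileSketch p = (ResourceProfileSketch) o;
        return p.cpuCores == cpuCores && p.memoryMb == memoryMb;
    }

    @Override
    public int hashCode() {
        return Double.hashCode(cpuCores) * 31 + memoryMb;
    }
}

// Hypothetical slot pool that declares aggregated requirements.
class DeclarativeSlotPoolSketch {
    private final Map<ResourceProfileSketch, Integer> requirements = new HashMap<>();

    // Each individual slot request just bumps the declared count...
    void increaseRequirement(ResourceProfileSketch profile) {
        requirements.merge(profile, 1, Integer::sum);
    }

    // ...and releasing a slot decreases it; the mapping is dropped at zero.
    void decreaseRequirement(ResourceProfileSketch profile) {
        requirements.computeIfPresent(profile, (p, n) -> n > 1 ? n - 1 : null);
    }

    // Only this aggregate view would be declared to the ResourceManager.
    Map<ResourceProfileSketch, Integer> declaredRequirements() {
        return new HashMap<>(requirements);
    }
}
```

If this is roughly the shape of the change, the interesting question remains whether the RM/SM initially maps such declarations back to per-slot requests internally, or is rewritten around the aggregate from the start.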
Thank you~

Xintong Song

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
[2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077

On Tue, Aug 25, 2020 at 4:48 PM Chesnay Schepler <ches...@apache.org> wrote:

> Hello,
>
> in FLIP-138 we want to rework the way the JobMaster acquires slots, such
> that required resources are declared before a job is scheduled and the
> job execution is adjusted according to the provided resources (e.g.,
> reducing parallelism), instead of asking for a fixed number of resources
> during scheduling and failing midway through if not enough resources are
> available.
>
> This is a stepping stone towards reactive mode, where Flink will
> automatically make use of new TaskExecutors being started.
>
> More details can be found here
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management>.