Hi Rui,

Not sure if I understand your question correctly. The two modes are not the same:

{taskmanager.load-balance.mode: Slots}
  = {cluster.evenly-spread-out-slots: true, slot.sharing-strategy: LOCAL_INPUT_PREFERRED}
{taskmanager.load-balance.mode: Tasks}
  = {cluster.evenly-spread-out-slots: true, slot.sharing-strategy: TASK_BALANCED_PREFERRED}
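
In code form, the mapping is roughly the following (an illustrative sketch only; the enum and class names are made up, not the actual implementation):

// Illustrative sketch only -- not the actual Flink implementation.
// It just restates the mapping above in code form.
enum LoadBalanceMode { NONE, SLOTS, TASKS }

class LegacyEquivalent {
    final boolean evenlySpreadOutSlots;
    final String slotSharingStrategy;

    LegacyEquivalent(boolean evenlySpreadOutSlots, String slotSharingStrategy) {
        this.evenlySpreadOutSlots = evenlySpreadOutSlots;
        this.slotSharingStrategy = slotSharingStrategy;
    }

    static LegacyEquivalent of(LoadBalanceMode mode) {
        switch (mode) {
            case SLOTS:
                return new LegacyEquivalent(true, "LOCAL_INPUT_PREFERRED");
            case TASKS:
                return new LegacyEquivalent(true, "TASK_BALANCED_PREFERRED");
            default: // NONE: keep today's default behavior (assumed here)
                return new LegacyEquivalent(false, "LOCAL_INPUT_PREFERRED");
        }
    }
}

Both modes spread slots evenly across TMs; they differ only in the slot sharing strategy that arranges tasks within the slots.
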
Thanks,
Zhu

Rui Fan <1996fan...@gmail.com> wrote on Tue, Oct 10, 2023 at 10:27:

> Hi Zhu,
>
> Thanks for your feedback!
>
> >> 2. When it's set to Tasks, how to assign slots to TM?
> > It's option2 at the moment. However, I think it's just implementation
> > details and can be changed/refined later.
> >
> > As you mentioned in another comment, 'taskmanager.load-balance.mode' is
> > a user-oriented configuration. The goal is to achieve load balance, while
> > the load can be defined as allocated slots or assigned tasks.
> > The 'Tasks' mode, just the same as what is proposed in the FLIP, currently
> > uses the mechanism of 'cluster.evenly-spread-out-slots' to help achieve a
> > balanced number of tasks. It's not perfect, but it has acceptable
> > effectiveness and lower implementation complexity.
> >
> > The 'Slots' mode is needed for compatibility reasons. Users who are
> > satisfied with the current ability of 'cluster.evenly-spread-out-slots'
> > can continue using it after the config 'cluster.evenly-spread-out-slots'
> > is deprecated.
>
> IIUC, the 'Slots' mode is needed for compatibility with
> 'cluster.evenly-spread-out-slots'.
> The reason I ask this question is: if the behavior and logic of 'Slots'
> and 'Tasks' are exactly the same, it feels a bit strange to define two
> enumerations, and it may cause confusion for users.
>
> If they are totally the same, how about combining them into SlotsAndTasks?
> It can be compatible with 'cluster.evenly-spread-out-slots' and avoids
> the redundant enum. Of course, if the name (SlotsAndTasks) is ugly,
> we can discuss it. The core idea is combining them.
>
> WDYT?
>
> Best,
> Rui
>
> On Mon, Oct 9, 2023 at 3:24 PM Zhu Zhu <reed...@gmail.com> wrote:
>
>> Thanks for the response, Rui and Yuepeng.
>>
>> Rui
>> > 1. The default value is None, right?
>> Exactly.
>>
>> > 2. When it's set to Tasks, how to assign slots to TM?
>> It's option2 at the moment. However, I think it's just implementation
>> details and can be changed/refined later.
>>
>> As you mentioned in another comment, 'taskmanager.load-balance.mode' is
>> a user-oriented configuration. The goal is to achieve load balance, while
>> the load can be defined as allocated slots or assigned tasks.
>> The 'Tasks' mode, just the same as what is proposed in the FLIP, currently
>> uses the mechanism of 'cluster.evenly-spread-out-slots' to help achieve a
>> balanced number of tasks. It's not perfect, but it has acceptable
>> effectiveness and lower implementation complexity.
>>
>> The 'Slots' mode is needed for compatibility reasons. Users who are
>> satisfied with the current ability of 'cluster.evenly-spread-out-slots'
>> can continue using it after the config 'cluster.evenly-spread-out-slots'
>> is deprecated.
>>
>> Yuepeng
>> I think what users want is load balance. The combination is implementation
>> details and should be transparent to users.
>>
>> Meanwhile, I think locality does not entirely conflict with load balance.
>> In fact, they should both be considered when assigning tasks. Usually,
>> state locality should have the highest priority, and input locality can
>> also be taken care of when trying to balance tasks across slots and TMs.
>> We can see that the most important input locality, i.e. forward, is always
>> covered in this FLIP when computing slot sharing groups. It can be further
>> optimized if we find it problematic.
>>
>> Thanks,
>> Zhu
>>
>> Yangze Guo <karma...@gmail.com> wrote on Sun, Oct 8, 2023 at 13:53:
>>
>>> Thanks for the updates, Rui.
>>>
>>> It does seem challenging to ensure evenness in slot deployment unless
>>> we introduce batch slot requests in the SlotPool. However, one
>>> possibility is to add a delay of around 50ms during the SlotPool's
>>> resource requirement declaration to the ResourceManager, similar to
>>> checkResourceRequirementsWithDelay in the SlotManager. In most cases,
>>> this delay would allow the SlotManager to see all resource
>>> requirements, so it can allocate the slots more evenly. As a side
>>> effect, it could also significantly reduce the number of RPC messages
>>> to the ResourceManager, which could become a single-point bottleneck
>>> in OLAP scenarios.
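>>>
>>> To make it concrete, here is a very rough sketch of the idea in Java
>>> (illustrative only, not actual Flink code; the class and method names
>>> are made up):
>>>
>>> import java.util.concurrent.Executors;
>>> import java.util.concurrent.ScheduledExecutorService;
>>> import java.util.concurrent.ScheduledFuture;
>>> import java.util.concurrent.TimeUnit;
>>>
>>> // Illustrative only: batch requirement declarations that arrive within
>>> // a short window into a single RPC to the ResourceManager.
>>> class DelayedRequirementDeclarer {
>>>     private final ScheduledExecutorService scheduler =
>>>             Executors.newSingleThreadScheduledExecutor();
>>>     private ScheduledFuture<?> pendingDeclaration;
>>>
>>>     // Called whenever the job's resource requirements change. The runnable
>>>     // is assumed to read the latest aggregated requirements when it runs.
>>>     synchronized void onRequirementsChanged(Runnable declareToResourceManager) {
>>>         if (pendingDeclaration == null || pendingDeclaration.isDone()) {
>>>             // First change in this window: declare once after ~50ms, so
>>>             // later changes in the same window are sent in the same batch.
>>>             pendingDeclaration = scheduler.schedule(
>>>                     declareToResourceManager, 50, TimeUnit.MILLISECONDS);
>>>         }
>>>         // Otherwise a declaration is already queued and will pick up the
>>>         // latest requirements when it runs.
>>>     }
>>> }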
>>> WDYT?
>>>
>>> Best,
>>> Yangze Guo
>>>
>>> On Sat, Oct 7, 2023 at 5:52 PM Rui Fan <1996fan...@gmail.com> wrote:
>>> >
>>> > Hi Yangze,
>>> >
>>> > Thanks for your quick response!
>>> >
>>> > Sorry, I re-read the 2.2.2 part[1] about the waiting mechanism, and I
>>> > found it isn't clear. The root cause of introducing the waiting
>>> > mechanism is that the slot requests are sent from the JobMaster to the
>>> > SlotPool one by one instead of as one whole batch. I have rewritten the
>>> > 2.2.2 part; please read it again in your free time.
>>> >
>>> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling#FLIP370:SupportBalancedTasksScheduling-2.2.2Waitingmechanism
>>> >
>>> > Best,
>>> > Rui
>>> >
>>> > On Sat, Oct 7, 2023 at 4:34 PM Yangze Guo <karma...@gmail.com> wrote:
>>> >>
>>> >> Thanks for the clarification, Rui.
>>> >>
>>> >> I believe the root cause of this issue is that in the current
>>> >> DefaultResourceAllocationStrategy, slot allocation begins before the
>>> >> decision to request PendingTaskManagers is made. That can be fixed
>>> >> within the strategy without introducing another waiting mechanism. I
>>> >> think it would be better to address this issue within the scope of
>>> >> this FLIP. However, I don't have a strong opinion on it; it depends on
>>> >> your bandwidth.
>>> >>
>>> >> Best,
>>> >> Yangze Guo
>>> >>
>>> >> On Sat, Oct 7, 2023 at 4:16 PM Rui Fan <1996fan...@gmail.com> wrote:
>>> >> >
>>> >> > Hi Yangze,
>>> >> >
>>> >> > > 2. From my understanding, if the user enables
>>> >> > > cluster.evenly-spread-out-slots,
>>> >> > > LeastUtilizationResourceMatchingStrategy will be used to determine
>>> >> > > the slot distribution, and the slot allocation in the three TMs
>>> >> > > will be (taskmanager.numberOfTaskSlots=3):
>>> >> > > TM1: 3 slots
>>> >> > > TM2: 2 slots
>>> >> > > TM3: 2 slots
>>> >> >
>>> >> > When all TMs are ready in advance, the three TMs will be:
>>> >> > TM1: 3 slots
>>> >> > TM2: 2 slots
>>> >> > TM3: 2 slots
>>> >> >
>>> >> > For application mode, the ResourceManager doesn't apply for TMs in
>>> >> > advance, and the slots aren't enough before the third TM is ready.
>>> >> > So all slots of the second TM will be used up. The three TMs will be:
>>> >> > TM1: 3 slots
>>> >> > TM2: 3 slots
>>> >> > TM3: 1 slot
>>> >> >
>>> >> > That's why the FLIP adds some notes:
>>> >> >
>>> >> > All free slots are in the last TM, because the ResourceManager
>>> >> > doesn't have the waiting mechanism, and it just requests 7 slots for
>>> >> > this JobMaster.
>>> >> >
>>> >> > Why is it acceptable?
>>> >> >
>>> >> > If we just add the waiting mechanism to the JobMaster but not to the
>>> >> > ResourceManager, all free slots will be in the last TM. All slots of
>>> >> > the other TMs are offered to the JM.
>>> >> > That is, only one TM may have fewer tasks than the other TMs.
>>> >> > The difference between the number of tasks of the other TMs is at
>>> >> > most 1. So when p >> slotsPerTM, the problem can be ignored.
>>> >> > We can also suggest to users that, in cases where p is small, it's
>>> >> > better to configure slotsPerTM to 1, or let p % slotsPerTM == 0.
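>>> >> >
>>> >> > To illustrate, here is a toy simulation (illustrative Java, not
>>> >> > Flink code) of why application mode ends up with 3/3/1 for p = 7 and
>>> >> > slotsPerTM = 3: each slot request is satisfied by the first TM that
>>> >> > still has a free slot, because the later TMs haven't started yet.
>>> >> >
>>> >> > import java.util.Arrays;
>>> >> >
>>> >> > public class GreedyFillDemo {
>>> >> >     public static void main(String[] args) {
>>> >> >         int p = 7, slotsPerTM = 3;
>>> >> >         int[] tmSlots = new int[(p + slotsPerTM - 1) / slotsPerTM];
>>> >> >         for (int slot = 0; slot < p; slot++) {
>>> >> >             // TM i is only requested once TMs 0..i-1 are full.
>>> >> >             tmSlots[slot / slotsPerTM]++;
>>> >> >         }
>>> >> >         // Prints [3, 3, 1]: all free slots end up on the last TM.
>>> >> >         System.out.println(Arrays.toString(tmSlots));
>>> >> >     }
>>> >> > }
>>> >> >
>>> >> > With slotsPerTM = 1 or p % slotsPerTM == 0, every TM is filled
>>> >> > equally, so the last TM is no different from the others.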
>>> >> >
>>> >> > Please correct me if my understanding is wrong, thanks~
>>> >> >
>>> >> > Best,
>>> >> > Rui
>>> >> >
>>> >> > On Sun, Oct 1, 2023 at 7:38 PM Yangze Guo <karma...@gmail.com> wrote:
>>> >> >>
>>> >> >> Hi, Rui,
>>> >> >>
>>> >> >> 1. With the current mechanism, when physical slots are offered from
>>> >> >> the TM, the JobMaster will start deploying tasks and synchronizing
>>> >> >> their states. With the addition of the waiting mechanism, IIUC, the
>>> >> >> JobMaster will deploy and synchronize the states of all tasks only
>>> >> >> after all resources are available. Task deployment and state
>>> >> >> synchronization both occupy the JobMaster's RPC main thread. In
>>> >> >> complex jobs with a lot of tasks, this waiting mechanism may
>>> >> >> increase the pressure on the JobMaster and increase the end-to-end
>>> >> >> job deployment time.
>>> >> >>
>>> >> >> 2. From my understanding, if the user enables
>>> >> >> cluster.evenly-spread-out-slots,
>>> >> >> LeastUtilizationResourceMatchingStrategy will be used to determine
>>> >> >> the slot distribution, and the slot allocation in the three TMs
>>> >> >> will be (taskmanager.numberOfTaskSlots=3):
>>> >> >> TM1: 3 slots
>>> >> >> TM2: 2 slots
>>> >> >> TM3: 2 slots
>>> >> >>
>>> >> >> Best,
>>> >> >> Yangze Guo
>>> >> >>
>>> >> >> On Sun, Oct 1, 2023 at 6:14 PM Rui Fan <1996fan...@gmail.com> wrote:
>>> >> >> >
>>> >> >> > Hi Shammon,
>>> >> >> >
>>> >> >> > Thanks for your feedback as well!
>>> >> >> >
>>> >> >> > > IIUC, the overall balance is divided into two parts: slot to TM
>>> >> >> > > and task to slot.
>>> >> >> > > 1. Slot to TM is guaranteed by the SlotManager in the ResourceManager
>>> >> >> > > 2. Task to slot is guaranteed by the SlotPool in the JM
>>> >> >> > >
>>> >> >> > > These two are completely independent, what are the benefits of
>>> >> >> > > unifying these two into one option? Also, do we want to share
>>> >> >> > > the same option between the SlotPool in the JM and the
>>> >> >> > > SlotManager in the RM? This sounds a bit strange.
>>> >> >> >
>>> >> >> > Your understanding is totally right: the balance needs 2 parts,
>>> >> >> > slot to TM and task to slot.
>>> >> >> >
>>> >> >> > As I understand it, the following are the benefits of unifying
>>> >> >> > them into one option:
>>> >> >> >
>>> >> >> > - Flink users don't care about these principles inside of Flink;
>>> >> >> >   they don't know these 2 parts.
>>> >> >> > - If Flink provides 2 options, users need to set 2 options for
>>> >> >> >   their job.
>>> >> >> > - If one option is missed, the final result may not be good.
>>> >> >> >   (Users may have questions when using it.)
>>> >> >> > - If Flink provides just 1 option, enabling one option is enough.
>>> >> >> >   (This reduces the probability of misconfiguration.)
>>> >> >> >
>>> >> >> > Also, Flink's options are user-oriented. Each option represents a
>>> >> >> > switch or parameter of a feature.
>>> >> >> > A feature may be composed of multiple components inside Flink.
>>> >> >> > It might be better to keep only one switch per feature.
>>> >> >> >
>>> >> >> > Actually, the cluster.evenly-spread-out-slots option is already
>>> >> >> > used by both the SlotPool in the JM and the SlotManager in the RM:
>>> >> >> > 2 components ensure this feature works well.
>>> >> >> >
>>> >> >> > Please correct me if my understanding is wrong,
>>> >> >> > and looking forward to your feedback, thanks!
>>> >> >> >
>>> >> >> > Best,
>>> >> >> > Rui
>>> >> >> >
>>> >> >> > On Sun, Oct 1, 2023 at 5:52 PM Rui Fan <1996fan...@gmail.com> wrote:
>>> >> >> >
>>> >> >> > > Hi Yangze,
>>> >> >> > >
>>> >> >> > > Thanks for your feedback!
>>> >> >> > >
>>> >> >> > > > 1. Is it possible for the SlotPool to get the slot allocation
>>> >> >> > > > results from the SlotManager in advance instead of waiting
>>> >> >> > > > for the actual physical slots to be registered, and perform
>>> >> >> > > > pre-allocation? The benefit of doing this is to make the task
>>> >> >> > > > deployment process smoother, especially when there are a
>>> >> >> > > > large number of tasks in the job.
>>> >> >> > >
>>> >> >> > > Could you elaborate on that? I didn't understand what the
>>> >> >> > > benefit is or what would be smoother.
>>> >> >> > >
>>> >> >> > > > 2. If the user enables cluster.evenly-spread-out-slots, the
>>> >> >> > > > issue in example 2 of section 2.2.3 can be resolved. Do I
>>> >> >> > > > understand it correctly?
>>> >> >> > >
>>> >> >> > > The example's assigned result is the final allocation result
>>> >> >> > > when the user enables cluster.evenly-spread-out-slots. We think
>>> >> >> > > the assigned result is expected, so I think your understanding
>>> >> >> > > is right.
>>> >> >> > >
>>> >> >> > > Best,
>>> >> >> > > Rui
>>> >> >> > >
>>> >> >> > > On Thu, Sep 28, 2023 at 1:10 PM Shammon FY <zjur...@gmail.com> wrote:
>>> >> >> > >
>>> >> >> > >> Thanks Yuepeng for initiating this discussion.
>>> >> >> > >>
>>> >> >> > >> +1 in general too. In fact, we have implemented a similar
>>> >> >> > >> mechanism internally to ensure a balanced allocation of tasks
>>> >> >> > >> to slots, and it works well.
>>> >> >> > >>
>>> >> >> > >> Some comments about the mechanism:
>>> >> >> > >>
>>> >> >> > >> 1. Will this mechanism be supported only in `SlotPool`, or in
>>> >> >> > >> both `SlotPool` and `DeclarativeSlotPool`? Currently the two
>>> >> >> > >> slot pools are used in different schedulers. I think this
>>> >> >> > >> would also bring value to `DeclarativeSlotPool`, but currently
>>> >> >> > >> the FLIP content seems to be based on `SlotPool`, right?
>>> >> >> > >>
>>> >> >> > >> 2. In fine-grained resource management, we can set different
>>> >> >> > >> resource requirements for different nodes, which means that
>>> >> >> > >> the resources of each slot are different. What should be done
>>> >> >> > >> when the slot selected by the round-robin strategy cannot meet
>>> >> >> > >> the resource requirements? Will this lead to the failure of
>>> >> >> > >> the balancing strategy?
>>> >> >> > >>
>>> >> >> > >> 3. Is the assignment of tasks to slots balanced at the region
>>> >> >> > >> or job level? When multiple TMs fail over, will it cause the
>>> >> >> > >> balancing strategy to fail, or even make things worse? What is
>>> >> >> > >> the current processing strategy?
>>> >> >> > >>
>>> >> >> > >> For Zhu Zhu and Rui:
>>> >> >> > >>
>>> >> >> > >> IIUC, the overall balance is divided into two parts: slot to
>>> >> >> > >> TM and task to slot.
>>> >> >> > >> 1. Slot to TM is guaranteed by the SlotManager in the ResourceManager
>>> >> >> > >> 2. Task to slot is guaranteed by the SlotPool in the JM
>>> >> >> > >>
>>> >> >> > >> These two are completely independent, what are the benefits
>>> >> >> > >> of unifying these two into one option? Also, do we want to
>>> >> >> > >> share the same option between the SlotPool in the JM and the
>>> >> >> > >> SlotManager in the RM? This sounds a bit strange.
>>> >> >> > >>
>>> >> >> > >> Best,
>>> >> >> > >> Shammon FY
>>> >> >> > >>
>>> >> >> > >> On Thu, Sep 28, 2023 at 12:08 PM Rui Fan <1996fan...@gmail.com> wrote:
>>> >> >> > >>
>>> >> >> > >> > Hi Zhu Zhu,
>>> >> >> > >> >
>>> >> >> > >> > Thanks for your feedback here!
>>> >> >> > >> >
>>> >> >> > >> > You are right, the user needs to set 2 options:
>>> >> >> > >> > - cluster.evenly-spread-out-slots=true
>>> >> >> > >> > - slot.sharing-strategy=TASK_BALANCED_PREFERRED
>>> >> >> > >> >
>>> >> >> > >> > Reducing it to one option is useful on the user side, so
>>> >> >> > >> > `taskmanager.load-balance.mode` sounds good to me.
>>> >> >> > >> > I want to check some points and behaviors about this option:
>>> >> >> > >> >
>>> >> >> > >> > 1. The default value is None, right?
>>> >> >> > >> > 2. When it's set to Tasks, how do we assign slots to TMs?
>>> >> >> > >> >    - Option1: Just check the task number.
>>> >> >> > >> >    - Option2: Check the slot number first, then check the
>>> >> >> > >> >      task number when the slot number is the same.
>>> >> >> > >> >
>>> >> >> > >> > Giving an example to explain the difference between them:
>>> >> >> > >> >
>>> >> >> > >> > - A session cluster has 2 Flink jobs, jobA and jobB
>>> >> >> > >> > - Each TM has 4 slots
>>> >> >> > >> > - The task number of one slot of jobA is 3
>>> >> >> > >> > - The task number of one slot of jobB is 1
>>> >> >> > >> > - We have 2 TaskManagers:
>>> >> >> > >> >   - tm1 runs 3 slots of jobB, so tm1 runs 3 tasks
>>> >> >> > >> >   - tm2 runs 1 slot of jobA and 1 slot of jobB, so tm2 runs
>>> >> >> > >> >     4 tasks
>>> >> >> > >> >
>>> >> >> > >> > Now, we need to run a new slot; which TM should offer it?
>>> >> >> > >> > - Option1: If we just check the task number, tm1 is better.
>>> >> >> > >> > - Option2: If we check the slot number first, and then check
>>> >> >> > >> >   the task number, tm2 is better.
>>> >> >> > >> >
>>> >> >> > >> > The original FLIP selected option2; that's why we didn't add
>>> >> >> > >> > a third option. Option2 doesn't break the semantics when
>>> >> >> > >> > `cluster.evenly-spread-out-slots` is true; it just improves
>>> >> >> > >> > the behavior without changing the semantics.
>>> >> >> > >> >
>>> >> >> > >> > On the other hand, if we choose option2, when the user sets
>>> >> >> > >> > `taskmanager.load-balance.mode` to Tasks, it can also
>>> >> >> > >> > achieve the goal of the Slots mode.
>>> >> >> > >> >
>>> >> >> > >> > So I think the `Slots` enum isn't needed if we choose
>>> >> >> > >> > option2. Of course, if we choose option1, the enum is needed.
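>>> >> >> > >> >
>>> >> >> > >> > To make the two options concrete, a small sketch
>>> >> >> > >> > (illustrative Java, not the FLIP's actual code; the class
>>> >> >> > >> > and field names are made up):
>>> >> >> > >> >
>>> >> >> > >> > import java.util.Comparator;
>>> >> >> > >> >
>>> >> >> > >> > class SlotPlacementSketch {
>>> >> >> > >> >     static class Tm {
>>> >> >> > >> >         final int usedSlots, tasks;
>>> >> >> > >> >         Tm(int usedSlots, int tasks) {
>>> >> >> > >> >             this.usedSlots = usedSlots;
>>> >> >> > >> >             this.tasks = tasks;
>>> >> >> > >> >         }
>>> >> >> > >> >     }
>>> >> >> > >> >
>>> >> >> > >> >     // Option1: order TMs by the fewest running tasks.
>>> >> >> > >> >     static final Comparator<Tm> OPTION_1 =
>>> >> >> > >> >             Comparator.comparingInt(tm -> tm.tasks);
>>> >> >> > >> >
>>> >> >> > >> >     // Option2: order TMs by the fewest used slots,
>>> >> >> > >> >     // breaking ties by the task number.
>>> >> >> > >> >     static final Comparator<Tm> OPTION_2 =
>>> >> >> > >> >             Comparator.<Tm>comparingInt(tm -> tm.usedSlots)
>>> >> >> > >> >                     .thenComparingInt(tm -> tm.tasks);
>>> >> >> > >> >
>>> >> >> > >> >     public static void main(String[] args) {
>>> >> >> > >> >         Tm tm1 = new Tm(3, 3); // 3 slots of jobB -> 3 tasks
>>> >> >> > >> >         Tm tm2 = new Tm(2, 4); // 1 slot of jobA + 1 of jobB -> 4 tasks
>>> >> >> > >> >         // Option1 prefers tm1 (3 tasks < 4 tasks).
>>> >> >> > >> >         System.out.println(OPTION_1.compare(tm1, tm2) < 0); // true
>>> >> >> > >> >         // Option2 prefers tm2 (2 used slots < 3 used slots).
>>> >> >> > >> >         System.out.println(OPTION_2.compare(tm1, tm2) > 0); // true
>>> >> >> > >> >     }
>>> >> >> > >> > }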
>>> >> >> > >> > > >>> >> >> > >> > > +1 in general >>> >> >> > >> > > The idea is straight forward: best-effort gather all the >>> slot requests >>> >> >> > >> > > and offered slots to form an overview before assigning >>> slots, trying >>> >> >> > >> to >>> >> >> > >> > > balance the loads of task managers when assigning slots. >>> >> >> > >> > > >>> >> >> > >> > > I have one comment regarding the configuration for ease >>> of use: >>> >> >> > >> > > >>> >> >> > >> > > IIUC, this FLIP uses an existing config >>> >> >> > >> 'cluster.evenly-spread-out-slots' >>> >> >> > >> > > as the main switch of the new feature. That is, from user >>> perspective, >>> >> >> > >> > > with this improvement, the >>> 'cluster.evenly-spread-out-slots' feature >>> >> >> > >> not >>> >> >> > >> > > only balances the number of slots on task managers, but >>> also balances >>> >> >> > >> the >>> >> >> > >> > > number of tasks. This is a behavior change anyway. >>> Besides that, it >>> >> >> > >> also >>> >> >> > >> > > requires users to set 'slot.sharing-strategy' to >>> >> >> > >> > 'TASK_BALANCED_PREFERRED' >>> >> >> > >> > > to balance the tasks in each slot. >>> >> >> > >> > > >>> >> >> > >> > > I think we can introduce a new config option >>> >> >> > >> > > `taskmanager.load-balance.mode`, >>> >> >> > >> > > which accepts "None"/"Slots"/"Tasks". >>> >> >> > >> `cluster.evenly-spread-out-slots` >>> >> >> > >> > > can be superseded by the "Slots" mode and get deprecated. >>> In the >>> >> >> > >> future >>> >> >> > >> > > it can support more mode, e.g. "CpuCores", to work better >>> for jobs >>> >> >> > >> with >>> >> >> > >> > > fine-grained resources. The proposed config option >>> >> >> > >> > > `slot.request.max-interval` >>> >> >> > >> > > then can be renamed to >>> >> >> > >> > > `taskmanager.load-balance.request-stablizing-timeout` >>> >> >> > >> > > to show its relation with the feature. The proposed >>> >> >> > >> > `slot.sharing-strategy` >>> >> >> > >> > > is not needed, because the configured "Tasks" mode will >>> do the work. >>> >> >> > >> > > >>> >> >> > >> > > WDYT? >>> >> >> > >> > > >>> >> >> > >> > > Thanks, >>> >> >> > >> > > Zhu Zhu >>> >> >> > >> > > >>> >> >> > >> > > Yuepeng Pan <panyuep...@apache.org> 于2023年9月25日周一 >>> 16:26写道: >>> >> >> > >> > > >>> >> >> > >> > >> Hi all, >>> >> >> > >> > >> >>> >> >> > >> > >> >>> >> >> > >> > >> I and Fan Rui(CC’ed) created the FLIP-370[1] to support >>> balanced >>> >> >> > >> tasks >>> >> >> > >> > >> scheduling. >>> >> >> > >> > >> >>> >> >> > >> > >> >>> >> >> > >> > >> The current strategy of Flink to deploy tasks sometimes >>> leads some >>> >> >> > >> > >> TMs(TaskManagers) to have more tasks while others have >>> fewer tasks, >>> >> >> > >> > >> resulting in excessive resource utilization at some TMs >>> that contain >>> >> >> > >> > more >>> >> >> > >> > >> tasks and becoming a bottleneck for the entire job >>> processing. >>> >> >> > >> > Developing >>> >> >> > >> > >> strategies to achieve task load balancing for TMs and >>> reducing job >>> >> >> > >> > >> bottlenecks becomes very meaningful. >>> >> >> > >> > >> >>> >> >> > >> > >> >>> >> >> > >> > >> The raw design and discussions could be found in the >>> Flink JIRA[2] >>> >> >> > >> and >>> >> >> > >> > >> Google doc[3]. We really appreciate Zhu Zhu(CC’ed) for >>> providing some >>> >> >> > >> > >> valuable help and suggestions in advance. 
>>> >> >> > >> > >> >>> >> >> > >> > >> >>> >> >> > >> > >> Please refer to the FLIP[1] document for more details >>> about the >>> >> >> > >> proposed >>> >> >> > >> > >> design and implementation. We welcome any feedback and >>> opinions on >>> >> >> > >> this >>> >> >> > >> > >> proposal. >>> >> >> > >> > >> >>> >> >> > >> > >> >>> >> >> > >> > >> [1] >>> >> >> > >> > >> >>> >> >> > >> > >>> >> >> > >> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling >>> >> >> > >> > >> >>> >> >> > >> > >> [2] https://issues.apache.org/jira/browse/FLINK-31757 >>> >> >> > >> > >> >>> >> >> > >> > >> [3] >>> >> >> > >> > >> >>> >> >> > >> > >>> >> >> > >> >>> https://docs.google.com/document/d/14WhrSNGBdcsRl3IK7CZO-RaZ5KXU2X1dWqxPEFr3iS8 >>> >> >> > >> > >> >>> >> >> > >> > >> >>> >> >> > >> > >> Best, >>> >> >> > >> > >> >>> >> >> > >> > >> Yuepeng Pan >>> >> >> > >> > >> >>> >> >> > >> > > >>> >> >> > >> > >>> >> >> > >> >>> >> >> > > >>> >>