Thanks for creating the implementation plan, Xintong. Overall, it looks good. I have a couple of comments:
- What will happen if a user has defined a streaming job with two slot sharing groups? Would the code insert a blocking data exchange between these two groups? If yes, then this breaks existing Flink streaming jobs.
- How do we detect unbounded streaming jobs to set the allSourcesInSamePipelinedRegion to `true`? Wouldn't it be easier to set it false if we are using the DataSet API or the Blink planner with a bounded job?

Cheers,
Till

On Tue, Aug 27, 2019 at 2:16 PM Till Rohrmann <trohrm...@apache.org> wrote:

> I guess there is a typo since the link to the FLIP-53 is
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
>
> Cheers,
> Till
>
> On Tue, Aug 27, 2019 at 1:42 PM Xintong Song <tonysong...@gmail.com> wrote:
>
>> Added implementation steps for this FLIP on the wiki page [1].
>>
>> Thank you~
>>
>> Xintong Song
>>
>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
>>
>> On Mon, Aug 19, 2019 at 10:29 PM Xintong Song <tonysong...@gmail.com> wrote:
>>
>> > Hi everyone,
>> >
>> > As Till suggested, the original "FLIP-53: Fine Grained Resource Management" splits into two separate FLIPs,
>> >
>> > - FLIP-53: Fine Grained Operator Resource Management [1]
>> > - FLIP-56: Dynamic Slot Allocation [2]
>> >
>> > We'll continue using this discussion thread for FLIP-53. For FLIP-56, I just started a new discussion thread [3].
>> >
>> > Thank you~
>> >
>> > Xintong Song
>> >
>> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
>> >
>> > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
>> >
>> > [3] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-56-Dynamic-Slot-Allocation-td31960.html
>> >
>> > On Mon, Aug 19, 2019 at 2:55 PM Xintong Song <tonysong...@gmail.com> wrote:
>> >
>> >> Thanks for the comments, Yang.
>> >>
>> >> Regarding your questions:
>> >>
>> >>> 1. How to calculate the resource specification of TaskManagers? Do they have the same resource spec calculated based on the configuration? I think we still have wasted resources in this situation. Or we could start TaskManagers with different spec.
>> >>
>> >> I agree with you that we can further improve the resource utilization by customizing task executors with different resource specifications. However, I'm in favor of limiting the scope of this FLIP and leaving it as a future optimization. The plan for that part is to move the logic of deciding task executor specifications into the slot manager and make the slot manager pluggable, so inside the slot manager plugin we can have different logic for deciding the task executor specifications.
>> >>
>> >>> 2. If a slot is released and returned to SlotPool, could it be reused by another SlotRequest whose requested resource is smaller than it?
>> >>
>> >> No, I think the slot pool should always return slots if they do not exactly match the pending requests, so that the resource manager can deal with the extra resources.
>> >>
>> >>> - If it is yes, what happens to the available resource in the TaskManager.
>> >>> - What is the SlotStatus of the cached slot in SlotPool? The AllocationId is null?
>> >>
>> >> The allocation id does not change as long as the slot is not returned from the job master, no matter whether it's occupied or available in the slot pool. I think we have the same behavior currently. No matter how many tasks the job master deploys into the slot, concurrently or sequentially, it is one allocation from the cluster to the job until the slot is freed from the job master.
>> >>
>> >>> 3. In a session cluster, some jobs are configured with operator resources, meanwhile other jobs are using UNKNOWN. How to deal with this situation?
>> >>
>> >> As long as we do not mix unknown / specified resource profiles within the same job / slot, there shouldn't be a problem. The resource manager converts unknown resource profiles in slot requests to specified default resource profiles, so they can be dynamically allocated from task executors' available resources just as other slot requests with specified resource profiles.
>> >>
>> >> Thank you~
>> >>
>> >> Xintong Song
>> >>
>> >> On Mon, Aug 19, 2019 at 11:39 AM Yang Wang <danrtsey...@gmail.com> wrote:
>> >>
>> >>> Hi Xintong,
>> >>>
>> >>> Thanks for your detailed proposal. I think many users are suffering from waste of resources. The resource specs of all task managers are the same and we have to increase all task managers to make the heavy one more stable. So we will benefit from the fine grained resource management a lot. We could get better resource utilization and stability.
>> >>>
>> >>> Just to share some thoughts.
>> >>>
>> >>> 1. How to calculate the resource specification of TaskManagers? Do they have the same resource spec calculated based on the configuration? I think we still have wasted resources in this situation. Or we could start TaskManagers with different spec.
>> >>> 2. If a slot is released and returned to SlotPool, could it be reused by another SlotRequest whose requested resource is smaller than it?
>> >>>    - If it is yes, what happens to the available resource in the TaskManager.
>> >>>    - What is the SlotStatus of the cached slot in SlotPool? The AllocationId is null?
>> >>> 3. In a session cluster, some jobs are configured with operator resources, meanwhile other jobs are using UNKNOWN. How to deal with this situation?
>> >>>
>> >>> Best,
>> >>> Yang
>> >>>
>> >>> On Fri, Aug 16, 2019 at 8:57 PM Xintong Song <tonysong...@gmail.com> wrote:
>> >>>
>> >>> > Thanks for the feedback, Yangze and Till.
>> >>> >
>> >>> > Yangze,
>> >>> >
>> >>> > I agree with you that we should make the scheduling strategy pluggable and optimize the strategy to reduce the memory fragmentation problem, and thanks for the inputs on the potential algorithmic solutions. However, I'm in favor of keeping this FLIP focused on the overall mechanism design rather than strategies. Solving the fragmentation issue should be considered as an optimization, and I agree with Till that we probably should tackle this afterwards.
>> >>> >
>> >>> > Till,
>> >>> >
>> >>> > - Regarding splitting the FLIP, I think it makes sense. The operator resource management and dynamic slot allocation do not have much dependency on each other.
>> >>> >
>> >>> > - Regarding the default slot size, I think this is similar to FLIP-49 [1] where we want all the deriving to happen in one place. I think it would be nice to pass the default slot size into the task executor in the same way that we pass in the memory pool sizes in FLIP-49 [1].
>> >>> >
>> >>> > - Regarding the return value of TaskExecutorGateway#requestResource, I think you're right.
>> >>> > We should avoid using null as the return value. I think we probably should throw an exception here.
>> >>> >
>> >>> > Thank you~
>> >>> >
>> >>> > Xintong Song
>> >>> >
>> >>> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
>> >>> >
>> >>> > On Fri, Aug 16, 2019 at 2:18 PM Till Rohrmann <trohrm...@apache.org> wrote:
>> >>> >
>> >>> > > Hi Xintong,
>> >>> > >
>> >>> > > thanks for drafting this FLIP. I think your proposal helps to execute batch jobs more efficiently. Moreover, it enables the proper integration of the Blink planner which is very important as well.
>> >>> > >
>> >>> > > Overall, the FLIP looks good to me. I was wondering whether it wouldn't make sense to actually split it up into two FLIPs: Operator resource management and dynamic slot allocation. I think these two FLIPs could be seen as orthogonal and it would decrease the scope of each individual FLIP.
>> >>> > >
>> >>> > > Some smaller comments:
>> >>> > >
>> >>> > > - I'm not sure whether we should pass in the default slot size via an environment variable. Without having unified the way Flink components are configured [1], I think it would be better to pass it in as part of the configuration.
>> >>> > > - I would avoid returning a null value from TaskExecutorGateway#requestResource if it cannot be fulfilled. Either we should introduce an explicit return value saying this or throw an exception.
>> >>> > >
>> >>> > > Concerning Yangze's comments: I think you are right that it would be helpful to make the selection strategy pluggable. Also batching slot requests to the RM could be a good optimization.
>> >>> > > For the sake of keeping the scope of this FLIP smaller I would try to tackle these things after the initial version has been completed (without spoiling these optimization opportunities). In particular batching the slot requests depends on the current scheduler refactoring and could also be realized on the RM side only.
>> >>> > >
>> >>> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
>> >>> > >
>> >>> > > Cheers,
>> >>> > > Till
>> >>> > >
>> >>> > > On Fri, Aug 16, 2019 at 11:11 AM Yangze Guo <karma...@gmail.com> wrote:
>> >>> > >
>> >>> > > > Hi, Xintong
>> >>> > > >
>> >>> > > > Thanks for proposing this FLIP. The general design looks good to me, +1 for this feature.
>> >>> > > >
>> >>> > > > Since slots in the same task executor could have different resource profiles, we will run into a resource fragmentation problem. Think about this case:
>> >>> > > > - request A wants 1G memory while requests B & C want 0.5G memory
>> >>> > > > - There are two task executors T1 & T2 with 1G and 0.5G free memory respectively
>> >>> > > > If B comes first and we cut a slot from T1 for B, A must wait for free resources from another task. But A could have been scheduled immediately if we cut a slot from T2 for B.
>> >>> > > >
>> >>> > > > The logic of findMatchingSlot now becomes finding a task executor which has enough resources and then cutting a slot from it. The current method could be seen as a "First-fit strategy", which works well in general but sometimes may not be the optimal method.
>> >>> > > >
>> >>> > > > Actually, this problem could be abstracted as the "Bin Packing Problem"[1]. Here are some common approximation algorithms:
>> >>> > > > - First fit
>> >>> > > > - Next fit
>> >>> > > > - Best fit
>> >>> > > >
>> >>> > > > But it becomes a multi-dimensional bin packing problem if we take CPU into account. It is hard to define which one is the best fit then. Some research has addressed this problem, such as Tetris[2].
>> >>> > > >
>> >>> > > > Here are some thoughts about it:
>> >>> > > > 1. We could make the strategy of finding a matching task executor pluggable. Let users configure the best strategy for their scenario.
>> >>> > > > 2. We could support a batch request interface in the RM, because we have opportunities to optimize if we have more information. If we know A, B, and C at the same time, we could always make the best decision.
>> >>> > > >
>> >>> > > > [1] http://www.or.deis.unibo.it/kp/Chapter8.pdf
>> >>> > > > [2] https://www.cs.cmu.edu/~xia/resources/Documents/grandl_sigcomm14.pdf
>> >>> > > >
>> >>> > > > Best,
>> >>> > > > Yangze Guo
>> >>> > > >
>> >>> > > > On Thu, Aug 15, 2019 at 10:40 PM Xintong Song <tonysong...@gmail.com> wrote:
>> >>> > > > >
>> >>> > > > > Hi everyone,
>> >>> > > > >
>> >>> > > > > We would like to start a discussion thread on "FLIP-53: Fine Grained Resource Management"[1], where we propose how to improve Flink resource management and scheduling.
>> >>> > > > >
>> >>> > > > > This FLIP mainly discusses the following issues.
>> >>> > > > >
>> >>> > > > > - How to support tasks with fine grained resource requirements.
>> >>> > > > > - How to unify resource management for jobs with / without fine grained resource requirements.
>> >>> > > > > - How to unify resource management for streaming / batch jobs.
>> >>> > > > >
>> >>> > > > > Key changes proposed in the FLIP are as follows.
>> >>> > > > >
>> >>> > > > > - Unify memory management for operators with / without fine grained resource requirements by applying a fraction based quota mechanism.
>> >>> > > > > - Unify resource scheduling for streaming and batch jobs by setting slot sharing groups for pipelined regions during compiling stage.
>> >>> > > > > - Dynamically allocate slots from task executors' available resources.
>> >>> > > > >
>> >>> > > > > Please find more details in the FLIP wiki document [1]. Looking forward to your feedbacks.
>> >>> > > > >
>> >>> > > > > Thank you~
>> >>> > > > >
>> >>> > > > > Xintong Song
>> >>> > > > >
>> >>> > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Resource+Management
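For reference, the fragmentation case Yangze describes above (request A wants 1G, B wants 0.5G, task executors T1 and T2 have 1G and 0.5G free) can be reproduced with a small sketch. This is not Flink code: the names `first_fit`, `best_fit`, and `schedule`, and the dictionary of free memory per task executor, are hypothetical and only model memory as a single dimension.

```python
from typing import Callable, Dict, Optional

# Hypothetical model: free memory (in GB) per task executor.
FreeMemory = Dict[str, float]
Strategy = Callable[[FreeMemory, float], Optional[str]]


def first_fit(free: FreeMemory, request: float) -> Optional[str]:
    """Pick the first task executor (in insertion order) with enough free memory."""
    for name, available in free.items():
        if available >= request:
            return name
    return None


def best_fit(free: FreeMemory, request: float) -> Optional[str]:
    """Pick the task executor that leaves the least memory unused after the cut."""
    candidates = [
        (available - request, name)
        for name, available in free.items()
        if available >= request
    ]
    return min(candidates)[1] if candidates else None


def schedule(strategy: Strategy, free: FreeMemory,
             requests: Dict[str, float]) -> Dict[str, Optional[str]]:
    """Place requests in arrival order, cutting a slot out of the chosen executor."""
    remaining = dict(free)  # work on a copy so the caller's view is untouched
    placement: Dict[str, Optional[str]] = {}
    for request_id, memory in requests.items():
        target = strategy(remaining, memory)
        if target is not None:
            remaining[target] -= memory
        placement[request_id] = target  # None means the request has to wait
    return placement


executors = {"T1": 1.0, "T2": 0.5}
arrivals = {"B": 0.5, "A": 1.0}  # B arrives before A, as in the example

print(schedule(first_fit, executors, arrivals))  # B is cut from T1, A has to wait
print(schedule(best_fit, executors, arrivals))   # B is cut from T2, A fits into T1
```

Best fit only wins here because memory is the sole dimension. Once CPU is taken into account, "least leftover" is no longer well defined, which is the multi-dimensional bin packing concern raised in the same message.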