Thanks for the correction, Till. Regarding your comments:

- You are right, we should not change the edge type for streaming jobs. In that case, I think we can rename the option 'allSourcesInSamePipelinedRegion' in step 2 to 'isStreamingJob', and implement the current step 2 before the current step 1, so that we can use this option to decide whether to change the edge type. What do you think?
- Agree. It should be easier to make the default value of 'allSourcesInSamePipelinedRegion' (or 'isStreamingJob') 'true', and set it to 'false' when using the DataSet API or the Blink planner.
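To make this concrete, here is a rough, purely illustrative sketch of how the flag could gate the edge-type rewrite (the class and field names below are made up for this example only and are not actual Flink APIs):

import java.util.List;

public class EdgeTypeDecisionSketch {

    enum EdgeType { PIPELINED, BLOCKING }

    static final class Edge {
        EdgeType type = EdgeType.PIPELINED;
        final boolean crossesSlotSharingGroupBoundary;

        Edge(boolean crossesSlotSharingGroupBoundary) {
            this.crossesSlotSharingGroupBoundary = crossesSlotSharingGroupBoundary;
        }
    }

    // For streaming jobs the edge types are left untouched. Only for bounded
    // jobs (DataSet API / Blink planner) are exchanges that cross slot sharing
    // group boundaries rewritten to BLOCKING.
    static void assignEdgeTypes(List<Edge> edges, boolean isStreamingJob) {
        if (isStreamingJob) {
            return; // never change the edge type for streaming jobs
        }
        for (Edge edge : edges) {
            if (edge.crossesSlotSharingGroupBoundary) {
                edge.type = EdgeType.BLOCKING;
            }
        }
    }
}

With 'isStreamingJob' defaulting to 'true', existing streaming jobs with multiple slot sharing groups keep their pipelined exchanges, and only the DataSet API / Blink planner paths opt into the rewrite.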
Thank you~ Xintong Song On Tue, Aug 27, 2019 at 8:59 PM Till Rohrmann <trohrm...@apache.org> wrote: > Thanks for creating the implementation plan Xintong. Overall, the > implementation plan looks good. I had a couple of comments: > > - What will happen if a user has defined a streaming job with two slot > sharing groups? Would the code insert a blocking data exchange between > these two groups? If yes, then this breaks existing Flink streaming jobs. > - How do we detect unbounded streaming jobs to set > the allSourcesInSamePipelinedRegion to `true`? Wouldn't it be easier to set > it false if we are using the DataSet API or the Blink planner with a > bounded job? > > Cheers, > Till > > On Tue, Aug 27, 2019 at 2:16 PM Till Rohrmann <trohrm...@apache.org> > wrote: > > > I guess there is a typo since the link to the FLIP-53 is > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management > > > > Cheers, > > Till > > > > On Tue, Aug 27, 2019 at 1:42 PM Xintong Song <tonysong...@gmail.com> > > wrote: > > > >> Added implementation steps for this FLIP on the wiki page [1]. > >> > >> > >> Thank you~ > >> > >> Xintong Song > >> > >> > >> [1] > >> > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors > >> > >> On Mon, Aug 19, 2019 at 10:29 PM Xintong Song <tonysong...@gmail.com> > >> wrote: > >> > >> > Hi everyone, > >> > > >> > As Till suggested, the original "FLIP-53: Fine Grained Resource > >> > Management" splits into two separate FLIPs, > >> > > >> > - FLIP-53: Fine Grained Operator Resource Management [1] > >> > - FLIP-56: Dynamic Slot Allocation [2] > >> > > >> > We'll continue using this discussion thread for FLIP-53. For FLIP-56, > I > >> > just started a new discussion thread [3]. > >> > > >> > Thank you~ > >> > > >> > Xintong Song > >> > > >> > > >> > [1] > >> > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management > >> > > >> > [2] > >> > > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation > >> > > >> > [3] > >> > > >> > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-56-Dynamic-Slot-Allocation-td31960.html > >> > > >> > On Mon, Aug 19, 2019 at 2:55 PM Xintong Song <tonysong...@gmail.com> > >> > wrote: > >> > > >> >> Thinks for the comments, Yang. > >> >> > >> >> Regarding your questions: > >> >> > >> >> 1. How to calculate the resource specification of TaskManagers? Do > >> they > >> >>> have them same resource spec calculated based on the > >> configuration? I > >> >>> think > >> >>> we still have wasted resources in this situation. Or we could > start > >> >>> TaskManagers with different spec. > >> >>> > >> >> I agree with you that we can further improve the resource utility by > >> >> customizing task executors with different resource specifications. > >> However, > >> >> I'm in favor of limiting the scope of this FLIP and leave it as a > >> future > >> >> optimization. The plan for that part is to move the logic of deciding > >> task > >> >> executor specifications into the slot manager and make slot manager > >> >> pluggable, so inside the slot manager plugin we can have different > >> logics > >> >> for deciding the task executor specifications. > >> >> > >> >> > >> >>> 2. If a slot is released and returned to SlotPool, does it could > be > >> >>> reused by other SlotRequest that the request resource is smaller > >> than > >> >>> it? 
> >> >>> > >> >> No, I think slot pool should always return slots if they do not > exactly > >> >> match the pending requests, so that resource manager can deal with > the > >> >> extra resources. > >> >> > >> >>> - If it is yes, what happens to the available resource in the > >> >> > >> >> TaskManager. > >> >>> - What is the SlotStatus of the cached slot in SlotPool? The > >> >>> AllocationId is null? > >> >>> > >> >> The allocation id does not change as long as the slot is not returned > >> >> from the job master, no matter its occupied or available in the slot > >> pool. > >> >> I think we have the same behavior currently. No matter how many tasks > >> the > >> >> job master deploy into the slot, concurrently or sequentially, it is > >> one > >> >> allocation from the cluster to the job until the slot is freed from > >> the job > >> >> master. > >> >> > >> >>> 3. In a session cluster, some jobs are configured with operator > >> >>> resources, meanwhile other jobs are using UNKNOWN. How to deal > with > >> >>> this > >> >>> situation? > >> >> > >> >> As long as we do not mix unknown / specified resource profiles within > >> the > >> >> same job / slot, there shouldn't be a problem. Resource manager > >> converts > >> >> unknown resource profiles in slot requests to specified default > >> resource > >> >> profiles, so they can be dynamically allocated from task executors' > >> >> available resources just as other slot requests with specified > resource > >> >> profiles. > >> >> > >> >> Thank you~ > >> >> > >> >> Xintong Song > >> >> > >> >> > >> >> > >> >> On Mon, Aug 19, 2019 at 11:39 AM Yang Wang <danrtsey...@gmail.com> > >> wrote: > >> >> > >> >>> Hi Xintong, > >> >>> > >> >>> > >> >>> Thanks for your detailed proposal. I think many users are suffering > >> from > >> >>> waste of resources. The resource spec of all task managers are same > >> and > >> >>> we > >> >>> have to increase all task managers to make the heavy one more > stable. > >> So > >> >>> we > >> >>> will benefit from the fine grained resource management a lot. We > could > >> >>> get > >> >>> better resource utilization and stability. > >> >>> > >> >>> > >> >>> Just to share some thoughts. > >> >>> > >> >>> > >> >>> > >> >>> 1. How to calculate the resource specification of TaskManagers? > Do > >> >>> they > >> >>> have them same resource spec calculated based on the > >> configuration? I > >> >>> think > >> >>> we still have wasted resources in this situation. Or we could > start > >> >>> TaskManagers with different spec. > >> >>> 2. If a slot is released and returned to SlotPool, does it could > be > >> >>> reused by other SlotRequest that the request resource is smaller > >> than > >> >>> it? > >> >>> - If it is yes, what happens to the available resource in the > >> >>> TaskManager. > >> >>> - What is the SlotStatus of the cached slot in SlotPool? The > >> >>> AllocationId is null? > >> >>> 3. In a session cluster, some jobs are configured with operator > >> >>> resources, meanwhile other jobs are using UNKNOWN. How to deal > with > >> >>> this > >> >>> situation? > >> >>> > >> >>> > >> >>> > >> >>> Best, > >> >>> Yang > >> >>> > >> >>> Xintong Song <tonysong...@gmail.com> 于2019年8月16日周五 下午8:57写道: > >> >>> > >> >>> > Thanks for the feedbacks, Yangze and Till. 
> >> >>> > > >> >>> > Yangze, > >> >>> > > >> >>> > I agree with you that we should make scheduling strategy pluggable > >> and > >> >>> > optimize the strategy to reduce the memory fragmentation problem, > >> and > >> >>> > thanks for the inputs on the potential algorithmic solutions. > >> However, > >> >>> I'm > >> >>> > in favor of keep this FLIP focusing on the overall mechanism > design > >> >>> rather > >> >>> > than strategies. Solving the fragmentation issue should be > >> considered > >> >>> as an > >> >>> > optimization, and I agree with Till that we probably should tackle > >> this > >> >>> > afterwards. > >> >>> > > >> >>> > Till, > >> >>> > > >> >>> > - Regarding splitting the FLIP, I think it makes sense. The > operator > >> >>> > resource management and dynamic slot allocation do not have much > >> >>> dependency > >> >>> > on each other. > >> >>> > > >> >>> > - Regarding the default slot size, I think this is similar to > >> FLIP-49 > >> >>> [1] > >> >>> > where we want all the deriving happens at one place. I think it > >> would > >> >>> be > >> >>> > nice to pass the default slot size into the task executor in the > >> same > >> >>> way > >> >>> > that we pass in the memory pool sizes in FLIP-49 [1]. > >> >>> > > >> >>> > - Regarding the return value of > >> TaskExecutorGateway#requestResource, I > >> >>> > think you're right. We should avoid using null as the return > value. > >> I > >> >>> think > >> >>> > we probably should thrown an exception here. > >> >>> > > >> >>> > Thank you~ > >> >>> > > >> >>> > Xintong Song > >> >>> > > >> >>> > > >> >>> > [1] > >> >>> > > >> >>> > > >> >>> > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors > >> >>> > > >> >>> > On Fri, Aug 16, 2019 at 2:18 PM Till Rohrmann < > trohrm...@apache.org > >> > > >> >>> > wrote: > >> >>> > > >> >>> > > Hi Xintong, > >> >>> > > > >> >>> > > thanks for drafting this FLIP. I think your proposal helps to > >> >>> improve the > >> >>> > > execution of batch jobs more efficiently. Moreover, it enables > the > >> >>> proper > >> >>> > > integration of the Blink planner which is very important as > well. > >> >>> > > > >> >>> > > Overall, the FLIP looks good to me. I was wondering whether it > >> >>> wouldn't > >> >>> > > make sense to actually split it up into two FLIPs: Operator > >> resource > >> >>> > > management and dynamic slot allocation. I think these two FLIPs > >> >>> could be > >> >>> > > seen as orthogonal and it would decrease the scope of each > >> individual > >> >>> > FLIP. > >> >>> > > > >> >>> > > Some smaller comments: > >> >>> > > > >> >>> > > - I'm not sure whether we should pass in the default slot size > >> via an > >> >>> > > environment variable. Without having unified the way how Flink > >> >>> components > >> >>> > > are configured [1], I think it would be better to pass it in as > >> part > >> >>> of > >> >>> > the > >> >>> > > configuration. > >> >>> > > - I would avoid returning a null value from > >> >>> > > TaskExecutorGateway#requestResource if it cannot be fulfilled. > >> >>> Either we > >> >>> > > should introduce an explicit return value saying this or throw > an > >> >>> > > exception. > >> >>> > > > >> >>> > > Concerning Yangze's comments: I think you are right that it > would > >> be > >> >>> > > helpful to make the selection strategy pluggable. Also batching > >> slot > >> >>> > > requests to the RM could be a good optimization. 
For the sake of > >> >>> keeping > >> >>> > > the scope of this FLIP smaller I would try to tackle these > things > >> >>> after > >> >>> > the > >> >>> > > initial version has been completed (without spoiling these > >> >>> optimization > >> >>> > > opportunities). In particular batching the slot requests depends > >> on > >> >>> the > >> >>> > > current scheduler refactoring and could also be realized on the > RM > >> >>> side > >> >>> > > only. > >> >>> > > > >> >>> > > [1] > >> >>> > > > >> >>> > > > >> >>> > > >> >>> > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration > >> >>> > > > >> >>> > > Cheers, > >> >>> > > Till > >> >>> > > > >> >>> > > > >> >>> > > > >> >>> > > On Fri, Aug 16, 2019 at 11:11 AM Yangze Guo <karma...@gmail.com > > > >> >>> wrote: > >> >>> > > > >> >>> > > > Hi, Xintong > >> >>> > > > > >> >>> > > > Thanks to propose this FLIP. The general design looks good to > >> me, > >> >>> +1 > >> >>> > > > for this feature. > >> >>> > > > > >> >>> > > > Since slots in the same task executor could have different > >> resource > >> >>> > > > profile, we will > >> >>> > > > meet resource fragment problem. Think about this case: > >> >>> > > > - request A want 1G memory while request B & C want 0.5G > memory > >> >>> > > > - There are two task executors T1 & T2 with 1G and 0.5G free > >> >>> memory > >> >>> > > > respectively > >> >>> > > > If B come first and we cut a slot from T1 for B, A must wait > for > >> >>> the > >> >>> > > > free resource from > >> >>> > > > other task. But A could have been scheduled immediately if we > >> cut a > >> >>> > > > slot from T2 for B. > >> >>> > > > > >> >>> > > > The logic of findMatchingSlot now become finding a task > executor > >> >>> which > >> >>> > > > has enough > >> >>> > > > resource and then cut a slot from it. Current method could be > >> seen > >> >>> as > >> >>> > > > "First-fit strategy", > >> >>> > > > which works well in general but sometimes could not be the > >> >>> optimization > >> >>> > > > method. > >> >>> > > > > >> >>> > > > Actually, this problem could be abstracted as "Bin Packing > >> >>> Problem"[1]. > >> >>> > > > Here are > >> >>> > > > some common approximate algorithms: > >> >>> > > > - First fit > >> >>> > > > - Next fit > >> >>> > > > - Best fit > >> >>> > > > > >> >>> > > > But it become multi-dimensional bin packing problem if we take > >> CPU > >> >>> > > > into account. It hard > >> >>> > > > to define which one is best fit now. Some research addressed > >> this > >> >>> > > > problem, such like Tetris[2]. > >> >>> > > > > >> >>> > > > Here are some thinking about it: > >> >>> > > > 1. We could make the strategy of finding matching task > executor > >> >>> > > > pluginable. Let user to config the > >> >>> > > > best strategy in their scenario. > >> >>> > > > 2. We could support batch request interface in RM, because we > >> have > >> >>> > > > opportunities to optimize > >> >>> > > > if we have more information. If we know the A, B, C at the > same > >> >>> time, > >> >>> > > > we could always make the best decision. 
> >> >>> > > > > >> >>> > > > [1] http://www.or.deis.unibo.it/kp/Chapter8.pdf > >> >>> > > > [2] > >> >>> > > >> https://www.cs.cmu.edu/~xia/resources/Documents/grandl_sigcomm14.pdf > >> >>> > > > > >> >>> > > > Best, > >> >>> > > > Yangze Guo > >> >>> > > > > >> >>> > > > On Thu, Aug 15, 2019 at 10:40 PM Xintong Song < > >> >>> tonysong...@gmail.com> > >> >>> > > > wrote: > >> >>> > > > > > >> >>> > > > > Hi everyone, > >> >>> > > > > > >> >>> > > > > We would like to start a discussion thread on "FLIP-53: Fine > >> >>> Grained > >> >>> > > > > Resource Management"[1], where we propose how to improve > Flink > >> >>> > resource > >> >>> > > > > management and scheduling. > >> >>> > > > > > >> >>> > > > > This FLIP mainly discusses the following issues. > >> >>> > > > > > >> >>> > > > > - How to support tasks with fine grained resource > >> >>> requirements. > >> >>> > > > > - How to unify resource management for jobs with / > without > >> >>> fine > >> >>> > > > grained > >> >>> > > > > resource requirements. > >> >>> > > > > - How to unify resource management for streaming / batch > >> jobs. > >> >>> > > > > > >> >>> > > > > Key changes proposed in the FLIP are as follows. > >> >>> > > > > > >> >>> > > > > - Unify memory management for operators with / without > fine > >> >>> > grained > >> >>> > > > > resource requirements by applying a fraction based quota > >> >>> > mechanism. > >> >>> > > > > - Unify resource scheduling for streaming and batch jobs > by > >> >>> > setting > >> >>> > > > slot > >> >>> > > > > sharing groups for pipelined regions during compiling > >> stage. > >> >>> > > > > - Dynamically allocate slots from task executors' > available > >> >>> > > resources. > >> >>> > > > > > >> >>> > > > > Please find more details in the FLIP wiki document [1]. > >> Looking > >> >>> > forward > >> >>> > > > to > >> >>> > > > > your feedbacks. > >> >>> > > > > > >> >>> > > > > Thank you~ > >> >>> > > > > > >> >>> > > > > Xintong Song > >> >>> > > > > > >> >>> > > > > > >> >>> > > > > [1] > >> >>> > > > > > >> >>> > > > > >> >>> > > > >> >>> > > >> >>> > >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Resource+Management > >> >>> > > > > >> >>> > > > >> >>> > > >> >>> > >> >> > >> > > >
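To illustrate the difference between the slot selection strategies discussed above (first fit vs. best fit), here is a toy, single-dimension sketch; it is not how the slot manager is or will be implemented, and all names in it are invented for this example:

import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class SlotSelectionSketch {

    static final class TaskExecutor {
        final String id;
        final long freeMemoryMb;

        TaskExecutor(String id, long freeMemoryMb) {
            this.id = id;
            this.freeMemoryMb = freeMemoryMb;
        }
    }

    // First fit: take the first task executor with enough free memory.
    static Optional<TaskExecutor> firstFit(List<TaskExecutor> executors, long requestMb) {
        return executors.stream()
                .filter(te -> te.freeMemoryMb >= requestMb)
                .findFirst();
    }

    // Best fit: take the task executor whose free memory exceeds the request
    // by the smallest amount, which tends to reduce fragmentation.
    static Optional<TaskExecutor> bestFit(List<TaskExecutor> executors, long requestMb) {
        return executors.stream()
                .filter(te -> te.freeMemoryMb >= requestMb)
                .min(Comparator.comparingLong(te -> te.freeMemoryMb - requestMb));
    }
}

In Yangze's example (T1 with 1 GB free, T2 with 0.5 GB free, and a 0.5 GB request B arriving before a 1 GB request A), firstFit cuts B's slot from T1 if T1 is checked first and leaves A waiting, while bestFit cuts it from T2 and keeps T1 free for A. Once CPU is added as a second dimension, no single ordering is always best, which is the argument for making the strategy pluggable.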