Updated the FLIP wiki page [1] with the following changes:

- Remove the step of converting pipelined edges between different slot sharing groups into blocking edges.
- Set `allSourcesInSamePipelinedRegion` to true by default.

Thank you~

Xintong Song
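(For illustration only: a minimal, self-contained sketch, with hypothetical names and types rather than the actual Flink classes, of what the `allSourcesInSamePipelinedRegion` flag is meant to control. Pipelined regions are built by grouping vertices connected through pipelined edges; when the flag is true, all sources are additionally merged into one region, so they later end up in the same slot sharing group.)

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch only: simplified graph model, not Flink's real classes. */
final class PipelinedRegionSketch {

    static final class Vertex {
        final String name;
        final boolean isSource;
        final List<Vertex> pipelinedInputs = new ArrayList<>();

        Vertex(String name, boolean isSource) {
            this.name = name;
            this.isSource = isSource;
        }
    }

    /**
     * Groups vertices into pipelined regions via union-find over pipelined edges.
     * With allSourcesInSamePipelinedRegion = true (the proposed default), all sources are
     * merged into one region, so they are scheduled together as today.
     */
    static Map<Vertex, Integer> computeRegions(
            List<Vertex> vertices, boolean allSourcesInSamePipelinedRegion) {
        Map<Vertex, Vertex> parent = new HashMap<>();
        vertices.forEach(v -> parent.put(v, v));

        for (Vertex v : vertices) {
            for (Vertex input : v.pipelinedInputs) {
                union(parent, v, input); // pipelined edges never cross region boundaries
            }
        }

        if (allSourcesInSamePipelinedRegion) {
            Vertex firstSource = null;
            for (Vertex v : vertices) {
                if (!v.isSource) {
                    continue;
                }
                if (firstSource == null) {
                    firstSource = v;
                } else {
                    union(parent, firstSource, v);
                }
            }
        }

        // Assign a stable region id per union-find root.
        Map<Vertex, Integer> regionIds = new HashMap<>();
        Map<Vertex, Integer> rootIds = new HashMap<>();
        for (Vertex v : vertices) {
            Vertex root = find(parent, v);
            regionIds.put(v, rootIds.computeIfAbsent(root, r -> rootIds.size()));
        }
        return regionIds;
    }

    private static Vertex find(Map<Vertex, Vertex> parent, Vertex v) {
        while (parent.get(v) != v) {
            v = parent.get(v);
        }
        return v;
    }

    private static void union(Map<Vertex, Vertex> parent, Vertex a, Vertex b) {
        parent.put(find(parent, a), find(parent, b));
    }
}
```

With the flag defaulting to true, an unbounded streaming job keeps today's behavior, while the DataSet API or the Blink planner can set it to false for bounded jobs, as discussed further down the thread.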
On Mon, Sep 2, 2019 at 11:50 AM Xintong Song <tonysong...@gmail.com> wrote:

> Regarding changing the edge type, I think we actually don't need to do this for batch jobs either, because we don't have public interfaces for users to explicitly set slot sharing groups in the DataSet API and SQL/Table API. We have such interfaces in the DataStream API only.
>
> Thank you~
>
> Xintong Song
>
> On Tue, Aug 27, 2019 at 10:16 PM Xintong Song <tonysong...@gmail.com> wrote:
>
>> Thanks for the correction, Till.
>>
>> Regarding your comments:
>>
>> - You are right, we should not change the edge type for streaming jobs. Then I think we can change the option 'allSourcesInSamePipelinedRegion' in step 2 to 'isStreamingJob', and implement the current step 2 before the current step 1, so we can use this option to decide whether we should change the edge type. What do you think?
>> - Agree. It should be easier to make the default value of 'allSourcesInSamePipelinedRegion' (or 'isStreamingJob') 'true', and set it to 'false' when using the DataSet API or the Blink planner.
>>
>> Thank you~
>>
>> Xintong Song
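(To make the edge-type decision discussed above concrete, here is a minimal sketch with hypothetical types. Note that, per the update at the top of this thread, the conversion step was eventually removed; the sketch only illustrates the guard being debated: pipelined edges crossing slot sharing groups would only be turned into blocking edges for non-streaming jobs, so existing streaming jobs with user-defined slot sharing groups are not broken.)

```java
import java.util.List;
import java.util.Objects;

/** Sketch only: hypothetical model of the edge-type decision discussed in this thread. */
final class EdgeTypeGuardSketch {

    enum ExchangeType { PIPELINED, BLOCKING }

    static final class Edge {
        final String sourceSlotSharingGroup;
        final String targetSlotSharingGroup;
        ExchangeType type;

        Edge(String sourceGroup, String targetGroup, ExchangeType type) {
            this.sourceSlotSharingGroup = sourceGroup;
            this.targetSlotSharingGroup = targetGroup;
            this.type = type;
        }
    }

    /**
     * Converts pipelined edges that cross slot sharing groups into blocking edges,
     * but only for non-streaming jobs, leaving streaming jobs untouched.
     */
    static void maybeConvertToBlocking(List<Edge> edges, boolean isStreamingJob) {
        if (isStreamingJob) {
            return;
        }
        for (Edge edge : edges) {
            boolean crossesGroups =
                    !Objects.equals(edge.sourceSlotSharingGroup, edge.targetSlotSharingGroup);
            if (crossesGroups && edge.type == ExchangeType.PIPELINED) {
                edge.type = ExchangeType.BLOCKING;
            }
        }
    }
}
```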
>> On Tue, Aug 27, 2019 at 8:59 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>
>>> Thanks for creating the implementation plan, Xintong. Overall, the implementation plan looks good. I had a couple of comments:
>>>
>>> - What will happen if a user has defined a streaming job with two slot sharing groups? Would the code insert a blocking data exchange between these two groups? If yes, then this breaks existing Flink streaming jobs.
>>> - How do we detect unbounded streaming jobs to set allSourcesInSamePipelinedRegion to `true`? Wouldn't it be easier to set it to `false` if we are using the DataSet API or the Blink planner with a bounded job?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Aug 27, 2019 at 2:16 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>> > I guess there is a typo, since the link to FLIP-53 is
>>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
>>> >
>>> > Cheers,
>>> > Till
>>> >
>>> > On Tue, Aug 27, 2019 at 1:42 PM Xintong Song <tonysong...@gmail.com> wrote:
>>> >
>>> >> Added implementation steps for this FLIP on the wiki page [1].
>>> >>
>>> >> Thank you~
>>> >>
>>> >> Xintong Song
>>> >>
>>> >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
>>> >>
>>> >> On Mon, Aug 19, 2019 at 10:29 PM Xintong Song <tonysong...@gmail.com> wrote:
>>> >>
>>> >> > Hi everyone,
>>> >> >
>>> >> > As Till suggested, the original "FLIP-53: Fine Grained Resource Management" is split into two separate FLIPs:
>>> >> >
>>> >> > - FLIP-53: Fine Grained Operator Resource Management [1]
>>> >> > - FLIP-56: Dynamic Slot Allocation [2]
>>> >> >
>>> >> > We'll continue using this discussion thread for FLIP-53. For FLIP-56, I just started a new discussion thread [3].
>>> >> >
>>> >> > Thank you~
>>> >> >
>>> >> > Xintong Song
>>> >> >
>>> >> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
>>> >> > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
>>> >> > [3] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-56-Dynamic-Slot-Allocation-td31960.html
>>> >> >
>>> >> > On Mon, Aug 19, 2019 at 2:55 PM Xintong Song <tonysong...@gmail.com> wrote:
>>> >> >
>>> >> >> Thanks for the comments, Yang.
>>> >> >>
>>> >> >> Regarding your questions:
>>> >> >>
>>> >> >>> 1. How to calculate the resource specification of TaskManagers? Do they all have the same resource spec calculated based on the configuration? I think we still have wasted resources in this situation. Or we could start TaskManagers with different specs.
>>> >> >>
>>> >> >> I agree with you that we can further improve resource utilization by customizing task executors with different resource specifications. However, I'm in favor of limiting the scope of this FLIP and leaving this as a future optimization. The plan for that part is to move the logic of deciding task executor specifications into the slot manager and make the slot manager pluggable, so inside the slot manager plugin we can have different logic for deciding the task executor specifications.
>>> >> >>
>>> >> >>> 2. If a slot is released and returned to the SlotPool, can it be reused by another SlotRequest whose requested resource is smaller?
>>> >> >>
>>> >> >> No, I think the slot pool should always return slots if they do not exactly match the pending requests, so that the resource manager can deal with the extra resources.
>>> >> >>
>>> >> >>> - If yes, what happens to the available resource in the TaskManager?
>>> >> >>> - What is the SlotStatus of the cached slot in the SlotPool? Is the AllocationId null?
>>> >> >>
>>> >> >> The allocation id does not change as long as the slot is not returned from the job master, no matter whether it is occupied or available in the slot pool. I think we have the same behavior currently. No matter how many tasks the job master deploys into the slot, concurrently or sequentially, it is one allocation from the cluster to the job until the slot is freed by the job master.
>>> >> >>
>>> >> >>> 3. In a session cluster, some jobs are configured with operator resources, while other jobs are using UNKNOWN. How do we deal with this situation?
>>> >> >>
>>> >> >> As long as we do not mix unknown / specified resource profiles within the same job / slot, there shouldn't be a problem. The resource manager converts unknown resource profiles in slot requests to specified default resource profiles, so they can be dynamically allocated from the task executors' available resources just like other slot requests with specified resource profiles.
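(A minimal sketch of the unknown-to-default conversion described above. The types are hypothetical and only loosely modeled on Flink's ResourceProfile; in the proposal this logic would live on the resource manager / slot manager side.)

```java
/** Sketch only: illustrates mapping UNKNOWN resource profiles in slot requests onto a
 *  configured default profile before cutting a slot from a task executor's free resources. */
final class ProfileNormalizationSketch {

    static final class ResourceProfile {
        static final ResourceProfile UNKNOWN = new ResourceProfile(-1, -1);

        final double cpuCores;
        final int memoryMB;

        ResourceProfile(double cpuCores, int memoryMB) {
            this.cpuCores = cpuCores;
            this.memoryMB = memoryMB;
        }

        boolean isUnknown() {
            return this == UNKNOWN;
        }
    }

    /**
     * Requests with unknown profiles fall back to the default slot profile; requests with
     * specified profiles are kept as-is, so both kinds can be matched against a task
     * executor's available resources in the same way.
     */
    static ResourceProfile effectiveProfile(ResourceProfile requested, ResourceProfile defaultSlotProfile) {
        return requested.isUnknown() ? defaultSlotProfile : requested;
    }
}
```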
>>> >> >>
>>> >> >> Thank you~
>>> >> >>
>>> >> >> Xintong Song
>>> >> >>
>>> >> >> On Mon, Aug 19, 2019 at 11:39 AM Yang Wang <danrtsey...@gmail.com> wrote:
>>> >> >>
>>> >> >>> Hi Xintong,
>>> >> >>>
>>> >> >>> Thanks for your detailed proposal. I think many users are suffering from wasted resources. The resource spec of all task managers is the same, and we have to scale up all task managers to make the heaviest one more stable. So we will benefit a lot from fine grained resource management. We could get better resource utilization and stability.
>>> >> >>>
>>> >> >>> Just to share some thoughts:
>>> >> >>>
>>> >> >>> 1. How to calculate the resource specification of TaskManagers? Do they all have the same resource spec calculated based on the configuration? I think we still have wasted resources in this situation. Or we could start TaskManagers with different specs.
>>> >> >>> 2. If a slot is released and returned to the SlotPool, can it be reused by another SlotRequest whose requested resource is smaller?
>>> >> >>>     - If yes, what happens to the available resource in the TaskManager?
>>> >> >>>     - What is the SlotStatus of the cached slot in the SlotPool? Is the AllocationId null?
>>> >> >>> 3. In a session cluster, some jobs are configured with operator resources, while other jobs are using UNKNOWN. How do we deal with this situation?
>>> >> >>>
>>> >> >>> Best,
>>> >> >>> Yang
>>> >> >>>
>>> >> >>> Xintong Song <tonysong...@gmail.com> wrote on Fri, Aug 16, 2019 at 8:57 PM:
>>> >> >>>
>>> >> >>> > Thanks for the feedback, Yangze and Till.
>>> >> >>> >
>>> >> >>> > Yangze,
>>> >> >>> >
>>> >> >>> > I agree with you that we should make the scheduling strategy pluggable and optimize the strategy to reduce the memory fragmentation problem, and thanks for the input on the potential algorithmic solutions. However, I'm in favor of keeping this FLIP focused on the overall mechanism design rather than on strategies. Solving the fragmentation issue should be considered an optimization, and I agree with Till that we probably should tackle this afterwards.
>>> >> >>> >
>>> >> >>> > Till,
>>> >> >>> >
>>> >> >>> > - Regarding splitting the FLIP, I think it makes sense. The operator resource management and dynamic slot allocation do not have much dependency on each other.
>>> >> >>> > - Regarding the default slot size, I think this is similar to FLIP-49 [1], where we want all the deriving to happen in one place. I think it would be nice to pass the default slot size into the task executor in the same way that we pass in the memory pool sizes in FLIP-49 [1].
>>> >> >>> > - Regarding the return value of TaskExecutorGateway#requestResource, I think you're right. We should avoid using null as the return value. I think we probably should throw an exception here.
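(A small sketch of the alternative being agreed on here. The method shape is hypothetical, not the actual TaskExecutorGateway signature: instead of returning null when the requested resource cannot be cut from the task executor, the call completes exceptionally, or could return an explicit failure value.)

```java
import java.util.concurrent.CompletableFuture;

/** Sketch only: a hypothetical shape for a requestResource-style RPC that never returns null.
 *  Flink RPC gateways generally return futures, so an unfulfillable request can be signalled by
 *  completing the future exceptionally. The String result is just a stand-in for whatever slot
 *  descriptor the real call would return. */
final class ResourceRequestSketch {

    static final class SlotAllocationException extends Exception {
        SlotAllocationException(String message) {
            super(message);
        }
    }

    interface TaskExecutorGatewaySketch {
        /** Completes with the allocated slot descriptor, or exceptionally with
         *  SlotAllocationException if the task executor cannot fulfill the request. */
        CompletableFuture<String> requestResource(String resourceProfileDescription);
    }

    /** Example of the failure path: no nulls, an explicit exception instead. */
    static CompletableFuture<String> reject(String reason) {
        CompletableFuture<String> result = new CompletableFuture<>();
        result.completeExceptionally(new SlotAllocationException(reason));
        return result;
    }
}
```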
>>> >> >>> >
>>> >> >>> > Thank you~
>>> >> >>> >
>>> >> >>> > Xintong Song
>>> >> >>> >
>>> >> >>> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
>>> >> >>> >
>>> >> >>> > On Fri, Aug 16, 2019 at 2:18 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>> >> >>> >
>>> >> >>> > > Hi Xintong,
>>> >> >>> > >
>>> >> >>> > > thanks for drafting this FLIP. I think your proposal helps to execute batch jobs more efficiently. Moreover, it enables the proper integration of the Blink planner, which is very important as well.
>>> >> >>> > >
>>> >> >>> > > Overall, the FLIP looks good to me. I was wondering whether it wouldn't make sense to actually split it up into two FLIPs: operator resource management and dynamic slot allocation. I think these two FLIPs could be seen as orthogonal, and it would decrease the scope of each individual FLIP.
>>> >> >>> > >
>>> >> >>> > > Some smaller comments:
>>> >> >>> > >
>>> >> >>> > > - I'm not sure whether we should pass in the default slot size via an environment variable. Without having unified the way Flink components are configured [1], I think it would be better to pass it in as part of the configuration.
>>> >> >>> > > - I would avoid returning a null value from TaskExecutorGateway#requestResource if it cannot be fulfilled. Either we should introduce an explicit return value saying this, or throw an exception.
>>> >> >>> > >
>>> >> >>> > > Concerning Yangze's comments: I think you are right that it would be helpful to make the selection strategy pluggable. Also, batching slot requests to the RM could be a good optimization. For the sake of keeping the scope of this FLIP smaller, I would try to tackle these things after the initial version has been completed (without spoiling these optimization opportunities). In particular, batching the slot requests depends on the current scheduler refactoring and could also be realized on the RM side only.
>>> >> >>> > >
>>> >> >>> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
>>> >> >>> > >
>>> >> >>> > > Cheers,
>>> >> >>> > > Till
>>> >> >>> > >
>>> >> >>> > > On Fri, Aug 16, 2019 at 11:11 AM Yangze Guo <karma...@gmail.com> wrote:
>>> >> >>> > >
>>> >> >>> > > > Hi Xintong,
>>> >> >>> > > >
>>> >> >>> > > > Thanks for proposing this FLIP. The general design looks good to me, +1 for this feature.
>>> >> >>> > > >
>>> >> >>> > > > Since slots in the same task executor could have different resource profiles, we will run into a resource fragmentation problem. Think about this case:
>>> >> >>> > > >
>>> >> >>> > > > - Request A wants 1G of memory, while requests B & C each want 0.5G of memory.
>>> >> >>> > > > - There are two task executors T1 & T2 with 1G and 0.5G of free memory respectively.
>>> >> >>> > > >
>>> >> >>> > > > If B comes first and we cut a slot from T1 for B, A must wait for resources to be freed by other tasks. But A could have been scheduled immediately if we had cut the slot for B from T2.
>>> >> >>> > > >
>>> >> >>> > > > The logic of findMatchingSlot now becomes finding a task executor that has enough resources and then cutting a slot from it. The current method could be seen as a "first-fit strategy", which works well in general but is not always optimal.
>>> >> >>> > > >
>>> >> >>> > > > Actually, this problem can be abstracted as the "Bin Packing Problem" [1]. Here are some common approximation algorithms:
>>> >> >>> > > >
>>> >> >>> > > > - First fit
>>> >> >>> > > > - Next fit
>>> >> >>> > > > - Best fit
>>> >> >>> > > >
>>> >> >>> > > > But it becomes a multi-dimensional bin packing problem if we take CPU into account. It is hard to define which one is the best fit then. Some research has addressed this problem, such as Tetris [2].
>>> >> >>> > > >
>>> >> >>> > > > Here are some thoughts about it:
>>> >> >>> > > >
>>> >> >>> > > > 1. We could make the strategy for finding a matching task executor pluggable, and let users configure the best strategy for their scenario.
>>> >> >>> > > > 2. We could support a batch request interface in the RM, because we have more opportunities to optimize if we have more information. If we know A, B and C at the same time, we can always make the best decision.
>>> >> >>> > > >
>>> >> >>> > > > [1] http://www.or.deis.unibo.it/kp/Chapter8.pdf
>>> >> >>> > > > [2] https://www.cs.cmu.edu/~xia/resources/Documents/grandl_sigcomm14.pdf
>>> >> >>> > > >
>>> >> >>> > > > Best,
>>> >> >>> > > > Yangze Guo
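(As a rough illustration of the first-fit vs. best-fit distinction in Yangze's example, and of what a pluggable matching strategy could look like, here is a minimal single-dimension sketch. The types are hypothetical, memory only, and this is not the actual findMatchingSlot implementation.)

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Sketch only: a hypothetical pluggable strategy for picking the task executor to cut a slot from. */
final class SlotMatchingSketch {

    static final class TaskExecutorResources {
        final String id;
        final long freeMemoryMB;

        TaskExecutorResources(String id, long freeMemoryMB) {
            this.id = id;
            this.freeMemoryMB = freeMemoryMB;
        }
    }

    /** Strategy interface that could be made configurable, as suggested above. */
    interface MatchingStrategy {
        Optional<TaskExecutorResources> select(List<TaskExecutorResources> candidates, long requestedMemoryMB);
    }

    /** First fit: take the first task executor with enough free memory. */
    static final MatchingStrategy FIRST_FIT = (candidates, requested) ->
            candidates.stream()
                    .filter(te -> te.freeMemoryMB >= requested)
                    .findFirst();

    /** Best fit: take the task executor whose free memory exceeds the request by the least,
     *  keeping larger contiguous chunks free for bigger requests. */
    static final MatchingStrategy BEST_FIT = (candidates, requested) ->
            candidates.stream()
                    .filter(te -> te.freeMemoryMB >= requested)
                    .min(Comparator.comparingLong(te -> te.freeMemoryMB - requested));
}
```

With best fit, request B in the example above would be placed on T2 (0.5G free, zero leftover), leaving T1's full 1G available for request A.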
>>> >> >>> > > >
>>> >> >>> > > > On Thu, Aug 15, 2019 at 10:40 PM Xintong Song <tonysong...@gmail.com> wrote:
>>> >> >>> > > > >
>>> >> >>> > > > > Hi everyone,
>>> >> >>> > > > >
>>> >> >>> > > > > We would like to start a discussion thread on "FLIP-53: Fine Grained Resource Management" [1], where we propose how to improve Flink resource management and scheduling.
>>> >> >>> > > > >
>>> >> >>> > > > > This FLIP mainly discusses the following issues:
>>> >> >>> > > > >
>>> >> >>> > > > > - How to support tasks with fine grained resource requirements.
>>> >> >>> > > > > - How to unify resource management for jobs with / without fine grained resource requirements.
>>> >> >>> > > > > - How to unify resource management for streaming / batch jobs.
>>> >> >>> > > > >
>>> >> >>> > > > > Key changes proposed in the FLIP are as follows:
>>> >> >>> > > > >
>>> >> >>> > > > > - Unify memory management for operators with / without fine grained resource requirements by applying a fraction based quota mechanism.
>>> >> >>> > > > > - Unify resource scheduling for streaming and batch jobs by setting slot sharing groups for pipelined regions during the compiling stage.
>>> >> >>> > > > > - Dynamically allocate slots from task executors' available resources.
>>> >> >>> > > > >
>>> >> >>> > > > > Please find more details in the FLIP wiki document [1]. Looking forward to your feedback.
>>> >> >>> > > > >
>>> >> >>> > > > > Thank you~
>>> >> >>> > > > >
>>> >> >>> > > > > Xintong Song
>>> >> >>> > > > >
>>> >> >>> > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Resource+Management
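(To illustrate the fraction based quota mechanism mentioned in the key changes above: a rough, hypothetical sketch only; the exact derivation is defined in the FLIP wiki document. The idea shown here is that each operator in a slot is assigned a fraction of the slot's managed memory at compile time, and its runtime budget is simply that fraction of whatever managed memory the slot actually gets.)

```java
/** Sketch only: a simplified illustration of a fraction based memory quota. */
final class FractionQuotaSketch {

    /** Operators with specified requirements get fractions proportional to what they asked for.
     *  E.g. specified managed memory of 300, 100 and 100 MB yields fractions 0.6, 0.2 and 0.2. */
    static double[] fractionsForSpecifiedRequirements(double[] specifiedManagedMemoryMB) {
        double total = 0;
        for (double m : specifiedManagedMemoryMB) {
            total += m;
        }
        double[] fractions = new double[specifiedManagedMemoryMB.length];
        for (int i = 0; i < fractions.length; i++) {
            fractions[i] = total == 0 ? 0 : specifiedManagedMemoryMB[i] / total;
        }
        return fractions;
    }

    /** Operators with unknown requirements simply share the slot's managed memory evenly. */
    static double fractionForUnknownRequirements(int operatorsInSlot) {
        return operatorsInSlot == 0 ? 0 : 1.0 / operatorsInSlot;
    }

    /** Runtime budget of one operator, given the slot's actual managed memory. */
    static long operatorBudgetBytes(long slotManagedMemoryBytes, double fraction) {
        return (long) (slotManagedMemoryBytes * fraction);
    }
}
```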