Thanks for the comments, Zhu & Kurt. Andrey and I also had some discussions offline, and I would like to first post a summary of our discussion:
1. The motivation of the fraction based approach is to unify resource management for operators with specified resource requirements and operators with unknown ones.
2. The fraction based approach proposed in this FLIP should only affect streaming jobs (both bounded and unbounded). For DataSet jobs, there are already some fraction based approaches (in TaskConfig and ChainedDriver), and we do not make any change to them.
3. The scope of this FLIP does not include discussion of how to set ResourceSpec for operators.
   1. For blink jobs, the optimizer can set operator resources for the users, according to their configurations (default: unknown).
   2. For DataStream jobs, there is no method / interface to set operator resources at the moment (1.10). We can add one in the future.
   3. For DataSet jobs, there are existing user interfaces to set operator resources.
4. The FLIP should explain more about how ResourceSpecs work.
   1. PhysicalTransformations (deployed with operators into the StreamTasks) get a ResourceSpec: unknown by default, or known (e.g. from the Blink planner).
   2. While generating the stream graph, calculate fractions and set them in StreamConfig.
   3. While scheduling, convert ResourceSpec to ResourceProfile (ResourceSpec + network memory), and deploy to slots / TMs matching the resources.
   4. While starting the Task in the TM, each operator's fraction is converted back to the original absolute value requested by the user, or to a fair share of the slot if the requirement is unknown.
5. We should not set `allSourcesInSamePipelinedRegion` to `false` for DataSet jobs. Behaviors of DataSet jobs should not be changed.
6. The FLIP document should differentiate more clearly between the work planned in this FLIP and the future follow-ups, by putting the follow-ups in a separate section.
7. Another limitation of the rejected alternative (setting fractions at scheduling time) is that the scheduler implementation does not know in advance which tasks will be deployed into the same slot.

Andrey, please bring it up if there is anything I missed.
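To make steps 2 and 4 of point 4 a bit more concrete, here is a rough Python sketch (not Flink code) of how fractions could be derived for one slot sharing group and converted back in the TM. The names `compute_fractions` and `to_absolute`, the MB units, and using `None` for "unknown" are all assumptions made for illustration. The unknown case follows the `1 / numOps` rule discussed in this thread; dividing a specified request by the group total is my assumption about the specified case.

```python
from fractions import Fraction
from typing import Dict, Optional


def compute_fractions(managed_mem_mb: Dict[str, Optional[int]]) -> Dict[str, Fraction]:
    """Derive each operator's share of its slot sharing group's managed memory.

    managed_mem_mb maps a (hypothetical) operator name to its requested
    managed memory in MB, or to None when the requirement is unknown.
    """
    if not managed_mem_mb:
        return {}
    specified = [v for v in managed_mem_mb.values() if v is not None]
    if specified and len(specified) != len(managed_mem_mb):
        # Mixing specified and unknown requirements is rejected in the first version.
        raise ValueError("mixing specified and unknown resource requirements")
    if not specified:
        # All unknown: every operator gets an equal (fair) share.
        share = Fraction(1, len(managed_mem_mb))
        return {op: share for op in managed_mem_mb}
    total = sum(specified)
    if total == 0:
        # Nobody asked for managed memory, so every share is zero.
        return {op: Fraction(0) for op in managed_mem_mb}
    # All specified: share is proportional to the requested amount.
    return {op: Fraction(mem, total) for op, mem in managed_mem_mb.items()}


def to_absolute(fraction: Fraction, slot_managed_mem_mb: int) -> int:
    """Convert a fraction back to MB once the slot's managed memory is known."""
    return int(fraction * slot_managed_mem_mb)
```

With requests of 100 MB and 300 MB in a slot that ends up with 400 MB of managed memory, the second operator gets a fraction of 3/4, which converts back to its original 300 MB.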

Zhu, regarding your comments:

1. If we do not set `allSourcesInSamePipelinedRegion` to `false` for DataSet jobs (point 5 in the discussion summary above), then there shouldn't be any regression, right?
2. I think it makes sense to set the max possible network memory for the JobVertex. When you say parallel instances of the same JobVertex may need different network memory, I guess you mean the rescale scenarios where the parallelism of the upstream / downstream vertex is not evenly divisible by the parallelism of the downstream / upstream vertex? I would say it's acceptable to have a slight difference between the actually needed and the allocated network memory.
3. Yes, by numOpsUseOnHeapManagedMemory I mean numOpsUseOnHeapManagedMemoryInTheSameSharedGroup. I'll update the doc.
4. Yes, it should be StreamingJobGraphGenerator. Thanks for the correction.

Kurt, regarding your comments:

1. I think we don't have network memory in ResourceSpec, which is the user facing API. We only have network memory in ResourceProfile, which is used internally for scheduling. The reason we do not expose network memory to the user is that, currently, how many network buffers each task needs is decided by the topology of the execution graph (how many input / output channels it has).
2. In the section "Operator Resource Requirements": "For the first version, we do not support mixing operators with specified / unknown resource requirements in the same job. Either all or none of the operators of the same job should specify their resource requirements. StreamGraphGenerator should check this and throw an error when mixing of specified / unknown resource requirements is detected, during the compilation stage."
3. If the user sets a resource requirement, then the task is guaranteed to get at least that much resource; otherwise there should be an exception. That should be guaranteed by the "Dynamic Slot Allocation" approach (FLIP-56).

I'll update the FLIP document to address the comments ASAP.
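
To illustrate the network memory point (Zhu's comment 2), here is a rough Python sketch, not Flink code. It assumes a pointwise pattern where upstream subtasks are spread as evenly as possible over downstream subtasks, and a flat, hypothetical `bytes_per_channel` cost; the real buffer accounting is more involved. Both function names are made up for this example.

```python
from typing import List


def input_channels_per_subtask(upstream: int, downstream: int) -> List[int]:
    """Number of input channels each downstream subtask gets (assumed even spread)."""
    if downstream <= 0 or upstream < downstream:
        raise ValueError("sketch assumes upstream >= downstream >= 1")
    base, extra = divmod(upstream, downstream)
    # The first `extra` subtasks get one more channel than the rest.
    return [base + 1 if i < extra else base for i in range(downstream)]


def vertex_network_memory(channels: List[int], bytes_per_channel: int) -> int:
    """Network memory to request per instance: the max over all parallel instances."""
    return max(channels) * bytes_per_channel
```

For example, with upstream parallelism 5 and downstream parallelism 2, the two downstream subtasks get 3 and 2 input channels. Requesting the max for the JobVertex over-allocates one channel's worth of memory for the second subtask, which is the slight difference I consider acceptable.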

Thank you~

Xintong Song

On Tue, Sep 3, 2019 at 2:42 PM Kurt Young <ykt...@gmail.com> wrote:

> Thanks Xingtong for driving this effort, I haven't finished the whole
> document yet, but have couple of questions:
>
> 1. Regarding to network memory, the document said it will be derived by
> framework automatically. I'm wondering whether we should delete this
> dimension from user-facing API?
>
> 2. Regarding to fraction based quota, I don't quite get the meaning of
> "slotSharingGroupOnHeapManagedMem" and "slotSharingGroupOffHeapManagedMem".
> What if the sharing group is mixed with specified resource and UNKNOWN
> resource requirements.
>
> 3. IIUC, even user had set resource requirements, lets say 500MB off-heap
> managed memory, during execution the operator may or may not have 500MB
> off-heap managed memory, right?
>
> Best,
> Kurt
>
> On Mon, Sep 2, 2019 at 8:36 PM Zhu Zhu <reed...@gmail.com> wrote:
>
> > Thanks Xintong for proposing this improvement. Fine grained resources can
> > be very helpful when user has good planning on resources.
> >
> > I have a few questions:
> > 1. Currently in a batch job, vertices from different regions can run at the
> > same time in slots from the same shared group, as long as they do not have
> > data dependency on each other and available slot count is not smaller than
> > the *max* of parallelism of all tasks.
> > With changes in this FLIP however, tasks from different regions cannot
> > share slots anymore.
> > Once available slot count is smaller than the *sum* of all parallelism of
> > tasks from all regions, tasks may need to be executed sequentially, which
> > might result in a performance regression.
> > Is this (performance regression to existing DataSet jobs) considered as a
> > necessary and accepted trade off in this FLIP?
> >
> > 2. The network memory depends on the input/output ExecutionEdge count and
> > thus can be different even for parallel instances of the same JobVertex.
> > Does this mean that when adding up task resources to calculate the slot
> > resource for a shared group, the max possible network memory of the vertex
> > instance shall be used?
> > This might result in larger resource required than actually needed.
> >
> > And some minor comments:
> > 1. Regarding "fracManagedMemOnHeap = 1 / numOpsUseOnHeapManagedMemory", I
> > guess you mean numOpsUseOnHeapManagedMemoryInTheSameSharedGroup?
> > 2. I think the *StreamGraphGenerator* in the #Slot Sharing section and
> > implementation step 4 should be *StreamingJobGraphGenerator*, as
> > *StreamGraphGenerator* is not aware of JobGraph and pipelined region.
> >
> > Thanks,
> > Zhu Zhu
> >
> > Xintong Song <tonysong...@gmail.com> wrote on Mon, Sep 2, 2019 at 11:59 AM:
> >
> > > Updated the FLIP wiki page [1], with the following changes.
> > >
> > > - Remove the step of converting pipelined edges between different slot
> > > sharing groups into blocking edges.
> > > - Set `allSourcesInSamePipelinedRegion` to true by default.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > > On Mon, Sep 2, 2019 at 11:50 AM Xintong Song <tonysong...@gmail.com> wrote:
> > >
> > > > Regarding changing edge type, I think actually we don't need to do this
> > > > for batch jobs either, because we don't have public interfaces for users
> > > > to explicitly set slot sharing groups in DataSet API and SQL/Table API. We
> > > > have such interfaces in DataStream API only.
> > > >
> > > > Thank you~
> > > >
> > > > Xintong Song
> > > >
> > > > On Tue, Aug 27, 2019 at 10:16 PM Xintong Song <tonysong...@gmail.com> wrote:
> > > >
> > > >> Thanks for the correction, Till.
> > > >>
> > > >> Regarding your comments:
> > > >> - You are right, we should not change the edge type for streaming jobs.
> > > >> Then I think we can change the option 'allSourcesInSamePipelinedRegion' in
> > > >> step 2 to 'isStreamingJob', and implement the current step 2 before the
> > > >> current step 1 so we can use this option to decide whether we should change
> > > >> the edge type. What do you think?
> > > >> - Agree. It should be easier to make the default value of
> > > >> 'allSourcesInSamePipelinedRegion' (or 'isStreamingJob') 'true', and set it
> > > >> to 'false' when using DataSet API or blink planner.
> > > >>
> > > >> Thank you~
> > > >>
> > > >> Xintong Song
> > > >>
> > > >> On Tue, Aug 27, 2019 at 8:59 PM Till Rohrmann <trohrm...@apache.org> wrote:
> > > >>
> > > >>> Thanks for creating the implementation plan Xintong. Overall, the
> > > >>> implementation plan looks good. I had a couple of comments:
> > > >>>
> > > >>> - What will happen if a user has defined a streaming job with two slot
> > > >>> sharing groups? Would the code insert a blocking data exchange between
> > > >>> these two groups? If yes, then this breaks existing Flink streaming jobs.
> > > >>> - How do we detect unbounded streaming jobs to set
> > > >>> the allSourcesInSamePipelinedRegion to `true`? Wouldn't it be easier to set
> > > >>> it false if we are using the DataSet API or the Blink planner with a
> > > >>> bounded job?
> > > >>> > > > >>> Cheers, > > > >>> Till > > > >>> > > > >>> On Tue, Aug 27, 2019 at 2:16 PM Till Rohrmann < > trohrm...@apache.org> > > > >>> wrote: > > > >>> > > > >>> > I guess there is a typo since the link to the FLIP-53 is > > > >>> > > > > >>> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management > > > >>> > > > > >>> > Cheers, > > > >>> > Till > > > >>> > > > > >>> > On Tue, Aug 27, 2019 at 1:42 PM Xintong Song < > > tonysong...@gmail.com> > > > >>> > wrote: > > > >>> > > > > >>> >> Added implementation steps for this FLIP on the wiki page [1]. > > > >>> >> > > > >>> >> > > > >>> >> Thank you~ > > > >>> >> > > > >>> >> Xintong Song > > > >>> >> > > > >>> >> > > > >>> >> [1] > > > >>> >> > > > >>> >> > > > >>> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors > > > >>> >> > > > >>> >> On Mon, Aug 19, 2019 at 10:29 PM Xintong Song < > > > tonysong...@gmail.com> > > > >>> >> wrote: > > > >>> >> > > > >>> >> > Hi everyone, > > > >>> >> > > > > >>> >> > As Till suggested, the original "FLIP-53: Fine Grained > Resource > > > >>> >> > Management" splits into two separate FLIPs, > > > >>> >> > > > > >>> >> > - FLIP-53: Fine Grained Operator Resource Management [1] > > > >>> >> > - FLIP-56: Dynamic Slot Allocation [2] > > > >>> >> > > > > >>> >> > We'll continue using this discussion thread for FLIP-53. For > > > >>> FLIP-56, I > > > >>> >> > just started a new discussion thread [3]. 
> > > >>> >> > > > > >>> >> > Thank you~ > > > >>> >> > > > > >>> >> > Xintong Song > > > >>> >> > > > > >>> >> > > > > >>> >> > [1] > > > >>> >> > > > > >>> >> > > > >>> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management > > > >>> >> > > > > >>> >> > [2] > > > >>> >> > > > > >>> >> > > > >>> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation > > > >>> >> > > > > >>> >> > [3] > > > >>> >> > > > > >>> >> > > > >>> > > > > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-56-Dynamic-Slot-Allocation-td31960.html > > > >>> >> > > > > >>> >> > On Mon, Aug 19, 2019 at 2:55 PM Xintong Song < > > > tonysong...@gmail.com > > > >>> > > > > >>> >> > wrote: > > > >>> >> > > > > >>> >> >> Thinks for the comments, Yang. > > > >>> >> >> > > > >>> >> >> Regarding your questions: > > > >>> >> >> > > > >>> >> >> 1. How to calculate the resource specification of > > > TaskManagers? > > > >>> Do > > > >>> >> they > > > >>> >> >>> have them same resource spec calculated based on the > > > >>> >> configuration? I > > > >>> >> >>> think > > > >>> >> >>> we still have wasted resources in this situation. Or we > > could > > > >>> start > > > >>> >> >>> TaskManagers with different spec. > > > >>> >> >>> > > > >>> >> >> I agree with you that we can further improve the resource > > utility > > > >>> by > > > >>> >> >> customizing task executors with different resource > > > specifications. > > > >>> >> However, > > > >>> >> >> I'm in favor of limiting the scope of this FLIP and leave it > > as a > > > >>> >> future > > > >>> >> >> optimization. 
The plan for that part is to move the logic of > > > >>> deciding > > > >>> >> task > > > >>> >> >> executor specifications into the slot manager and make slot > > > manager > > > >>> >> >> pluggable, so inside the slot manager plugin we can have > > > different > > > >>> >> logics > > > >>> >> >> for deciding the task executor specifications. > > > >>> >> >> > > > >>> >> >> > > > >>> >> >>> 2. If a slot is released and returned to SlotPool, does > it > > > >>> could be > > > >>> >> >>> reused by other SlotRequest that the request resource is > > > >>> smaller > > > >>> >> than > > > >>> >> >>> it? > > > >>> >> >>> > > > >>> >> >> No, I think slot pool should always return slots if they do > not > > > >>> exactly > > > >>> >> >> match the pending requests, so that resource manager can deal > > > with > > > >>> the > > > >>> >> >> extra resources. > > > >>> >> >> > > > >>> >> >>> - If it is yes, what happens to the available resource > > in > > > >>> the > > > >>> >> >> > > > >>> >> >> TaskManager. > > > >>> >> >>> - What is the SlotStatus of the cached slot in > SlotPool? > > > The > > > >>> >> >>> AllocationId is null? > > > >>> >> >>> > > > >>> >> >> The allocation id does not change as long as the slot is not > > > >>> returned > > > >>> >> >> from the job master, no matter its occupied or available in > the > > > >>> slot > > > >>> >> pool. > > > >>> >> >> I think we have the same behavior currently. No matter how > many > > > >>> tasks > > > >>> >> the > > > >>> >> >> job master deploy into the slot, concurrently or > sequentially, > > it > > > >>> is > > > >>> >> one > > > >>> >> >> allocation from the cluster to the job until the slot is > freed > > > from > > > >>> >> the job > > > >>> >> >> master. > > > >>> >> >> > > > >>> >> >>> 3. In a session cluster, some jobs are configured with > > > operator > > > >>> >> >>> resources, meanwhile other jobs are using UNKNOWN. How to > > > deal > > > >>> with > > > >>> >> >>> this > > > >>> >> >>> situation? 
> > > >>> >> >> > > > >>> >> >> As long as we do not mix unknown / specified resource > profiles > > > >>> within > > > >>> >> the > > > >>> >> >> same job / slot, there shouldn't be a problem. Resource > manager > > > >>> >> converts > > > >>> >> >> unknown resource profiles in slot requests to specified > default > > > >>> >> resource > > > >>> >> >> profiles, so they can be dynamically allocated from task > > > executors' > > > >>> >> >> available resources just as other slot requests with > specified > > > >>> resource > > > >>> >> >> profiles. > > > >>> >> >> > > > >>> >> >> Thank you~ > > > >>> >> >> > > > >>> >> >> Xintong Song > > > >>> >> >> > > > >>> >> >> > > > >>> >> >> > > > >>> >> >> On Mon, Aug 19, 2019 at 11:39 AM Yang Wang < > > > danrtsey...@gmail.com> > > > >>> >> wrote: > > > >>> >> >> > > > >>> >> >>> Hi Xintong, > > > >>> >> >>> > > > >>> >> >>> > > > >>> >> >>> Thanks for your detailed proposal. I think many users are > > > >>> suffering > > > >>> >> from > > > >>> >> >>> waste of resources. The resource spec of all task managers > are > > > >>> same > > > >>> >> and > > > >>> >> >>> we > > > >>> >> >>> have to increase all task managers to make the heavy one > more > > > >>> stable. > > > >>> >> So > > > >>> >> >>> we > > > >>> >> >>> will benefit from the fine grained resource management a > lot. > > We > > > >>> could > > > >>> >> >>> get > > > >>> >> >>> better resource utilization and stability. > > > >>> >> >>> > > > >>> >> >>> > > > >>> >> >>> Just to share some thoughts. > > > >>> >> >>> > > > >>> >> >>> > > > >>> >> >>> > > > >>> >> >>> 1. How to calculate the resource specification of > > > >>> TaskManagers? Do > > > >>> >> >>> they > > > >>> >> >>> have them same resource spec calculated based on the > > > >>> >> configuration? I > > > >>> >> >>> think > > > >>> >> >>> we still have wasted resources in this situation. Or we > > could > > > >>> start > > > >>> >> >>> TaskManagers with different spec. > > > >>> >> >>> 2. 
If a slot is released and returned to SlotPool, does > it > > > >>> could be > > > >>> >> >>> reused by other SlotRequest that the request resource is > > > >>> smaller > > > >>> >> than > > > >>> >> >>> it? > > > >>> >> >>> - If it is yes, what happens to the available resource > > in > > > >>> the > > > >>> >> >>> TaskManager. > > > >>> >> >>> - What is the SlotStatus of the cached slot in > SlotPool? > > > The > > > >>> >> >>> AllocationId is null? > > > >>> >> >>> 3. In a session cluster, some jobs are configured with > > > operator > > > >>> >> >>> resources, meanwhile other jobs are using UNKNOWN. How to > > > deal > > > >>> with > > > >>> >> >>> this > > > >>> >> >>> situation? > > > >>> >> >>> > > > >>> >> >>> > > > >>> >> >>> > > > >>> >> >>> Best, > > > >>> >> >>> Yang > > > >>> >> >>> > > > >>> >> >>> Xintong Song <tonysong...@gmail.com> 于2019年8月16日周五 > 下午8:57写道: > > > >>> >> >>> > > > >>> >> >>> > Thanks for the feedbacks, Yangze and Till. > > > >>> >> >>> > > > > >>> >> >>> > Yangze, > > > >>> >> >>> > > > > >>> >> >>> > I agree with you that we should make scheduling strategy > > > >>> pluggable > > > >>> >> and > > > >>> >> >>> > optimize the strategy to reduce the memory fragmentation > > > >>> problem, > > > >>> >> and > > > >>> >> >>> > thanks for the inputs on the potential algorithmic > > solutions. > > > >>> >> However, > > > >>> >> >>> I'm > > > >>> >> >>> > in favor of keep this FLIP focusing on the overall > mechanism > > > >>> design > > > >>> >> >>> rather > > > >>> >> >>> > than strategies. Solving the fragmentation issue should be > > > >>> >> considered > > > >>> >> >>> as an > > > >>> >> >>> > optimization, and I agree with Till that we probably > should > > > >>> tackle > > > >>> >> this > > > >>> >> >>> > afterwards. > > > >>> >> >>> > > > > >>> >> >>> > Till, > > > >>> >> >>> > > > > >>> >> >>> > - Regarding splitting the FLIP, I think it makes sense. 
> The > > > >>> operator > > > >>> >> >>> > resource management and dynamic slot allocation do not > have > > > much > > > >>> >> >>> dependency > > > >>> >> >>> > on each other. > > > >>> >> >>> > > > > >>> >> >>> > - Regarding the default slot size, I think this is similar > > to > > > >>> >> FLIP-49 > > > >>> >> >>> [1] > > > >>> >> >>> > where we want all the deriving happens at one place. I > think > > > it > > > >>> >> would > > > >>> >> >>> be > > > >>> >> >>> > nice to pass the default slot size into the task executor > in > > > the > > > >>> >> same > > > >>> >> >>> way > > > >>> >> >>> > that we pass in the memory pool sizes in FLIP-49 [1]. > > > >>> >> >>> > > > > >>> >> >>> > - Regarding the return value of > > > >>> >> TaskExecutorGateway#requestResource, I > > > >>> >> >>> > think you're right. We should avoid using null as the > return > > > >>> value. > > > >>> >> I > > > >>> >> >>> think > > > >>> >> >>> > we probably should thrown an exception here. > > > >>> >> >>> > > > > >>> >> >>> > Thank you~ > > > >>> >> >>> > > > > >>> >> >>> > Xintong Song > > > >>> >> >>> > > > > >>> >> >>> > > > > >>> >> >>> > [1] > > > >>> >> >>> > > > > >>> >> >>> > > > > >>> >> >>> > > > >>> >> > > > >>> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors > > > >>> >> >>> > > > > >>> >> >>> > On Fri, Aug 16, 2019 at 2:18 PM Till Rohrmann < > > > >>> trohrm...@apache.org > > > >>> >> > > > > >>> >> >>> > wrote: > > > >>> >> >>> > > > > >>> >> >>> > > Hi Xintong, > > > >>> >> >>> > > > > > >>> >> >>> > > thanks for drafting this FLIP. I think your proposal > helps > > > to > > > >>> >> >>> improve the > > > >>> >> >>> > > execution of batch jobs more efficiently. Moreover, it > > > >>> enables the > > > >>> >> >>> proper > > > >>> >> >>> > > integration of the Blink planner which is very important > > as > > > >>> well. > > > >>> >> >>> > > > > > >>> >> >>> > > Overall, the FLIP looks good to me. 
I was wondering > > whether > > > it > > > >>> >> >>> wouldn't > > > >>> >> >>> > > make sense to actually split it up into two FLIPs: > > Operator > > > >>> >> resource > > > >>> >> >>> > > management and dynamic slot allocation. I think these > two > > > >>> FLIPs > > > >>> >> >>> could be > > > >>> >> >>> > > seen as orthogonal and it would decrease the scope of > each > > > >>> >> individual > > > >>> >> >>> > FLIP. > > > >>> >> >>> > > > > > >>> >> >>> > > Some smaller comments: > > > >>> >> >>> > > > > > >>> >> >>> > > - I'm not sure whether we should pass in the default > slot > > > size > > > >>> >> via an > > > >>> >> >>> > > environment variable. Without having unified the way how > > > Flink > > > >>> >> >>> components > > > >>> >> >>> > > are configured [1], I think it would be better to pass > it > > in > > > >>> as > > > >>> >> part > > > >>> >> >>> of > > > >>> >> >>> > the > > > >>> >> >>> > > configuration. > > > >>> >> >>> > > - I would avoid returning a null value from > > > >>> >> >>> > > TaskExecutorGateway#requestResource if it cannot be > > > fulfilled. > > > >>> >> >>> Either we > > > >>> >> >>> > > should introduce an explicit return value saying this or > > > >>> throw an > > > >>> >> >>> > > exception. > > > >>> >> >>> > > > > > >>> >> >>> > > Concerning Yangze's comments: I think you are right that > > it > > > >>> would > > > >>> >> be > > > >>> >> >>> > > helpful to make the selection strategy pluggable. Also > > > >>> batching > > > >>> >> slot > > > >>> >> >>> > > requests to the RM could be a good optimization. For the > > > sake > > > >>> of > > > >>> >> >>> keeping > > > >>> >> >>> > > the scope of this FLIP smaller I would try to tackle > these > > > >>> things > > > >>> >> >>> after > > > >>> >> >>> > the > > > >>> >> >>> > > initial version has been completed (without spoiling > these > > > >>> >> >>> optimization > > > >>> >> >>> > > opportunities). 
In particular batching the slot requests > > > >>> depends > > > >>> >> on > > > >>> >> >>> the > > > >>> >> >>> > > current scheduler refactoring and could also be realized > > on > > > >>> the RM > > > >>> >> >>> side > > > >>> >> >>> > > only. > > > >>> >> >>> > > > > > >>> >> >>> > > [1] > > > >>> >> >>> > > > > > >>> >> >>> > > > > > >>> >> >>> > > > > >>> >> >>> > > > >>> >> > > > >>> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration > > > >>> >> >>> > > > > > >>> >> >>> > > Cheers, > > > >>> >> >>> > > Till > > > >>> >> >>> > > > > > >>> >> >>> > > > > > >>> >> >>> > > > > > >>> >> >>> > > On Fri, Aug 16, 2019 at 11:11 AM Yangze Guo < > > > >>> karma...@gmail.com> > > > >>> >> >>> wrote: > > > >>> >> >>> > > > > > >>> >> >>> > > > Hi, Xintong > > > >>> >> >>> > > > > > > >>> >> >>> > > > Thanks to propose this FLIP. The general design looks > > good > > > >>> to > > > >>> >> me, > > > >>> >> >>> +1 > > > >>> >> >>> > > > for this feature. > > > >>> >> >>> > > > > > > >>> >> >>> > > > Since slots in the same task executor could have > > different > > > >>> >> resource > > > >>> >> >>> > > > profile, we will > > > >>> >> >>> > > > meet resource fragment problem. Think about this case: > > > >>> >> >>> > > > - request A want 1G memory while request B & C want > > 0.5G > > > >>> memory > > > >>> >> >>> > > > - There are two task executors T1 & T2 with 1G and > 0.5G > > > >>> free > > > >>> >> >>> memory > > > >>> >> >>> > > > respectively > > > >>> >> >>> > > > If B come first and we cut a slot from T1 for B, A > must > > > >>> wait for > > > >>> >> >>> the > > > >>> >> >>> > > > free resource from > > > >>> >> >>> > > > other task. But A could have been scheduled > immediately > > if > > > >>> we > > > >>> >> cut a > > > >>> >> >>> > > > slot from T2 for B. 
> > > >>> >> >>> > > > > > > >>> >> >>> > > > The logic of findMatchingSlot now become finding a > task > > > >>> executor > > > >>> >> >>> which > > > >>> >> >>> > > > has enough > > > >>> >> >>> > > > resource and then cut a slot from it. Current method > > could > > > >>> be > > > >>> >> seen > > > >>> >> >>> as > > > >>> >> >>> > > > "First-fit strategy", > > > >>> >> >>> > > > which works well in general but sometimes could not be > > the > > > >>> >> >>> optimization > > > >>> >> >>> > > > method. > > > >>> >> >>> > > > > > > >>> >> >>> > > > Actually, this problem could be abstracted as "Bin > > Packing > > > >>> >> >>> Problem"[1]. > > > >>> >> >>> > > > Here are > > > >>> >> >>> > > > some common approximate algorithms: > > > >>> >> >>> > > > - First fit > > > >>> >> >>> > > > - Next fit > > > >>> >> >>> > > > - Best fit > > > >>> >> >>> > > > > > > >>> >> >>> > > > But it become multi-dimensional bin packing problem if > > we > > > >>> take > > > >>> >> CPU > > > >>> >> >>> > > > into account. It hard > > > >>> >> >>> > > > to define which one is best fit now. Some research > > > addressed > > > >>> >> this > > > >>> >> >>> > > > problem, such like Tetris[2]. > > > >>> >> >>> > > > > > > >>> >> >>> > > > Here are some thinking about it: > > > >>> >> >>> > > > 1. We could make the strategy of finding matching task > > > >>> executor > > > >>> >> >>> > > > pluginable. Let user to config the > > > >>> >> >>> > > > best strategy in their scenario. > > > >>> >> >>> > > > 2. We could support batch request interface in RM, > > because > > > >>> we > > > >>> >> have > > > >>> >> >>> > > > opportunities to optimize > > > >>> >> >>> > > > if we have more information. If we know the A, B, C at > > the > > > >>> same > > > >>> >> >>> time, > > > >>> >> >>> > > > we could always make the best decision. 
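
The fragmentation case described above (A wants 1G, B and C want 0.5G each, T1 has 1G free and T2 has 0.5G free) can be played through with a small Python sketch. This is only an illustration of first fit versus best fit on a single memory dimension; the function names are hypothetical and none of this is the actual findMatchingSlot logic.

```python
from typing import Callable, Dict, List, Optional, Tuple

Strategy = Callable[[Dict[str, int], int], Optional[str]]


def first_fit(free_mb: Dict[str, int], request_mb: int) -> Optional[str]:
    """Pick the first task executor (in insertion order) with enough free memory."""
    for name, mem in free_mb.items():
        if mem >= request_mb:
            return name
    return None


def best_fit(free_mb: Dict[str, int], request_mb: int) -> Optional[str]:
    """Pick the task executor that leaves the least memory unused after the cut."""
    candidates = [(mem, name) for name, mem in free_mb.items() if mem >= request_mb]
    return min(candidates)[1] if candidates else None


def schedule(strategy: Strategy, free_mb: Dict[str, int],
             requests: List[Tuple[str, int]]) -> Dict[str, Optional[str]]:
    """Serve requests in arrival order; None means the request has to wait."""
    free = dict(free_mb)  # work on a copy so the caller's view is untouched
    placement: Dict[str, Optional[str]] = {}
    for req_name, mem in requests:
        target = strategy(free, mem)
        placement[req_name] = target
        if target is not None:
            free[target] -= mem
    return placement


executors = {"T1": 1024, "T2": 512}
arrivals = [("B", 512), ("A", 1024), ("C", 512)]
print(schedule(first_fit, executors, arrivals))  # A has to wait
print(schedule(best_fit, executors, arrivals))   # A is placed on T1 right away
```

With first fit, B is cut from T1 and A has to wait. With best fit, B goes to T2 and A gets T1 immediately, at the cost of C waiting instead. Total demand exceeds supply here, so some request always waits; the strategy only decides which one.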
> > > >>> >> >>> > > > > > > >>> >> >>> > > > [1] http://www.or.deis.unibo.it/kp/Chapter8.pdf > > > >>> >> >>> > > > [2] > > > >>> >> >>> > > > > >>> >> > > > https://www.cs.cmu.edu/~xia/resources/Documents/grandl_sigcomm14.pdf > > > >>> >> >>> > > > > > > >>> >> >>> > > > Best, > > > >>> >> >>> > > > Yangze Guo > > > >>> >> >>> > > > > > > >>> >> >>> > > > On Thu, Aug 15, 2019 at 10:40 PM Xintong Song < > > > >>> >> >>> tonysong...@gmail.com> > > > >>> >> >>> > > > wrote: > > > >>> >> >>> > > > > > > > >>> >> >>> > > > > Hi everyone, > > > >>> >> >>> > > > > > > > >>> >> >>> > > > > We would like to start a discussion thread on > > "FLIP-53: > > > >>> Fine > > > >>> >> >>> Grained > > > >>> >> >>> > > > > Resource Management"[1], where we propose how to > > improve > > > >>> Flink > > > >>> >> >>> > resource > > > >>> >> >>> > > > > management and scheduling. > > > >>> >> >>> > > > > > > > >>> >> >>> > > > > This FLIP mainly discusses the following issues. > > > >>> >> >>> > > > > > > > >>> >> >>> > > > > - How to support tasks with fine grained resource > > > >>> >> >>> requirements. > > > >>> >> >>> > > > > - How to unify resource management for jobs with > / > > > >>> without > > > >>> >> >>> fine > > > >>> >> >>> > > > grained > > > >>> >> >>> > > > > resource requirements. > > > >>> >> >>> > > > > - How to unify resource management for streaming > / > > > >>> batch > > > >>> >> jobs. > > > >>> >> >>> > > > > > > > >>> >> >>> > > > > Key changes proposed in the FLIP are as follows. > > > >>> >> >>> > > > > > > > >>> >> >>> > > > > - Unify memory management for operators with / > > > without > > > >>> fine > > > >>> >> >>> > grained > > > >>> >> >>> > > > > resource requirements by applying a fraction > based > > > >>> quota > > > >>> >> >>> > mechanism. 
> > > >>> >> >>> > > > > - Unify resource scheduling for streaming and > batch > > > >>> jobs by > > > >>> >> >>> > setting > > > >>> >> >>> > > > slot > > > >>> >> >>> > > > > sharing groups for pipelined regions during > > compiling > > > >>> >> stage. > > > >>> >> >>> > > > > - Dynamically allocate slots from task executors' > > > >>> available > > > >>> >> >>> > > resources. > > > >>> >> >>> > > > > > > > >>> >> >>> > > > > Please find more details in the FLIP wiki document > > [1]. > > > >>> >> Looking > > > >>> >> >>> > forward > > > >>> >> >>> > > > to > > > >>> >> >>> > > > > your feedbacks. > > > >>> >> >>> > > > > > > > >>> >> >>> > > > > Thank you~ > > > >>> >> >>> > > > > > > > >>> >> >>> > > > > Xintong Song > > > >>> >> >>> > > > > > > > >>> >> >>> > > > > > > > >>> >> >>> > > > > [1] > > > >>> >> >>> > > > > > > > >>> >> >>> > > > > > > >>> >> >>> > > > > > >>> >> >>> > > > > >>> >> >>> > > > >>> >> > > > >>> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Resource+Management > > > >>> >> >>> > > > > > > >>> >> >>> > > > > > >>> >> >>> > > > > >>> >> >>> > > > >>> >> >> > > > >>> >> > > > >>> > > > > >>> > > > >> > > > > > >