Thanks Xintong for the explanation. For question #1, I think it's good as long as DataSet job behaviors remains the same.
For question #2, agreed that the resource difference is small enough(at most 1 edge diff) in current supported point-wise execution edge connection patterns. Thanks, Zhu Zhu Xintong Song <tonysong...@gmail.com> 于2019年9月3日周二 下午6:58写道: > Thanks for the comments, Zhu & Kurt. > > Andrey and I also had some discussions offline, and I would like to first > post a summary of our discussion: > > 1. The motivation of the fraction based approach is to unify resource > management for both operators with specified and unknown resource > requirements. > 2. The fraction based approach proposed in this FLIP should only affect > streaming jobs (both bounded and unbounded). For DataSet jobs, there are > already some fraction based approach (in TaskConfig and ChainedDriver), > and > we do not make any change to the existing approach. > 3. The scope of this FLIP does not include discussion of how to set > ResourceSpec for operators. > 1. For blink jobs, the optimizer can set operator resources for the > users, according to their configurations (default: unknown) > 2. For DataStream jobs, there are no method / interface to set > operator resources at the moment (1.10). We can have in the future. > 3. For DataSet jobs, there are existing user interfaces to set > operator resources. > 4. The FLIP should explain more about how ResourceSpecs works > 1. PhysicalTransformations (deployed with operators into the > StreamTasks) get ResourceSpec: unknown by default or known (e.g. > from the > Blink planner) > 2. While generating stream graph, calculate fractions and set to > StreamConfig > 3. While scheduling, convert ResourceSpec to ResourceProfile > (ResourceSpec + network memory), and deploy to slots / TMs matching > the > resources > 4. While starting Task in TM, each operator gets fraction converted > back to the original absolute value requested by user or fair > unknown share > of the slot > 5. We should not set `allSourcesInSamePipelinedRegion` to `false` for > DataSet jobs. Behaviors of DataSet jobs should not be changed. > 6. The FLIP document should differentiate works planed in this FLIP and > the future follow-ups more clearly, by put the follow-ups in a separate > section > 7. Another limitation of the rejected alternative setting fractions at > scheduling time is that, the scheduler implementation does not know > which > tasks will be deployed into the same slot in advance. > > Andrey, Please bring it up if there is anything I missed. > > Zhu, regarding your comments: > > 1. If we do not set `allSourcesInSamePipelinedRegion` to `false` for > DataSet jobs (point 5 in the discussion summary above), then there > shouldn't be any regression right? > 2. I think it makes sense to set the max possible network memory for the > JobVertex. When you say parallel instances of the same JobVertex may > have > need different network memory, I guess you mean the rescale scenarios > where > parallelisms of upstream / downstream vertex cannot be exactly divided > by > parallelism of downstream / upstream vertex? I would say it's > acceptable to > have slight difference between actually needed and allocated network > memory. > 3. Yes, by numOpsUseOnHeapManagedMemory I mean > numOpsUseOnHeapManagedMemoryInTheSameSharedGroup. I'll update the doc. > 4. Yes, it should be StreamingJobGraphGenerator. Thanks for the > correction. > > > Kurt, regarding your comments: > > 1. I think we don't have network memory in ResourceSpec, which is the > user facing API. We only have network memory in ResourceProfile, which > is > used internally for scheduling. The reason we do not expose network > memory > to the user is that, currently how many network buffers each task needs > is > decided by the topology of execution graph (how many input / output > channels it has). > 2. In the section "Operator Resource Requirements": "For the first > version, we do not support mixing operators with specified / unknown > resource requirements in the same job. Either all or none of the > operators > of the same job should specify their resource requirements. > StreamGraphGenerator should check this and throw an error when mixing of > specified / unknown resource requirements is detected, during the > compilation stage." > 3. If the user set a resource requirement, then it is guaranteed that > the task should get at least the much resource, otherwise there should > be > an exception. That should be guaranteed by the "Dynamic Slot Allocation" > approach (FLIP-56). > > > I'll update the FLIP document addressing the comments ASAP. > > > Thank you~ > > Xintong Song > > > > On Tue, Sep 3, 2019 at 2:42 PM Kurt Young <ykt...@gmail.com> wrote: > > > Thanks Xingtong for driving this effort, I haven't finished the whole > > document yet, > > but have couple of questions: > > > > 1. Regarding to network memory, the document said it will be derived by > > framework > > automatically. I'm wondering whether we should delete this dimension from > > user- > > facing API? > > > > 2. Regarding to fraction based quota, I don't quite get the meaning of > > "slotSharingGroupOnHeapManagedMem" and > "slotSharingGroupOffHeapManagedMem". > > What if the sharing group is mixed with specified resource and UNKNOWN > > resource > > requirements. > > > > 3 IIUC, even user had set resource requirements, lets say 500MB off-heap > > managed > > memory, during execution the operator may or may not have 500MB off-heap > > managed > > memory, right? > > > > Best, > > Kurt > > > > > > On Mon, Sep 2, 2019 at 8:36 PM Zhu Zhu <reed...@gmail.com> wrote: > > > > > Thanks Xintong for proposing this improvement. Fine grained resources > can > > > be very helpful when user has good planning on resources. > > > > > > I have a few questions: > > > 1. Currently in a batch job, vertices from different regions can run at > > the > > > same time in slots from the same shared group, as long as they do not > > have > > > data dependency on each other and available slot count is not smaller > > than > > > the *max* of parallelism of all tasks. > > > With changes in this FLIP however, tasks from different regions cannot > > > share slots anymore. > > > Once available slot count is smaller than the *sum* of all parallelism > of > > > tasks from all regions, tasks may need to be executed sequentially, > which > > > might result in a performance regression. > > > Is this(performance regression to existing DataSet jobs) considered as > a > > > necessary and accepted trade off in this FLIP? > > > > > > 2. The network memory depends on the input/output ExecutionEdge count > and > > > thus can be different even for parallel instances of the same > JobVertex. > > > Does this mean that when adding task resources to calculating the slot > > > resource for a shared group, the max possible network memory of the > > vertex > > > instance shall be used? > > > This might result in larger resource required than actually needed. > > > > > > And some minor comments: > > > 1. Regarding "fracManagedMemOnHeap = 1 / > numOpsUseOnHeapManagedMemory", I > > > guess you mean numOpsUseOnHeapManagedMemoryInTheSameSharedGroup ? > > > 2. I think the *StreamGraphGenerator* in the #Slot Sharing section and > > > implementation step 4 should be *StreamingJobGraphGenerator*, as > > > *StreamGraphGenerator* is not aware of JobGraph and pipelined region. > > > > > > > > > Thanks, > > > Zhu Zhu > > > > > > Xintong Song <tonysong...@gmail.com> 于2019年9月2日周一 上午11:59写道: > > > > > > > Updated the FLIP wiki page [1], with the following changes. > > > > > > > > - Remove the step of converting pipelined edges between different > > slot > > > > sharing groups into blocking edges. > > > > - Set `allSourcesInSamePipelinedRegion` to true by default. > > > > > > > > Thank you~ > > > > > > > > Xintong Song > > > > > > > > > > > > > > > > On Mon, Sep 2, 2019 at 11:50 AM Xintong Song <tonysong...@gmail.com> > > > > wrote: > > > > > > > > > Regarding changing edge type, I think actually we don't need to do > > this > > > > > for batch jobs neither, because we don't have public interfaces for > > > users > > > > > to explicitly set slot sharing groups in DataSet API and SQL/Table > > API. > > > > We > > > > > have such interfaces in DataStream API only. > > > > > > > > > > Thank you~ > > > > > > > > > > Xintong Song > > > > > > > > > > > > > > > > > > > > On Tue, Aug 27, 2019 at 10:16 PM Xintong Song < > tonysong...@gmail.com > > > > > > > > wrote: > > > > > > > > > >> Thanks for the correction, Till. > > > > >> > > > > >> Regarding your comments: > > > > >> - You are right, we should not change the edge type for streaming > > > jobs. > > > > >> Then I think we can change the option > > > 'allSourcesInSamePipelinedRegion' > > > > in > > > > >> step 2 to 'isStreamingJob', and implement the current step 2 > before > > > the > > > > >> current step 1 so we can use this option to decide whether should > > > change > > > > >> the edge type. What do you think? > > > > >> - Agree. It should be easier to make the default value of > > > > >> 'allSourcesInSamePipelinedRegion' (or 'isStreamingJob') 'true', > and > > > set > > > > it > > > > >> to 'false' when using DataSet API or blink planner. > > > > >> > > > > >> Thank you~ > > > > >> > > > > >> Xintong Song > > > > >> > > > > >> > > > > >> > > > > >> On Tue, Aug 27, 2019 at 8:59 PM Till Rohrmann < > trohrm...@apache.org > > > > > > > >> wrote: > > > > >> > > > > >>> Thanks for creating the implementation plan Xintong. Overall, the > > > > >>> implementation plan looks good. I had a couple of comments: > > > > >>> > > > > >>> - What will happen if a user has defined a streaming job with two > > > slot > > > > >>> sharing groups? Would the code insert a blocking data exchange > > > between > > > > >>> these two groups? If yes, then this breaks existing Flink > streaming > > > > jobs. > > > > >>> - How do we detect unbounded streaming jobs to set > > > > >>> the allSourcesInSamePipelinedRegion to `true`? Wouldn't it be > > easier > > > to > > > > >>> set > > > > >>> it false if we are using the DataSet API or the Blink planner > with > > a > > > > >>> bounded job? > > > > >>> > > > > >>> Cheers, > > > > >>> Till > > > > >>> > > > > >>> On Tue, Aug 27, 2019 at 2:16 PM Till Rohrmann < > > trohrm...@apache.org> > > > > >>> wrote: > > > > >>> > > > > >>> > I guess there is a typo since the link to the FLIP-53 is > > > > >>> > > > > > >>> > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management > > > > >>> > > > > > >>> > Cheers, > > > > >>> > Till > > > > >>> > > > > > >>> > On Tue, Aug 27, 2019 at 1:42 PM Xintong Song < > > > tonysong...@gmail.com> > > > > >>> > wrote: > > > > >>> > > > > > >>> >> Added implementation steps for this FLIP on the wiki page [1]. > > > > >>> >> > > > > >>> >> > > > > >>> >> Thank you~ > > > > >>> >> > > > > >>> >> Xintong Song > > > > >>> >> > > > > >>> >> > > > > >>> >> [1] > > > > >>> >> > > > > >>> >> > > > > >>> > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors > > > > >>> >> > > > > >>> >> On Mon, Aug 19, 2019 at 10:29 PM Xintong Song < > > > > tonysong...@gmail.com> > > > > >>> >> wrote: > > > > >>> >> > > > > >>> >> > Hi everyone, > > > > >>> >> > > > > > >>> >> > As Till suggested, the original "FLIP-53: Fine Grained > > Resource > > > > >>> >> > Management" splits into two separate FLIPs, > > > > >>> >> > > > > > >>> >> > - FLIP-53: Fine Grained Operator Resource Management [1] > > > > >>> >> > - FLIP-56: Dynamic Slot Allocation [2] > > > > >>> >> > > > > > >>> >> > We'll continue using this discussion thread for FLIP-53. For > > > > >>> FLIP-56, I > > > > >>> >> > just started a new discussion thread [3]. > > > > >>> >> > > > > > >>> >> > Thank you~ > > > > >>> >> > > > > > >>> >> > Xintong Song > > > > >>> >> > > > > > >>> >> > > > > > >>> >> > [1] > > > > >>> >> > > > > > >>> >> > > > > >>> > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management > > > > >>> >> > > > > > >>> >> > [2] > > > > >>> >> > > > > > >>> >> > > > > >>> > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation > > > > >>> >> > > > > > >>> >> > [3] > > > > >>> >> > > > > > >>> >> > > > > >>> > > > > > > > > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-56-Dynamic-Slot-Allocation-td31960.html > > > > >>> >> > > > > > >>> >> > On Mon, Aug 19, 2019 at 2:55 PM Xintong Song < > > > > tonysong...@gmail.com > > > > >>> > > > > > >>> >> > wrote: > > > > >>> >> > > > > > >>> >> >> Thinks for the comments, Yang. > > > > >>> >> >> > > > > >>> >> >> Regarding your questions: > > > > >>> >> >> > > > > >>> >> >> 1. How to calculate the resource specification of > > > > TaskManagers? > > > > >>> Do > > > > >>> >> they > > > > >>> >> >>> have them same resource spec calculated based on the > > > > >>> >> configuration? I > > > > >>> >> >>> think > > > > >>> >> >>> we still have wasted resources in this situation. Or we > > > could > > > > >>> start > > > > >>> >> >>> TaskManagers with different spec. > > > > >>> >> >>> > > > > >>> >> >> I agree with you that we can further improve the resource > > > utility > > > > >>> by > > > > >>> >> >> customizing task executors with different resource > > > > specifications. > > > > >>> >> However, > > > > >>> >> >> I'm in favor of limiting the scope of this FLIP and leave > it > > > as a > > > > >>> >> future > > > > >>> >> >> optimization. The plan for that part is to move the logic > of > > > > >>> deciding > > > > >>> >> task > > > > >>> >> >> executor specifications into the slot manager and make slot > > > > manager > > > > >>> >> >> pluggable, so inside the slot manager plugin we can have > > > > different > > > > >>> >> logics > > > > >>> >> >> for deciding the task executor specifications. > > > > >>> >> >> > > > > >>> >> >> > > > > >>> >> >>> 2. If a slot is released and returned to SlotPool, does > > it > > > > >>> could be > > > > >>> >> >>> reused by other SlotRequest that the request resource > is > > > > >>> smaller > > > > >>> >> than > > > > >>> >> >>> it? > > > > >>> >> >>> > > > > >>> >> >> No, I think slot pool should always return slots if they do > > not > > > > >>> exactly > > > > >>> >> >> match the pending requests, so that resource manager can > deal > > > > with > > > > >>> the > > > > >>> >> >> extra resources. > > > > >>> >> >> > > > > >>> >> >>> - If it is yes, what happens to the available > resource > > > in > > > > >>> the > > > > >>> >> >> > > > > >>> >> >> TaskManager. > > > > >>> >> >>> - What is the SlotStatus of the cached slot in > > SlotPool? > > > > The > > > > >>> >> >>> AllocationId is null? > > > > >>> >> >>> > > > > >>> >> >> The allocation id does not change as long as the slot is > not > > > > >>> returned > > > > >>> >> >> from the job master, no matter its occupied or available in > > the > > > > >>> slot > > > > >>> >> pool. > > > > >>> >> >> I think we have the same behavior currently. No matter how > > many > > > > >>> tasks > > > > >>> >> the > > > > >>> >> >> job master deploy into the slot, concurrently or > > sequentially, > > > it > > > > >>> is > > > > >>> >> one > > > > >>> >> >> allocation from the cluster to the job until the slot is > > freed > > > > from > > > > >>> >> the job > > > > >>> >> >> master. > > > > >>> >> >> > > > > >>> >> >>> 3. In a session cluster, some jobs are configured with > > > > operator > > > > >>> >> >>> resources, meanwhile other jobs are using UNKNOWN. How > to > > > > deal > > > > >>> with > > > > >>> >> >>> this > > > > >>> >> >>> situation? > > > > >>> >> >> > > > > >>> >> >> As long as we do not mix unknown / specified resource > > profiles > > > > >>> within > > > > >>> >> the > > > > >>> >> >> same job / slot, there shouldn't be a problem. Resource > > manager > > > > >>> >> converts > > > > >>> >> >> unknown resource profiles in slot requests to specified > > default > > > > >>> >> resource > > > > >>> >> >> profiles, so they can be dynamically allocated from task > > > > executors' > > > > >>> >> >> available resources just as other slot requests with > > specified > > > > >>> resource > > > > >>> >> >> profiles. > > > > >>> >> >> > > > > >>> >> >> Thank you~ > > > > >>> >> >> > > > > >>> >> >> Xintong Song > > > > >>> >> >> > > > > >>> >> >> > > > > >>> >> >> > > > > >>> >> >> On Mon, Aug 19, 2019 at 11:39 AM Yang Wang < > > > > danrtsey...@gmail.com> > > > > >>> >> wrote: > > > > >>> >> >> > > > > >>> >> >>> Hi Xintong, > > > > >>> >> >>> > > > > >>> >> >>> > > > > >>> >> >>> Thanks for your detailed proposal. I think many users are > > > > >>> suffering > > > > >>> >> from > > > > >>> >> >>> waste of resources. The resource spec of all task managers > > are > > > > >>> same > > > > >>> >> and > > > > >>> >> >>> we > > > > >>> >> >>> have to increase all task managers to make the heavy one > > more > > > > >>> stable. > > > > >>> >> So > > > > >>> >> >>> we > > > > >>> >> >>> will benefit from the fine grained resource management a > > lot. > > > We > > > > >>> could > > > > >>> >> >>> get > > > > >>> >> >>> better resource utilization and stability. > > > > >>> >> >>> > > > > >>> >> >>> > > > > >>> >> >>> Just to share some thoughts. > > > > >>> >> >>> > > > > >>> >> >>> > > > > >>> >> >>> > > > > >>> >> >>> 1. How to calculate the resource specification of > > > > >>> TaskManagers? Do > > > > >>> >> >>> they > > > > >>> >> >>> have them same resource spec calculated based on the > > > > >>> >> configuration? I > > > > >>> >> >>> think > > > > >>> >> >>> we still have wasted resources in this situation. Or we > > > could > > > > >>> start > > > > >>> >> >>> TaskManagers with different spec. > > > > >>> >> >>> 2. If a slot is released and returned to SlotPool, does > > it > > > > >>> could be > > > > >>> >> >>> reused by other SlotRequest that the request resource > is > > > > >>> smaller > > > > >>> >> than > > > > >>> >> >>> it? > > > > >>> >> >>> - If it is yes, what happens to the available > resource > > > in > > > > >>> the > > > > >>> >> >>> TaskManager. > > > > >>> >> >>> - What is the SlotStatus of the cached slot in > > SlotPool? > > > > The > > > > >>> >> >>> AllocationId is null? > > > > >>> >> >>> 3. In a session cluster, some jobs are configured with > > > > operator > > > > >>> >> >>> resources, meanwhile other jobs are using UNKNOWN. How > to > > > > deal > > > > >>> with > > > > >>> >> >>> this > > > > >>> >> >>> situation? > > > > >>> >> >>> > > > > >>> >> >>> > > > > >>> >> >>> > > > > >>> >> >>> Best, > > > > >>> >> >>> Yang > > > > >>> >> >>> > > > > >>> >> >>> Xintong Song <tonysong...@gmail.com> 于2019年8月16日周五 > > 下午8:57写道: > > > > >>> >> >>> > > > > >>> >> >>> > Thanks for the feedbacks, Yangze and Till. > > > > >>> >> >>> > > > > > >>> >> >>> > Yangze, > > > > >>> >> >>> > > > > > >>> >> >>> > I agree with you that we should make scheduling strategy > > > > >>> pluggable > > > > >>> >> and > > > > >>> >> >>> > optimize the strategy to reduce the memory fragmentation > > > > >>> problem, > > > > >>> >> and > > > > >>> >> >>> > thanks for the inputs on the potential algorithmic > > > solutions. > > > > >>> >> However, > > > > >>> >> >>> I'm > > > > >>> >> >>> > in favor of keep this FLIP focusing on the overall > > mechanism > > > > >>> design > > > > >>> >> >>> rather > > > > >>> >> >>> > than strategies. Solving the fragmentation issue should > be > > > > >>> >> considered > > > > >>> >> >>> as an > > > > >>> >> >>> > optimization, and I agree with Till that we probably > > should > > > > >>> tackle > > > > >>> >> this > > > > >>> >> >>> > afterwards. > > > > >>> >> >>> > > > > > >>> >> >>> > Till, > > > > >>> >> >>> > > > > > >>> >> >>> > - Regarding splitting the FLIP, I think it makes sense. > > The > > > > >>> operator > > > > >>> >> >>> > resource management and dynamic slot allocation do not > > have > > > > much > > > > >>> >> >>> dependency > > > > >>> >> >>> > on each other. > > > > >>> >> >>> > > > > > >>> >> >>> > - Regarding the default slot size, I think this is > similar > > > to > > > > >>> >> FLIP-49 > > > > >>> >> >>> [1] > > > > >>> >> >>> > where we want all the deriving happens at one place. I > > think > > > > it > > > > >>> >> would > > > > >>> >> >>> be > > > > >>> >> >>> > nice to pass the default slot size into the task > executor > > in > > > > the > > > > >>> >> same > > > > >>> >> >>> way > > > > >>> >> >>> > that we pass in the memory pool sizes in FLIP-49 [1]. > > > > >>> >> >>> > > > > > >>> >> >>> > - Regarding the return value of > > > > >>> >> TaskExecutorGateway#requestResource, I > > > > >>> >> >>> > think you're right. We should avoid using null as the > > return > > > > >>> value. > > > > >>> >> I > > > > >>> >> >>> think > > > > >>> >> >>> > we probably should thrown an exception here. > > > > >>> >> >>> > > > > > >>> >> >>> > Thank you~ > > > > >>> >> >>> > > > > > >>> >> >>> > Xintong Song > > > > >>> >> >>> > > > > > >>> >> >>> > > > > > >>> >> >>> > [1] > > > > >>> >> >>> > > > > > >>> >> >>> > > > > > >>> >> >>> > > > > >>> >> > > > > >>> > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors > > > > >>> >> >>> > > > > > >>> >> >>> > On Fri, Aug 16, 2019 at 2:18 PM Till Rohrmann < > > > > >>> trohrm...@apache.org > > > > >>> >> > > > > > >>> >> >>> > wrote: > > > > >>> >> >>> > > > > > >>> >> >>> > > Hi Xintong, > > > > >>> >> >>> > > > > > > >>> >> >>> > > thanks for drafting this FLIP. I think your proposal > > helps > > > > to > > > > >>> >> >>> improve the > > > > >>> >> >>> > > execution of batch jobs more efficiently. Moreover, it > > > > >>> enables the > > > > >>> >> >>> proper > > > > >>> >> >>> > > integration of the Blink planner which is very > important > > > as > > > > >>> well. > > > > >>> >> >>> > > > > > > >>> >> >>> > > Overall, the FLIP looks good to me. I was wondering > > > whether > > > > it > > > > >>> >> >>> wouldn't > > > > >>> >> >>> > > make sense to actually split it up into two FLIPs: > > > Operator > > > > >>> >> resource > > > > >>> >> >>> > > management and dynamic slot allocation. I think these > > two > > > > >>> FLIPs > > > > >>> >> >>> could be > > > > >>> >> >>> > > seen as orthogonal and it would decrease the scope of > > each > > > > >>> >> individual > > > > >>> >> >>> > FLIP. > > > > >>> >> >>> > > > > > > >>> >> >>> > > Some smaller comments: > > > > >>> >> >>> > > > > > > >>> >> >>> > > - I'm not sure whether we should pass in the default > > slot > > > > size > > > > >>> >> via an > > > > >>> >> >>> > > environment variable. Without having unified the way > how > > > > Flink > > > > >>> >> >>> components > > > > >>> >> >>> > > are configured [1], I think it would be better to pass > > it > > > in > > > > >>> as > > > > >>> >> part > > > > >>> >> >>> of > > > > >>> >> >>> > the > > > > >>> >> >>> > > configuration. > > > > >>> >> >>> > > - I would avoid returning a null value from > > > > >>> >> >>> > > TaskExecutorGateway#requestResource if it cannot be > > > > fulfilled. > > > > >>> >> >>> Either we > > > > >>> >> >>> > > should introduce an explicit return value saying this > or > > > > >>> throw an > > > > >>> >> >>> > > exception. > > > > >>> >> >>> > > > > > > >>> >> >>> > > Concerning Yangze's comments: I think you are right > that > > > it > > > > >>> would > > > > >>> >> be > > > > >>> >> >>> > > helpful to make the selection strategy pluggable. Also > > > > >>> batching > > > > >>> >> slot > > > > >>> >> >>> > > requests to the RM could be a good optimization. For > the > > > > sake > > > > >>> of > > > > >>> >> >>> keeping > > > > >>> >> >>> > > the scope of this FLIP smaller I would try to tackle > > these > > > > >>> things > > > > >>> >> >>> after > > > > >>> >> >>> > the > > > > >>> >> >>> > > initial version has been completed (without spoiling > > these > > > > >>> >> >>> optimization > > > > >>> >> >>> > > opportunities). In particular batching the slot > requests > > > > >>> depends > > > > >>> >> on > > > > >>> >> >>> the > > > > >>> >> >>> > > current scheduler refactoring and could also be > realized > > > on > > > > >>> the RM > > > > >>> >> >>> side > > > > >>> >> >>> > > only. > > > > >>> >> >>> > > > > > > >>> >> >>> > > [1] > > > > >>> >> >>> > > > > > > >>> >> >>> > > > > > > >>> >> >>> > > > > > >>> >> >>> > > > > >>> >> > > > > >>> > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration > > > > >>> >> >>> > > > > > > >>> >> >>> > > Cheers, > > > > >>> >> >>> > > Till > > > > >>> >> >>> > > > > > > >>> >> >>> > > > > > > >>> >> >>> > > > > > > >>> >> >>> > > On Fri, Aug 16, 2019 at 11:11 AM Yangze Guo < > > > > >>> karma...@gmail.com> > > > > >>> >> >>> wrote: > > > > >>> >> >>> > > > > > > >>> >> >>> > > > Hi, Xintong > > > > >>> >> >>> > > > > > > > >>> >> >>> > > > Thanks to propose this FLIP. The general design > looks > > > good > > > > >>> to > > > > >>> >> me, > > > > >>> >> >>> +1 > > > > >>> >> >>> > > > for this feature. > > > > >>> >> >>> > > > > > > > >>> >> >>> > > > Since slots in the same task executor could have > > > different > > > > >>> >> resource > > > > >>> >> >>> > > > profile, we will > > > > >>> >> >>> > > > meet resource fragment problem. Think about this > case: > > > > >>> >> >>> > > > - request A want 1G memory while request B & C want > > > 0.5G > > > > >>> memory > > > > >>> >> >>> > > > - There are two task executors T1 & T2 with 1G and > > 0.5G > > > > >>> free > > > > >>> >> >>> memory > > > > >>> >> >>> > > > respectively > > > > >>> >> >>> > > > If B come first and we cut a slot from T1 for B, A > > must > > > > >>> wait for > > > > >>> >> >>> the > > > > >>> >> >>> > > > free resource from > > > > >>> >> >>> > > > other task. But A could have been scheduled > > immediately > > > if > > > > >>> we > > > > >>> >> cut a > > > > >>> >> >>> > > > slot from T2 for B. > > > > >>> >> >>> > > > > > > > >>> >> >>> > > > The logic of findMatchingSlot now become finding a > > task > > > > >>> executor > > > > >>> >> >>> which > > > > >>> >> >>> > > > has enough > > > > >>> >> >>> > > > resource and then cut a slot from it. Current method > > > could > > > > >>> be > > > > >>> >> seen > > > > >>> >> >>> as > > > > >>> >> >>> > > > "First-fit strategy", > > > > >>> >> >>> > > > which works well in general but sometimes could not > be > > > the > > > > >>> >> >>> optimization > > > > >>> >> >>> > > > method. > > > > >>> >> >>> > > > > > > > >>> >> >>> > > > Actually, this problem could be abstracted as "Bin > > > Packing > > > > >>> >> >>> Problem"[1]. > > > > >>> >> >>> > > > Here are > > > > >>> >> >>> > > > some common approximate algorithms: > > > > >>> >> >>> > > > - First fit > > > > >>> >> >>> > > > - Next fit > > > > >>> >> >>> > > > - Best fit > > > > >>> >> >>> > > > > > > > >>> >> >>> > > > But it become multi-dimensional bin packing problem > if > > > we > > > > >>> take > > > > >>> >> CPU > > > > >>> >> >>> > > > into account. It hard > > > > >>> >> >>> > > > to define which one is best fit now. Some research > > > > addressed > > > > >>> >> this > > > > >>> >> >>> > > > problem, such like Tetris[2]. > > > > >>> >> >>> > > > > > > > >>> >> >>> > > > Here are some thinking about it: > > > > >>> >> >>> > > > 1. We could make the strategy of finding matching > task > > > > >>> executor > > > > >>> >> >>> > > > pluginable. Let user to config the > > > > >>> >> >>> > > > best strategy in their scenario. > > > > >>> >> >>> > > > 2. We could support batch request interface in RM, > > > because > > > > >>> we > > > > >>> >> have > > > > >>> >> >>> > > > opportunities to optimize > > > > >>> >> >>> > > > if we have more information. If we know the A, B, C > at > > > the > > > > >>> same > > > > >>> >> >>> time, > > > > >>> >> >>> > > > we could always make the best decision. > > > > >>> >> >>> > > > > > > > >>> >> >>> > > > [1] http://www.or.deis.unibo.it/kp/Chapter8.pdf > > > > >>> >> >>> > > > [2] > > > > >>> >> >>> > > > > > >>> >> > > > > https://www.cs.cmu.edu/~xia/resources/Documents/grandl_sigcomm14.pdf > > > > >>> >> >>> > > > > > > > >>> >> >>> > > > Best, > > > > >>> >> >>> > > > Yangze Guo > > > > >>> >> >>> > > > > > > > >>> >> >>> > > > On Thu, Aug 15, 2019 at 10:40 PM Xintong Song < > > > > >>> >> >>> tonysong...@gmail.com> > > > > >>> >> >>> > > > wrote: > > > > >>> >> >>> > > > > > > > > >>> >> >>> > > > > Hi everyone, > > > > >>> >> >>> > > > > > > > > >>> >> >>> > > > > We would like to start a discussion thread on > > > "FLIP-53: > > > > >>> Fine > > > > >>> >> >>> Grained > > > > >>> >> >>> > > > > Resource Management"[1], where we propose how to > > > improve > > > > >>> Flink > > > > >>> >> >>> > resource > > > > >>> >> >>> > > > > management and scheduling. > > > > >>> >> >>> > > > > > > > > >>> >> >>> > > > > This FLIP mainly discusses the following issues. > > > > >>> >> >>> > > > > > > > > >>> >> >>> > > > > - How to support tasks with fine grained > resource > > > > >>> >> >>> requirements. > > > > >>> >> >>> > > > > - How to unify resource management for jobs > with > > / > > > > >>> without > > > > >>> >> >>> fine > > > > >>> >> >>> > > > grained > > > > >>> >> >>> > > > > resource requirements. > > > > >>> >> >>> > > > > - How to unify resource management for > streaming > > / > > > > >>> batch > > > > >>> >> jobs. > > > > >>> >> >>> > > > > > > > > >>> >> >>> > > > > Key changes proposed in the FLIP are as follows. > > > > >>> >> >>> > > > > > > > > >>> >> >>> > > > > - Unify memory management for operators with / > > > > without > > > > >>> fine > > > > >>> >> >>> > grained > > > > >>> >> >>> > > > > resource requirements by applying a fraction > > based > > > > >>> quota > > > > >>> >> >>> > mechanism. > > > > >>> >> >>> > > > > - Unify resource scheduling for streaming and > > batch > > > > >>> jobs by > > > > >>> >> >>> > setting > > > > >>> >> >>> > > > slot > > > > >>> >> >>> > > > > sharing groups for pipelined regions during > > > compiling > > > > >>> >> stage. > > > > >>> >> >>> > > > > - Dynamically allocate slots from task > executors' > > > > >>> available > > > > >>> >> >>> > > resources. > > > > >>> >> >>> > > > > > > > > >>> >> >>> > > > > Please find more details in the FLIP wiki document > > > [1]. > > > > >>> >> Looking > > > > >>> >> >>> > forward > > > > >>> >> >>> > > > to > > > > >>> >> >>> > > > > your feedbacks. > > > > >>> >> >>> > > > > > > > > >>> >> >>> > > > > Thank you~ > > > > >>> >> >>> > > > > > > > > >>> >> >>> > > > > Xintong Song > > > > >>> >> >>> > > > > > > > > >>> >> >>> > > > > > > > > >>> >> >>> > > > > [1] > > > > >>> >> >>> > > > > > > > > >>> >> >>> > > > > > > > >>> >> >>> > > > > > > >>> >> >>> > > > > > >>> >> >>> > > > > >>> >> > > > > >>> > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Resource+Management > > > > >>> >> >>> > > > > > > > >>> >> >>> > > > > > > >>> >> >>> > > > > > >>> >> >>> > > > > >>> >> >> > > > > >>> >> > > > > >>> > > > > > >>> > > > > >> > > > > > > > > > >