@all The FLIP document [1] has been updated.
Thank you~ Xintong Song [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management On Tue, Sep 3, 2019 at 7:20 PM Zhu Zhu <reed...@gmail.com> wrote: > Thanks Xintong for the explanation. > > For question #1, I think it's good as long as DataSet job behaviors remains > the same. > > For question #2, agreed that the resource difference is small enough(at > most 1 edge diff) in current supported point-wise execution edge connection > patterns. > > Thanks, > Zhu Zhu > > Xintong Song <tonysong...@gmail.com> 于2019年9月3日周二 下午6:58写道: > > > Thanks for the comments, Zhu & Kurt. > > > > Andrey and I also had some discussions offline, and I would like to first > > post a summary of our discussion: > > > > 1. The motivation of the fraction based approach is to unify resource > > management for both operators with specified and unknown resource > > requirements. > > 2. The fraction based approach proposed in this FLIP should only > affect > > streaming jobs (both bounded and unbounded). For DataSet jobs, there > are > > already some fraction based approach (in TaskConfig and > ChainedDriver), > > and > > we do not make any change to the existing approach. > > 3. The scope of this FLIP does not include discussion of how to set > > ResourceSpec for operators. > > 1. For blink jobs, the optimizer can set operator resources for the > > users, according to their configurations (default: unknown) > > 2. For DataStream jobs, there are no method / interface to set > > operator resources at the moment (1.10). We can have in the future. > > 3. For DataSet jobs, there are existing user interfaces to set > > operator resources. > > 4. The FLIP should explain more about how ResourceSpecs works > > 1. PhysicalTransformations (deployed with operators into the > > StreamTasks) get ResourceSpec: unknown by default or known (e.g. > > from the > > Blink planner) > > 2. While generating stream graph, calculate fractions and set to > > StreamConfig > > 3. While scheduling, convert ResourceSpec to ResourceProfile > > (ResourceSpec + network memory), and deploy to slots / TMs matching > > the > > resources > > 4. While starting Task in TM, each operator gets fraction converted > > back to the original absolute value requested by user or fair > > unknown share > > of the slot > > 5. We should not set `allSourcesInSamePipelinedRegion` to `false` > for > > DataSet jobs. Behaviors of DataSet jobs should not be changed. > > 6. The FLIP document should differentiate works planed in this FLIP > and > > the future follow-ups more clearly, by put the follow-ups in a > separate > > section > > 7. Another limitation of the rejected alternative setting fractions at > > scheduling time is that, the scheduler implementation does not know > > which > > tasks will be deployed into the same slot in advance. > > > > Andrey, Please bring it up if there is anything I missed. > > > > Zhu, regarding your comments: > > > > 1. If we do not set `allSourcesInSamePipelinedRegion` to `false` for > > DataSet jobs (point 5 in the discussion summary above), then there > > shouldn't be any regression right? > > 2. I think it makes sense to set the max possible network memory for > the > > JobVertex. When you say parallel instances of the same JobVertex may > > have > > need different network memory, I guess you mean the rescale scenarios > > where > > parallelisms of upstream / downstream vertex cannot be exactly divided > > by > > parallelism of downstream / upstream vertex? I would say it's > > acceptable to > > have slight difference between actually needed and allocated network > > memory. > > 3. Yes, by numOpsUseOnHeapManagedMemory I mean > > numOpsUseOnHeapManagedMemoryInTheSameSharedGroup. I'll update the doc. > > 4. Yes, it should be StreamingJobGraphGenerator. Thanks for the > > correction. > > > > > > Kurt, regarding your comments: > > > > 1. I think we don't have network memory in ResourceSpec, which is the > > user facing API. We only have network memory in ResourceProfile, which > > is > > used internally for scheduling. The reason we do not expose network > > memory > > to the user is that, currently how many network buffers each task > needs > > is > > decided by the topology of execution graph (how many input / output > > channels it has). > > 2. In the section "Operator Resource Requirements": "For the first > > version, we do not support mixing operators with specified / unknown > > resource requirements in the same job. Either all or none of the > > operators > > of the same job should specify their resource requirements. > > StreamGraphGenerator should check this and throw an error when mixing > of > > specified / unknown resource requirements is detected, during the > > compilation stage." > > 3. If the user set a resource requirement, then it is guaranteed that > > the task should get at least the much resource, otherwise there should > > be > > an exception. That should be guaranteed by the "Dynamic Slot > Allocation" > > approach (FLIP-56). > > > > > > I'll update the FLIP document addressing the comments ASAP. > > > > > > Thank you~ > > > > Xintong Song > > > > > > > > On Tue, Sep 3, 2019 at 2:42 PM Kurt Young <ykt...@gmail.com> wrote: > > > > > Thanks Xingtong for driving this effort, I haven't finished the whole > > > document yet, > > > but have couple of questions: > > > > > > 1. Regarding to network memory, the document said it will be derived by > > > framework > > > automatically. I'm wondering whether we should delete this dimension > from > > > user- > > > facing API? > > > > > > 2. Regarding to fraction based quota, I don't quite get the meaning of > > > "slotSharingGroupOnHeapManagedMem" and > > "slotSharingGroupOffHeapManagedMem". > > > What if the sharing group is mixed with specified resource and UNKNOWN > > > resource > > > requirements. > > > > > > 3 IIUC, even user had set resource requirements, lets say 500MB > off-heap > > > managed > > > memory, during execution the operator may or may not have 500MB > off-heap > > > managed > > > memory, right? > > > > > > Best, > > > Kurt > > > > > > > > > On Mon, Sep 2, 2019 at 8:36 PM Zhu Zhu <reed...@gmail.com> wrote: > > > > > > > Thanks Xintong for proposing this improvement. Fine grained resources > > can > > > > be very helpful when user has good planning on resources. > > > > > > > > I have a few questions: > > > > 1. Currently in a batch job, vertices from different regions can run > at > > > the > > > > same time in slots from the same shared group, as long as they do not > > > have > > > > data dependency on each other and available slot count is not smaller > > > than > > > > the *max* of parallelism of all tasks. > > > > With changes in this FLIP however, tasks from different regions > cannot > > > > share slots anymore. > > > > Once available slot count is smaller than the *sum* of all > parallelism > > of > > > > tasks from all regions, tasks may need to be executed sequentially, > > which > > > > might result in a performance regression. > > > > Is this(performance regression to existing DataSet jobs) considered > as > > a > > > > necessary and accepted trade off in this FLIP? > > > > > > > > 2. The network memory depends on the input/output ExecutionEdge count > > and > > > > thus can be different even for parallel instances of the same > > JobVertex. > > > > Does this mean that when adding task resources to calculating the > slot > > > > resource for a shared group, the max possible network memory of the > > > vertex > > > > instance shall be used? > > > > This might result in larger resource required than actually needed. > > > > > > > > And some minor comments: > > > > 1. Regarding "fracManagedMemOnHeap = 1 / > > numOpsUseOnHeapManagedMemory", I > > > > guess you mean numOpsUseOnHeapManagedMemoryInTheSameSharedGroup ? > > > > 2. I think the *StreamGraphGenerator* in the #Slot Sharing section > and > > > > implementation step 4 should be *StreamingJobGraphGenerator*, as > > > > *StreamGraphGenerator* is not aware of JobGraph and pipelined region. > > > > > > > > > > > > Thanks, > > > > Zhu Zhu > > > > > > > > Xintong Song <tonysong...@gmail.com> 于2019年9月2日周一 上午11:59写道: > > > > > > > > > Updated the FLIP wiki page [1], with the following changes. > > > > > > > > > > - Remove the step of converting pipelined edges between > different > > > slot > > > > > sharing groups into blocking edges. > > > > > - Set `allSourcesInSamePipelinedRegion` to true by default. > > > > > > > > > > Thank you~ > > > > > > > > > > Xintong Song > > > > > > > > > > > > > > > > > > > > On Mon, Sep 2, 2019 at 11:50 AM Xintong Song < > tonysong...@gmail.com> > > > > > wrote: > > > > > > > > > > > Regarding changing edge type, I think actually we don't need to > do > > > this > > > > > > for batch jobs neither, because we don't have public interfaces > for > > > > users > > > > > > to explicitly set slot sharing groups in DataSet API and > SQL/Table > > > API. > > > > > We > > > > > > have such interfaces in DataStream API only. > > > > > > > > > > > > Thank you~ > > > > > > > > > > > > Xintong Song > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Aug 27, 2019 at 10:16 PM Xintong Song < > > tonysong...@gmail.com > > > > > > > > > > wrote: > > > > > > > > > > > >> Thanks for the correction, Till. > > > > > >> > > > > > >> Regarding your comments: > > > > > >> - You are right, we should not change the edge type for > streaming > > > > jobs. > > > > > >> Then I think we can change the option > > > > 'allSourcesInSamePipelinedRegion' > > > > > in > > > > > >> step 2 to 'isStreamingJob', and implement the current step 2 > > before > > > > the > > > > > >> current step 1 so we can use this option to decide whether > should > > > > change > > > > > >> the edge type. What do you think? > > > > > >> - Agree. It should be easier to make the default value of > > > > > >> 'allSourcesInSamePipelinedRegion' (or 'isStreamingJob') 'true', > > and > > > > set > > > > > it > > > > > >> to 'false' when using DataSet API or blink planner. > > > > > >> > > > > > >> Thank you~ > > > > > >> > > > > > >> Xintong Song > > > > > >> > > > > > >> > > > > > >> > > > > > >> On Tue, Aug 27, 2019 at 8:59 PM Till Rohrmann < > > trohrm...@apache.org > > > > > > > > > >> wrote: > > > > > >> > > > > > >>> Thanks for creating the implementation plan Xintong. Overall, > the > > > > > >>> implementation plan looks good. I had a couple of comments: > > > > > >>> > > > > > >>> - What will happen if a user has defined a streaming job with > two > > > > slot > > > > > >>> sharing groups? Would the code insert a blocking data exchange > > > > between > > > > > >>> these two groups? If yes, then this breaks existing Flink > > streaming > > > > > jobs. > > > > > >>> - How do we detect unbounded streaming jobs to set > > > > > >>> the allSourcesInSamePipelinedRegion to `true`? Wouldn't it be > > > easier > > > > to > > > > > >>> set > > > > > >>> it false if we are using the DataSet API or the Blink planner > > with > > > a > > > > > >>> bounded job? > > > > > >>> > > > > > >>> Cheers, > > > > > >>> Till > > > > > >>> > > > > > >>> On Tue, Aug 27, 2019 at 2:16 PM Till Rohrmann < > > > trohrm...@apache.org> > > > > > >>> wrote: > > > > > >>> > > > > > >>> > I guess there is a typo since the link to the FLIP-53 is > > > > > >>> > > > > > > >>> > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management > > > > > >>> > > > > > > >>> > Cheers, > > > > > >>> > Till > > > > > >>> > > > > > > >>> > On Tue, Aug 27, 2019 at 1:42 PM Xintong Song < > > > > tonysong...@gmail.com> > > > > > >>> > wrote: > > > > > >>> > > > > > > >>> >> Added implementation steps for this FLIP on the wiki page > [1]. > > > > > >>> >> > > > > > >>> >> > > > > > >>> >> Thank you~ > > > > > >>> >> > > > > > >>> >> Xintong Song > > > > > >>> >> > > > > > >>> >> > > > > > >>> >> [1] > > > > > >>> >> > > > > > >>> >> > > > > > >>> > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors > > > > > >>> >> > > > > > >>> >> On Mon, Aug 19, 2019 at 10:29 PM Xintong Song < > > > > > tonysong...@gmail.com> > > > > > >>> >> wrote: > > > > > >>> >> > > > > > >>> >> > Hi everyone, > > > > > >>> >> > > > > > > >>> >> > As Till suggested, the original "FLIP-53: Fine Grained > > > Resource > > > > > >>> >> > Management" splits into two separate FLIPs, > > > > > >>> >> > > > > > > >>> >> > - FLIP-53: Fine Grained Operator Resource Management > [1] > > > > > >>> >> > - FLIP-56: Dynamic Slot Allocation [2] > > > > > >>> >> > > > > > > >>> >> > We'll continue using this discussion thread for FLIP-53. > For > > > > > >>> FLIP-56, I > > > > > >>> >> > just started a new discussion thread [3]. > > > > > >>> >> > > > > > > >>> >> > Thank you~ > > > > > >>> >> > > > > > > >>> >> > Xintong Song > > > > > >>> >> > > > > > > >>> >> > > > > > > >>> >> > [1] > > > > > >>> >> > > > > > > >>> >> > > > > > >>> > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management > > > > > >>> >> > > > > > > >>> >> > [2] > > > > > >>> >> > > > > > > >>> >> > > > > > >>> > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation > > > > > >>> >> > > > > > > >>> >> > [3] > > > > > >>> >> > > > > > > >>> >> > > > > > >>> > > > > > > > > > > > > > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-56-Dynamic-Slot-Allocation-td31960.html > > > > > >>> >> > > > > > > >>> >> > On Mon, Aug 19, 2019 at 2:55 PM Xintong Song < > > > > > tonysong...@gmail.com > > > > > >>> > > > > > > >>> >> > wrote: > > > > > >>> >> > > > > > > >>> >> >> Thinks for the comments, Yang. > > > > > >>> >> >> > > > > > >>> >> >> Regarding your questions: > > > > > >>> >> >> > > > > > >>> >> >> 1. How to calculate the resource specification of > > > > > TaskManagers? > > > > > >>> Do > > > > > >>> >> they > > > > > >>> >> >>> have them same resource spec calculated based on the > > > > > >>> >> configuration? I > > > > > >>> >> >>> think > > > > > >>> >> >>> we still have wasted resources in this situation. Or > we > > > > could > > > > > >>> start > > > > > >>> >> >>> TaskManagers with different spec. > > > > > >>> >> >>> > > > > > >>> >> >> I agree with you that we can further improve the resource > > > > utility > > > > > >>> by > > > > > >>> >> >> customizing task executors with different resource > > > > > specifications. > > > > > >>> >> However, > > > > > >>> >> >> I'm in favor of limiting the scope of this FLIP and leave > > it > > > > as a > > > > > >>> >> future > > > > > >>> >> >> optimization. The plan for that part is to move the logic > > of > > > > > >>> deciding > > > > > >>> >> task > > > > > >>> >> >> executor specifications into the slot manager and make > slot > > > > > manager > > > > > >>> >> >> pluggable, so inside the slot manager plugin we can have > > > > > different > > > > > >>> >> logics > > > > > >>> >> >> for deciding the task executor specifications. > > > > > >>> >> >> > > > > > >>> >> >> > > > > > >>> >> >>> 2. If a slot is released and returned to SlotPool, > does > > > it > > > > > >>> could be > > > > > >>> >> >>> reused by other SlotRequest that the request resource > > is > > > > > >>> smaller > > > > > >>> >> than > > > > > >>> >> >>> it? > > > > > >>> >> >>> > > > > > >>> >> >> No, I think slot pool should always return slots if they > do > > > not > > > > > >>> exactly > > > > > >>> >> >> match the pending requests, so that resource manager can > > deal > > > > > with > > > > > >>> the > > > > > >>> >> >> extra resources. > > > > > >>> >> >> > > > > > >>> >> >>> - If it is yes, what happens to the available > > resource > > > > in > > > > > >>> the > > > > > >>> >> >> > > > > > >>> >> >> TaskManager. > > > > > >>> >> >>> - What is the SlotStatus of the cached slot in > > > SlotPool? > > > > > The > > > > > >>> >> >>> AllocationId is null? > > > > > >>> >> >>> > > > > > >>> >> >> The allocation id does not change as long as the slot is > > not > > > > > >>> returned > > > > > >>> >> >> from the job master, no matter its occupied or available > in > > > the > > > > > >>> slot > > > > > >>> >> pool. > > > > > >>> >> >> I think we have the same behavior currently. No matter > how > > > many > > > > > >>> tasks > > > > > >>> >> the > > > > > >>> >> >> job master deploy into the slot, concurrently or > > > sequentially, > > > > it > > > > > >>> is > > > > > >>> >> one > > > > > >>> >> >> allocation from the cluster to the job until the slot is > > > freed > > > > > from > > > > > >>> >> the job > > > > > >>> >> >> master. > > > > > >>> >> >> > > > > > >>> >> >>> 3. In a session cluster, some jobs are configured > with > > > > > operator > > > > > >>> >> >>> resources, meanwhile other jobs are using UNKNOWN. > How > > to > > > > > deal > > > > > >>> with > > > > > >>> >> >>> this > > > > > >>> >> >>> situation? > > > > > >>> >> >> > > > > > >>> >> >> As long as we do not mix unknown / specified resource > > > profiles > > > > > >>> within > > > > > >>> >> the > > > > > >>> >> >> same job / slot, there shouldn't be a problem. Resource > > > manager > > > > > >>> >> converts > > > > > >>> >> >> unknown resource profiles in slot requests to specified > > > default > > > > > >>> >> resource > > > > > >>> >> >> profiles, so they can be dynamically allocated from task > > > > > executors' > > > > > >>> >> >> available resources just as other slot requests with > > > specified > > > > > >>> resource > > > > > >>> >> >> profiles. > > > > > >>> >> >> > > > > > >>> >> >> Thank you~ > > > > > >>> >> >> > > > > > >>> >> >> Xintong Song > > > > > >>> >> >> > > > > > >>> >> >> > > > > > >>> >> >> > > > > > >>> >> >> On Mon, Aug 19, 2019 at 11:39 AM Yang Wang < > > > > > danrtsey...@gmail.com> > > > > > >>> >> wrote: > > > > > >>> >> >> > > > > > >>> >> >>> Hi Xintong, > > > > > >>> >> >>> > > > > > >>> >> >>> > > > > > >>> >> >>> Thanks for your detailed proposal. I think many users > are > > > > > >>> suffering > > > > > >>> >> from > > > > > >>> >> >>> waste of resources. The resource spec of all task > managers > > > are > > > > > >>> same > > > > > >>> >> and > > > > > >>> >> >>> we > > > > > >>> >> >>> have to increase all task managers to make the heavy one > > > more > > > > > >>> stable. > > > > > >>> >> So > > > > > >>> >> >>> we > > > > > >>> >> >>> will benefit from the fine grained resource management a > > > lot. > > > > We > > > > > >>> could > > > > > >>> >> >>> get > > > > > >>> >> >>> better resource utilization and stability. > > > > > >>> >> >>> > > > > > >>> >> >>> > > > > > >>> >> >>> Just to share some thoughts. > > > > > >>> >> >>> > > > > > >>> >> >>> > > > > > >>> >> >>> > > > > > >>> >> >>> 1. How to calculate the resource specification of > > > > > >>> TaskManagers? Do > > > > > >>> >> >>> they > > > > > >>> >> >>> have them same resource spec calculated based on the > > > > > >>> >> configuration? I > > > > > >>> >> >>> think > > > > > >>> >> >>> we still have wasted resources in this situation. Or > we > > > > could > > > > > >>> start > > > > > >>> >> >>> TaskManagers with different spec. > > > > > >>> >> >>> 2. If a slot is released and returned to SlotPool, > does > > > it > > > > > >>> could be > > > > > >>> >> >>> reused by other SlotRequest that the request resource > > is > > > > > >>> smaller > > > > > >>> >> than > > > > > >>> >> >>> it? > > > > > >>> >> >>> - If it is yes, what happens to the available > > resource > > > > in > > > > > >>> the > > > > > >>> >> >>> TaskManager. > > > > > >>> >> >>> - What is the SlotStatus of the cached slot in > > > SlotPool? > > > > > The > > > > > >>> >> >>> AllocationId is null? > > > > > >>> >> >>> 3. In a session cluster, some jobs are configured > with > > > > > operator > > > > > >>> >> >>> resources, meanwhile other jobs are using UNKNOWN. > How > > to > > > > > deal > > > > > >>> with > > > > > >>> >> >>> this > > > > > >>> >> >>> situation? > > > > > >>> >> >>> > > > > > >>> >> >>> > > > > > >>> >> >>> > > > > > >>> >> >>> Best, > > > > > >>> >> >>> Yang > > > > > >>> >> >>> > > > > > >>> >> >>> Xintong Song <tonysong...@gmail.com> 于2019年8月16日周五 > > > 下午8:57写道: > > > > > >>> >> >>> > > > > > >>> >> >>> > Thanks for the feedbacks, Yangze and Till. > > > > > >>> >> >>> > > > > > > >>> >> >>> > Yangze, > > > > > >>> >> >>> > > > > > > >>> >> >>> > I agree with you that we should make scheduling > strategy > > > > > >>> pluggable > > > > > >>> >> and > > > > > >>> >> >>> > optimize the strategy to reduce the memory > fragmentation > > > > > >>> problem, > > > > > >>> >> and > > > > > >>> >> >>> > thanks for the inputs on the potential algorithmic > > > > solutions. > > > > > >>> >> However, > > > > > >>> >> >>> I'm > > > > > >>> >> >>> > in favor of keep this FLIP focusing on the overall > > > mechanism > > > > > >>> design > > > > > >>> >> >>> rather > > > > > >>> >> >>> > than strategies. Solving the fragmentation issue > should > > be > > > > > >>> >> considered > > > > > >>> >> >>> as an > > > > > >>> >> >>> > optimization, and I agree with Till that we probably > > > should > > > > > >>> tackle > > > > > >>> >> this > > > > > >>> >> >>> > afterwards. > > > > > >>> >> >>> > > > > > > >>> >> >>> > Till, > > > > > >>> >> >>> > > > > > > >>> >> >>> > - Regarding splitting the FLIP, I think it makes > sense. > > > The > > > > > >>> operator > > > > > >>> >> >>> > resource management and dynamic slot allocation do not > > > have > > > > > much > > > > > >>> >> >>> dependency > > > > > >>> >> >>> > on each other. > > > > > >>> >> >>> > > > > > > >>> >> >>> > - Regarding the default slot size, I think this is > > similar > > > > to > > > > > >>> >> FLIP-49 > > > > > >>> >> >>> [1] > > > > > >>> >> >>> > where we want all the deriving happens at one place. I > > > think > > > > > it > > > > > >>> >> would > > > > > >>> >> >>> be > > > > > >>> >> >>> > nice to pass the default slot size into the task > > executor > > > in > > > > > the > > > > > >>> >> same > > > > > >>> >> >>> way > > > > > >>> >> >>> > that we pass in the memory pool sizes in FLIP-49 [1]. > > > > > >>> >> >>> > > > > > > >>> >> >>> > - Regarding the return value of > > > > > >>> >> TaskExecutorGateway#requestResource, I > > > > > >>> >> >>> > think you're right. We should avoid using null as the > > > return > > > > > >>> value. > > > > > >>> >> I > > > > > >>> >> >>> think > > > > > >>> >> >>> > we probably should thrown an exception here. > > > > > >>> >> >>> > > > > > > >>> >> >>> > Thank you~ > > > > > >>> >> >>> > > > > > > >>> >> >>> > Xintong Song > > > > > >>> >> >>> > > > > > > >>> >> >>> > > > > > > >>> >> >>> > [1] > > > > > >>> >> >>> > > > > > > >>> >> >>> > > > > > > >>> >> >>> > > > > > >>> >> > > > > > >>> > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors > > > > > >>> >> >>> > > > > > > >>> >> >>> > On Fri, Aug 16, 2019 at 2:18 PM Till Rohrmann < > > > > > >>> trohrm...@apache.org > > > > > >>> >> > > > > > > >>> >> >>> > wrote: > > > > > >>> >> >>> > > > > > > >>> >> >>> > > Hi Xintong, > > > > > >>> >> >>> > > > > > > > >>> >> >>> > > thanks for drafting this FLIP. I think your proposal > > > helps > > > > > to > > > > > >>> >> >>> improve the > > > > > >>> >> >>> > > execution of batch jobs more efficiently. Moreover, > it > > > > > >>> enables the > > > > > >>> >> >>> proper > > > > > >>> >> >>> > > integration of the Blink planner which is very > > important > > > > as > > > > > >>> well. > > > > > >>> >> >>> > > > > > > > >>> >> >>> > > Overall, the FLIP looks good to me. I was wondering > > > > whether > > > > > it > > > > > >>> >> >>> wouldn't > > > > > >>> >> >>> > > make sense to actually split it up into two FLIPs: > > > > Operator > > > > > >>> >> resource > > > > > >>> >> >>> > > management and dynamic slot allocation. I think > these > > > two > > > > > >>> FLIPs > > > > > >>> >> >>> could be > > > > > >>> >> >>> > > seen as orthogonal and it would decrease the scope > of > > > each > > > > > >>> >> individual > > > > > >>> >> >>> > FLIP. > > > > > >>> >> >>> > > > > > > > >>> >> >>> > > Some smaller comments: > > > > > >>> >> >>> > > > > > > > >>> >> >>> > > - I'm not sure whether we should pass in the default > > > slot > > > > > size > > > > > >>> >> via an > > > > > >>> >> >>> > > environment variable. Without having unified the way > > how > > > > > Flink > > > > > >>> >> >>> components > > > > > >>> >> >>> > > are configured [1], I think it would be better to > pass > > > it > > > > in > > > > > >>> as > > > > > >>> >> part > > > > > >>> >> >>> of > > > > > >>> >> >>> > the > > > > > >>> >> >>> > > configuration. > > > > > >>> >> >>> > > - I would avoid returning a null value from > > > > > >>> >> >>> > > TaskExecutorGateway#requestResource if it cannot be > > > > > fulfilled. > > > > > >>> >> >>> Either we > > > > > >>> >> >>> > > should introduce an explicit return value saying > this > > or > > > > > >>> throw an > > > > > >>> >> >>> > > exception. > > > > > >>> >> >>> > > > > > > > >>> >> >>> > > Concerning Yangze's comments: I think you are right > > that > > > > it > > > > > >>> would > > > > > >>> >> be > > > > > >>> >> >>> > > helpful to make the selection strategy pluggable. > Also > > > > > >>> batching > > > > > >>> >> slot > > > > > >>> >> >>> > > requests to the RM could be a good optimization. For > > the > > > > > sake > > > > > >>> of > > > > > >>> >> >>> keeping > > > > > >>> >> >>> > > the scope of this FLIP smaller I would try to tackle > > > these > > > > > >>> things > > > > > >>> >> >>> after > > > > > >>> >> >>> > the > > > > > >>> >> >>> > > initial version has been completed (without spoiling > > > these > > > > > >>> >> >>> optimization > > > > > >>> >> >>> > > opportunities). In particular batching the slot > > requests > > > > > >>> depends > > > > > >>> >> on > > > > > >>> >> >>> the > > > > > >>> >> >>> > > current scheduler refactoring and could also be > > realized > > > > on > > > > > >>> the RM > > > > > >>> >> >>> side > > > > > >>> >> >>> > > only. > > > > > >>> >> >>> > > > > > > > >>> >> >>> > > [1] > > > > > >>> >> >>> > > > > > > > >>> >> >>> > > > > > > > >>> >> >>> > > > > > > >>> >> >>> > > > > > >>> >> > > > > > >>> > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration > > > > > >>> >> >>> > > > > > > > >>> >> >>> > > Cheers, > > > > > >>> >> >>> > > Till > > > > > >>> >> >>> > > > > > > > >>> >> >>> > > > > > > > >>> >> >>> > > > > > > > >>> >> >>> > > On Fri, Aug 16, 2019 at 11:11 AM Yangze Guo < > > > > > >>> karma...@gmail.com> > > > > > >>> >> >>> wrote: > > > > > >>> >> >>> > > > > > > > >>> >> >>> > > > Hi, Xintong > > > > > >>> >> >>> > > > > > > > > >>> >> >>> > > > Thanks to propose this FLIP. The general design > > looks > > > > good > > > > > >>> to > > > > > >>> >> me, > > > > > >>> >> >>> +1 > > > > > >>> >> >>> > > > for this feature. > > > > > >>> >> >>> > > > > > > > > >>> >> >>> > > > Since slots in the same task executor could have > > > > different > > > > > >>> >> resource > > > > > >>> >> >>> > > > profile, we will > > > > > >>> >> >>> > > > meet resource fragment problem. Think about this > > case: > > > > > >>> >> >>> > > > - request A want 1G memory while request B & C > want > > > > 0.5G > > > > > >>> memory > > > > > >>> >> >>> > > > - There are two task executors T1 & T2 with 1G > and > > > 0.5G > > > > > >>> free > > > > > >>> >> >>> memory > > > > > >>> >> >>> > > > respectively > > > > > >>> >> >>> > > > If B come first and we cut a slot from T1 for B, A > > > must > > > > > >>> wait for > > > > > >>> >> >>> the > > > > > >>> >> >>> > > > free resource from > > > > > >>> >> >>> > > > other task. But A could have been scheduled > > > immediately > > > > if > > > > > >>> we > > > > > >>> >> cut a > > > > > >>> >> >>> > > > slot from T2 for B. > > > > > >>> >> >>> > > > > > > > > >>> >> >>> > > > The logic of findMatchingSlot now become finding a > > > task > > > > > >>> executor > > > > > >>> >> >>> which > > > > > >>> >> >>> > > > has enough > > > > > >>> >> >>> > > > resource and then cut a slot from it. Current > method > > > > could > > > > > >>> be > > > > > >>> >> seen > > > > > >>> >> >>> as > > > > > >>> >> >>> > > > "First-fit strategy", > > > > > >>> >> >>> > > > which works well in general but sometimes could > not > > be > > > > the > > > > > >>> >> >>> optimization > > > > > >>> >> >>> > > > method. > > > > > >>> >> >>> > > > > > > > > >>> >> >>> > > > Actually, this problem could be abstracted as "Bin > > > > Packing > > > > > >>> >> >>> Problem"[1]. > > > > > >>> >> >>> > > > Here are > > > > > >>> >> >>> > > > some common approximate algorithms: > > > > > >>> >> >>> > > > - First fit > > > > > >>> >> >>> > > > - Next fit > > > > > >>> >> >>> > > > - Best fit > > > > > >>> >> >>> > > > > > > > > >>> >> >>> > > > But it become multi-dimensional bin packing > problem > > if > > > > we > > > > > >>> take > > > > > >>> >> CPU > > > > > >>> >> >>> > > > into account. It hard > > > > > >>> >> >>> > > > to define which one is best fit now. Some research > > > > > addressed > > > > > >>> >> this > > > > > >>> >> >>> > > > problem, such like Tetris[2]. > > > > > >>> >> >>> > > > > > > > > >>> >> >>> > > > Here are some thinking about it: > > > > > >>> >> >>> > > > 1. We could make the strategy of finding matching > > task > > > > > >>> executor > > > > > >>> >> >>> > > > pluginable. Let user to config the > > > > > >>> >> >>> > > > best strategy in their scenario. > > > > > >>> >> >>> > > > 2. We could support batch request interface in RM, > > > > because > > > > > >>> we > > > > > >>> >> have > > > > > >>> >> >>> > > > opportunities to optimize > > > > > >>> >> >>> > > > if we have more information. If we know the A, B, > C > > at > > > > the > > > > > >>> same > > > > > >>> >> >>> time, > > > > > >>> >> >>> > > > we could always make the best decision. > > > > > >>> >> >>> > > > > > > > > >>> >> >>> > > > [1] http://www.or.deis.unibo.it/kp/Chapter8.pdf > > > > > >>> >> >>> > > > [2] > > > > > >>> >> >>> > > > > > > >>> >> > > > > > > https://www.cs.cmu.edu/~xia/resources/Documents/grandl_sigcomm14.pdf > > > > > >>> >> >>> > > > > > > > > >>> >> >>> > > > Best, > > > > > >>> >> >>> > > > Yangze Guo > > > > > >>> >> >>> > > > > > > > > >>> >> >>> > > > On Thu, Aug 15, 2019 at 10:40 PM Xintong Song < > > > > > >>> >> >>> tonysong...@gmail.com> > > > > > >>> >> >>> > > > wrote: > > > > > >>> >> >>> > > > > > > > > > >>> >> >>> > > > > Hi everyone, > > > > > >>> >> >>> > > > > > > > > > >>> >> >>> > > > > We would like to start a discussion thread on > > > > "FLIP-53: > > > > > >>> Fine > > > > > >>> >> >>> Grained > > > > > >>> >> >>> > > > > Resource Management"[1], where we propose how to > > > > improve > > > > > >>> Flink > > > > > >>> >> >>> > resource > > > > > >>> >> >>> > > > > management and scheduling. > > > > > >>> >> >>> > > > > > > > > > >>> >> >>> > > > > This FLIP mainly discusses the following issues. > > > > > >>> >> >>> > > > > > > > > > >>> >> >>> > > > > - How to support tasks with fine grained > > resource > > > > > >>> >> >>> requirements. > > > > > >>> >> >>> > > > > - How to unify resource management for jobs > > with > > > / > > > > > >>> without > > > > > >>> >> >>> fine > > > > > >>> >> >>> > > > grained > > > > > >>> >> >>> > > > > resource requirements. > > > > > >>> >> >>> > > > > - How to unify resource management for > > streaming > > > / > > > > > >>> batch > > > > > >>> >> jobs. > > > > > >>> >> >>> > > > > > > > > > >>> >> >>> > > > > Key changes proposed in the FLIP are as follows. > > > > > >>> >> >>> > > > > > > > > > >>> >> >>> > > > > - Unify memory management for operators with > / > > > > > without > > > > > >>> fine > > > > > >>> >> >>> > grained > > > > > >>> >> >>> > > > > resource requirements by applying a fraction > > > based > > > > > >>> quota > > > > > >>> >> >>> > mechanism. > > > > > >>> >> >>> > > > > - Unify resource scheduling for streaming and > > > batch > > > > > >>> jobs by > > > > > >>> >> >>> > setting > > > > > >>> >> >>> > > > slot > > > > > >>> >> >>> > > > > sharing groups for pipelined regions during > > > > compiling > > > > > >>> >> stage. > > > > > >>> >> >>> > > > > - Dynamically allocate slots from task > > executors' > > > > > >>> available > > > > > >>> >> >>> > > resources. > > > > > >>> >> >>> > > > > > > > > > >>> >> >>> > > > > Please find more details in the FLIP wiki > document > > > > [1]. > > > > > >>> >> Looking > > > > > >>> >> >>> > forward > > > > > >>> >> >>> > > > to > > > > > >>> >> >>> > > > > your feedbacks. > > > > > >>> >> >>> > > > > > > > > > >>> >> >>> > > > > Thank you~ > > > > > >>> >> >>> > > > > > > > > > >>> >> >>> > > > > Xintong Song > > > > > >>> >> >>> > > > > > > > > > >>> >> >>> > > > > > > > > > >>> >> >>> > > > > [1] > > > > > >>> >> >>> > > > > > > > > > >>> >> >>> > > > > > > > > >>> >> >>> > > > > > > > >>> >> >>> > > > > > > >>> >> >>> > > > > > >>> >> > > > > > >>> > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Resource+Management > > > > > >>> >> >>> > > > > > > > > >>> >> >>> > > > > > > > >>> >> >>> > > > > > > >>> >> >>> > > > > > >>> >> >> > > > > > >>> >> > > > > > >>> > > > > > > >>> > > > > > >> > > > > > > > > > > > > > > >