Thanks all for joining the discussion. It seems to me that there is a consensus on the current FLIP document. So if there is no objection, I would like to start the voting process for this FLIP.
Thank you~

Xintong Song

On Wed, Sep 4, 2019 at 8:23 PM Andrey Zagrebin <and...@ververica.com> wrote:

Thanks for updating the FLIP, Xintong. It looks good to me. I would be ok to start a vote for it.

Best,
Andrey

On Wed, Sep 4, 2019 at 10:03 AM Xintong Song <tonysong...@gmail.com> wrote:

@all

The FLIP document [1] has been updated.

Thank you~

Xintong Song

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management

On Tue, Sep 3, 2019 at 7:20 PM Zhu Zhu <reed...@gmail.com> wrote:

Thanks Xintong for the explanation.

For question #1, I think it's good as long as the DataSet job behavior remains the same.

For question #2, agreed that the resource difference is small enough (at most 1 edge difference) in the currently supported point-wise execution edge connection patterns.

Thanks,
Zhu Zhu

On Tue, Sep 3, 2019 at 6:58 PM Xintong Song <tonysong...@gmail.com> wrote:

Thanks for the comments, Zhu & Kurt.

Andrey and I also had some discussions offline, and I would like to first post a summary of our discussion:

1. The motivation of the fraction based approach is to unify resource management for operators with specified and with unknown resource requirements.
2. The fraction based approach proposed in this FLIP should only affect streaming jobs (both bounded and unbounded). For DataSet jobs, there is already a fraction based approach (in TaskConfig and ChainedDriver), and we do not make any change to it.
3. The scope of this FLIP does not include discussion of how to set ResourceSpec for operators.
   1. For Blink jobs, the optimizer can set operator resources for the users, according to their configurations (default: unknown).
   2. For DataStream jobs, there is no method / interface to set operator resources at the moment (1.10). We can add one in the future.
   3. For DataSet jobs, there are existing user interfaces to set operator resources.
4. The FLIP should explain more about how ResourceSpecs work:
   1. PhysicalTransformations (deployed with operators into the StreamTasks) get a ResourceSpec: unknown by default, or known (e.g. from the Blink planner).
   2. While generating the stream graph, calculate the fractions and set them into the StreamConfig.
   3. While scheduling, convert the ResourceSpec to a ResourceProfile (ResourceSpec + network memory), and deploy to slots / TMs matching the resources.
   4. While starting the Task in the TM, each operator gets its fraction converted back to the original absolute value requested by the user, or to a fair share of the slot for unknown requirements.
5. We should not set `allSourcesInSamePipelinedRegion` to `false` for DataSet jobs. Behaviors of DataSet jobs should not be changed.
6. The FLIP document should differentiate the work planned in this FLIP from the future follow-ups more clearly, by putting the follow-ups in a separate section.
7. Another limitation of the rejected alternative (setting fractions at scheduling time) is that the scheduler implementation does not know in advance which tasks will be deployed into the same slot.

Andrey, please bring it up if there is anything I missed.

Zhu, regarding your comments:

1. If we do not set `allSourcesInSamePipelinedRegion` to `false` for DataSet jobs (point 5 in the discussion summary above), then there shouldn't be any regression, right?
2. I think it makes sense to set the max possible network memory for the JobVertex. When you say parallel instances of the same JobVertex may need different network memory, I guess you mean the rescale scenarios where the parallelism of the upstream / downstream vertex cannot be exactly divided by the parallelism of the downstream / upstream vertex? I would say it is acceptable to have a slight difference between the actually needed and the allocated network memory.
3. Yes, by numOpsUseOnHeapManagedMemory I mean numOpsUseOnHeapManagedMemoryInTheSameSharedGroup. I'll update the doc.
4. Yes, it should be StreamingJobGraphGenerator. Thanks for the correction.

Kurt, regarding your comments:

1. I think we don't have network memory in ResourceSpec, which is the user facing API. We only have network memory in ResourceProfile, which is used internally for scheduling. The reason we do not expose network memory to the user is that how many network buffers each task needs is currently decided by the topology of the execution graph (how many input / output channels it has).
2. From the section "Operator Resource Requirements": "For the first version, we do not support mixing operators with specified / unknown resource requirements in the same job. Either all or none of the operators of the same job should specify their resource requirements. StreamGraphGenerator should check this and throw an error when mixing of specified / unknown resource requirements is detected, during the compilation stage."
3. If the user sets a resource requirement, then it is guaranteed that the task gets at least that much resource, otherwise there should be an exception. That is guaranteed by the "Dynamic Slot Allocation" approach (FLIP-56).

I'll update the FLIP document addressing the comments ASAP.

Thank you~

Xintong Song
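To make the fraction based quota idea from the summary above (points 4.2 and 4.4) a bit more concrete, here is a minimal, self-contained sketch of how fractions could be derived for the operators of one slot sharing group and converted back to absolute budgets when the task starts. All class and method names are invented for illustration; this is not Flink's actual StreamingJobGraphGenerator / StreamConfig code.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative-only sketch of the fraction based quota mechanism.
public class FractionExample {

    // Hypothetical stand-in for an operator's on-heap managed memory request; -1 means UNKNOWN.
    static class OpSpec {
        final String name;
        final long onHeapManagedMemoryMB;
        OpSpec(String name, long onHeapManagedMemoryMB) {
            this.name = name;
            this.onHeapManagedMemoryMB = onHeapManagedMemoryMB;
        }
    }

    // Compute each operator's fraction of the slot's on-heap managed memory.
    static Map<String, Double> computeFractions(List<OpSpec> opsInSameSharedGroup) {
        Map<String, Double> fractions = new HashMap<>();
        boolean allUnknown = opsInSameSharedGroup.stream().allMatch(op -> op.onHeapManagedMemoryMB < 0);
        if (allUnknown) {
            // Unknown requirements: fair share, 1 / numOpsUseOnHeapManagedMemoryInTheSameSharedGroup.
            for (OpSpec op : opsInSameSharedGroup) {
                fractions.put(op.name, 1.0 / opsInSameSharedGroup.size());
            }
        } else {
            // Specified requirements: fraction = operator's request / total request of the shared group.
            long total = opsInSameSharedGroup.stream().mapToLong(op -> op.onHeapManagedMemoryMB).sum();
            for (OpSpec op : opsInSameSharedGroup) {
                fractions.put(op.name, (double) op.onHeapManagedMemoryMB / total);
            }
        }
        return fractions;
    }

    // At task start, the fraction is converted back to an absolute budget from the slot's size.
    static long absoluteBudgetMB(double fraction, long slotOnHeapManagedMemoryMB) {
        return (long) (fraction * slotOnHeapManagedMemoryMB);
    }

    public static void main(String[] args) {
        List<OpSpec> ops = Arrays.asList(new OpSpec("map", 300), new OpSpec("agg", 100));
        for (Map.Entry<String, Double> e : computeFractions(ops).entrySet()) {
            // In a 400 MB slot: map -> 0.75 -> 300 MB, agg -> 0.25 -> 100 MB.
            System.out.println(e.getKey() + ": fraction=" + e.getValue()
                    + ", budget=" + absoluteBudgetMB(e.getValue(), 400) + " MB");
        }
    }
}

This matches point 4.4 above: specified requirements convert back to the requested amounts, while unknown requirements fall back to an even share of whatever the slot provides.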
On Tue, Sep 3, 2019 at 2:42 PM Kurt Young <ykt...@gmail.com> wrote:

Thanks Xintong for driving this effort. I haven't finished the whole document yet, but have a couple of questions:

1. Regarding network memory, the document says it will be derived by the framework automatically. I'm wondering whether we should delete this dimension from the user-facing API?

2. Regarding the fraction based quota, I don't quite get the meaning of "slotSharingGroupOnHeapManagedMem" and "slotSharingGroupOffHeapManagedMem". What if the sharing group mixes specified resource and UNKNOWN resource requirements?

3. IIUC, even if a user has set resource requirements, let's say 500MB off-heap managed memory, during execution the operator may or may not have 500MB off-heap managed memory, right?

Best,
Kurt

On Mon, Sep 2, 2019 at 8:36 PM Zhu Zhu <reed...@gmail.com> wrote:

Thanks Xintong for proposing this improvement. Fine grained resources can be very helpful when the user has good planning on resources.

I have a few questions:

1. Currently in a batch job, vertices from different regions can run at the same time in slots from the same shared group, as long as they do not have data dependencies on each other and the available slot count is not smaller than the *max* of the parallelisms of all tasks. With the changes in this FLIP, however, tasks from different regions cannot share slots anymore. Once the available slot count is smaller than the *sum* of the parallelisms of tasks from all regions, tasks may need to be executed sequentially, which might result in a performance regression. Is this (performance regression for existing DataSet jobs) considered a necessary and accepted trade-off in this FLIP?

2. The network memory depends on the input/output ExecutionEdge count and thus can be different even for parallel instances of the same JobVertex. Does this mean that when adding task resources to calculate the slot resource for a shared group, the max possible network memory of the vertex instances shall be used? This might result in larger resources required than actually needed.

And some minor comments:

1. Regarding "fracManagedMemOnHeap = 1 / numOpsUseOnHeapManagedMemory", I guess you mean numOpsUseOnHeapManagedMemoryInTheSameSharedGroup?
2. I think the *StreamGraphGenerator* in the #Slot Sharing section and implementation step 4 should be *StreamingJobGraphGenerator*, as *StreamGraphGenerator* is not aware of the JobGraph and pipelined regions.

Thanks,
Zhu Zhu
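To put concrete numbers on Zhu's second question: for a point-wise connection whose parallelisms do not divide evenly, parallel instances of the same JobVertex get slightly different channel counts, so sizing the shared slot by the maximum over-allocates by at most one channel's worth of buffers. The sketch below uses invented parallelisms and an invented buffers-per-channel constant; it is not Flink's actual channel or buffer calculation.

// Illustrative-only arithmetic for the "at most 1 edge diff" point.
public class NetworkMemoryExample {

    // Number of upstream channels feeding downstream subtask i when connecting pUp -> pDown point-wise.
    static int inputChannels(int pUp, int pDown, int i) {
        // Distribute pUp producers over pDown consumers as evenly as possible.
        return pUp / pDown + (i < pUp % pDown ? 1 : 0);
    }

    public static void main(String[] args) {
        int pUp = 5, pDown = 3;
        int buffersPerChannel = 2; // illustrative constant, not Flink's configuration value
        int max = 0, min = Integer.MAX_VALUE;
        for (int i = 0; i < pDown; i++) {
            int channels = inputChannels(pUp, pDown, i);
            max = Math.max(max, channels);
            min = Math.min(min, channels);
            System.out.println("subtask " + i + ": " + channels + " channels, "
                    + channels * buffersPerChannel + " buffers");
        }
        // Worst-case per-instance over-allocation when sizing every instance by the max.
        System.out.println("worst-case over-allocation: " + (max - min) * buffersPerChannel + " buffers");
    }
}

For 5 upstream and 3 downstream subtasks the instances get 2, 2 and 1 input channels, which is exactly the "at most 1 edge diff" mentioned in the replies above.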
On Mon, Sep 2, 2019 at 11:59 AM Xintong Song <tonysong...@gmail.com> wrote:

Updated the FLIP wiki page [1], with the following changes.

- Remove the step of converting pipelined edges between different slot sharing groups into blocking edges.
- Set `allSourcesInSamePipelinedRegion` to true by default.

Thank you~

Xintong Song

On Mon, Sep 2, 2019 at 11:50 AM Xintong Song <tonysong...@gmail.com> wrote:

Regarding changing the edge type, I think we actually don't need to do this for batch jobs either, because we don't have public interfaces for users to explicitly set slot sharing groups in the DataSet API and SQL/Table API. We have such interfaces in the DataStream API only.

Thank you~

Xintong Song

On Tue, Aug 27, 2019 at 10:16 PM Xintong Song <tonysong...@gmail.com> wrote:

Thanks for the correction, Till.

Regarding your comments:
- You are right, we should not change the edge type for streaming jobs. Then I think we can change the option 'allSourcesInSamePipelinedRegion' in step 2 to 'isStreamingJob', and implement the current step 2 before the current step 1, so we can use this option to decide whether we should change the edge type. What do you think?
- Agree. It should be easier to make the default value of 'allSourcesInSamePipelinedRegion' (or 'isStreamingJob') 'true', and set it to 'false' when using the DataSet API or the Blink planner.

Thank you~

Xintong Song

On Tue, Aug 27, 2019 at 8:59 PM Till Rohrmann <trohrm...@apache.org> wrote:

Thanks for creating the implementation plan, Xintong. Overall, the implementation plan looks good. I had a couple of comments:

- What will happen if a user has defined a streaming job with two slot sharing groups? Would the code insert a blocking data exchange between these two groups? If yes, then this breaks existing Flink streaming jobs.
- How do we detect unbounded streaming jobs to set the allSourcesInSamePipelinedRegion to `true`? Wouldn't it be easier to set it to false if we are using the DataSet API or the Blink planner with a bounded job?

Cheers,
Till
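For reference, this is the kind of job Till's first question is about: a streaming program whose operators are explicitly placed into two slot sharing groups via the existing DataStream API. The group names and the tiny pipeline are made up for the example; the snippet only shows the user-facing setup, while whether and how the edge between the two groups is scheduled is exactly what is being discussed above.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// A streaming job with two user-defined slot sharing groups.
public class TwoSlotSharingGroupsJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3, 4, 5)
                .map(x -> x * 2).slotSharingGroup("group-a")        // first group
                .keyBy(x -> x % 2)
                .reduce(Integer::sum).slotSharingGroup("group-b")   // second group; this edge must stay pipelined
                .print();

        env.execute("two-slot-sharing-groups");
    }
}

Because the job is unbounded, turning the exchange between "group-a" and "group-b" into a blocking one would break it, which is why that conversion step was dropped for streaming jobs.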
On Tue, Aug 27, 2019 at 2:16 PM Till Rohrmann <trohrm...@apache.org> wrote:

I guess there is a typo, since the link to FLIP-53 is https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management

Cheers,
Till

On Tue, Aug 27, 2019 at 1:42 PM Xintong Song <tonysong...@gmail.com> wrote:

Added implementation steps for this FLIP on the wiki page [1].

Thank you~

Xintong Song

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors

On Mon, Aug 19, 2019 at 10:29 PM Xintong Song <tonysong...@gmail.com> wrote:

Hi everyone,

As Till suggested, the original "FLIP-53: Fine Grained Resource Management" splits into two separate FLIPs,

- FLIP-53: Fine Grained Operator Resource Management [1]
- FLIP-56: Dynamic Slot Allocation [2]

We'll continue using this discussion thread for FLIP-53. For FLIP-56, I just started a new discussion thread [3].

Thank you~

Xintong Song

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
[3] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-56-Dynamic-Slot-Allocation-td31960.html

On Mon, Aug 19, 2019 at 2:55 PM Xintong Song <tonysong...@gmail.com> wrote:

Thanks for the comments, Yang.

Regarding your questions:

> 1. How to calculate the resource specification of TaskManagers? Do they have the same resource spec calculated based on the configuration? I think we still have wasted resources in this situation. Or we could start TaskManagers with different specs.

I agree with you that we can further improve resource utilization by customizing task executors with different resource specifications. However, I'm in favor of limiting the scope of this FLIP and leaving that as a future optimization. The plan for that part is to move the logic of deciding task executor specifications into the slot manager and make the slot manager pluggable, so inside the slot manager plugin we can have different logics for deciding the task executor specifications.

> 2. If a slot is released and returned to the SlotPool, can it be reused by another SlotRequest whose requested resource is smaller than it?

No, I think the slot pool should always return slots if they do not exactly match the pending requests, so that the resource manager can deal with the extra resources.

> - If yes, what happens to the available resource in the TaskManager?
> - What is the SlotStatus of the cached slot in the SlotPool? Is the AllocationId null?

The allocation id does not change as long as the slot is not returned from the job master, no matter whether it is occupied or available in the slot pool. I think we have the same behavior currently. No matter how many tasks the job master deploys into the slot, concurrently or sequentially, it is one allocation from the cluster to the job until the slot is freed from the job master.

> 3. In a session cluster, some jobs are configured with operator resources, meanwhile other jobs are using UNKNOWN. How to deal with this situation?
As long as we do not mix unknown / specified resource profiles within the same job / slot, there shouldn't be a problem. The resource manager converts unknown resource profiles in slot requests to specified default resource profiles, so they can be dynamically allocated from the task executors' available resources just like other slot requests with specified resource profiles.

Thank you~

Xintong Song

On Mon, Aug 19, 2019 at 11:39 AM Yang Wang <danrtsey...@gmail.com> wrote:

Hi Xintong,

Thanks for your detailed proposal. I think many users are suffering from wasted resources. The resource spec of all task managers is the same, and we have to increase all task managers to make the heavy one more stable. So we will benefit a lot from fine grained resource management. We could get better resource utilization and stability.

Just to share some thoughts:

1. How to calculate the resource specification of TaskManagers? Do they have the same resource spec calculated based on the configuration? I think we still have wasted resources in this situation. Or we could start TaskManagers with different specs.
2. If a slot is released and returned to the SlotPool, can it be reused by another SlotRequest whose requested resource is smaller than it?
   - If yes, what happens to the available resource in the TaskManager?
   - What is the SlotStatus of the cached slot in the SlotPool? Is the AllocationId null?
3. In a session cluster, some jobs are configured with operator resources, meanwhile other jobs are using UNKNOWN. How to deal with this situation?

Best,
Yang
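A minimal sketch of the normalization described in the answer above: UNKNOWN slot requests are rewritten by the resource manager to a configured default profile before matching. The types and the default values are invented for the example; this is not Flink's actual SlotManager / ResourceProfile code.

// Illustrative-only sketch: UNKNOWN requests become the configured default profile.
public class UnknownProfileNormalization {

    // Hypothetical stand-in for a requested slot profile; negative values mean UNKNOWN.
    static class Profile {
        final double cpuCores;
        final long memoryMB;
        Profile(double cpuCores, long memoryMB) {
            this.cpuCores = cpuCores;
            this.memoryMB = memoryMB;
        }
        boolean isUnknown() {
            return cpuCores < 0 || memoryMB < 0;
        }
        @Override
        public String toString() {
            return "Profile{cpu=" + cpuCores + ", memoryMB=" + memoryMB + "}";
        }
    }

    // Default profile the resource manager would derive from its configuration.
    static final Profile DEFAULT_SLOT_PROFILE = new Profile(1.0, 1024);

    // Normalize a request: UNKNOWN becomes the default, specified profiles pass through unchanged.
    static Profile normalize(Profile requested) {
        return requested.isUnknown() ? DEFAULT_SLOT_PROFILE : requested;
    }

    public static void main(String[] args) {
        System.out.println("unknown   -> " + normalize(new Profile(-1, -1)));    // becomes the default
        System.out.println("specified -> " + normalize(new Profile(2.0, 2048))); // kept as requested
    }
}

After this step, every request carries a concrete profile, so mixing specified and UNKNOWN jobs in one session cluster works as long as they are not mixed within the same job / slot.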
On Fri, Aug 16, 2019 at 8:57 PM Xintong Song <tonysong...@gmail.com> wrote:

Thanks for the feedback, Yangze and Till.

Yangze,

I agree with you that we should make the scheduling strategy pluggable and optimize the strategy to reduce the memory fragmentation problem, and thanks for the inputs on the potential algorithmic solutions. However, I'm in favor of keeping this FLIP focused on the overall mechanism design rather than on strategies. Solving the fragmentation issue should be considered an optimization, and I agree with Till that we should probably tackle it afterwards.

Till,

- Regarding splitting the FLIP, I think it makes sense. The operator resource management and dynamic slot allocation do not have much dependency on each other.
- Regarding the default slot size, I think this is similar to FLIP-49 [1], where we want all the deriving to happen in one place. I think it would be nice to pass the default slot size into the task executor in the same way that we pass in the memory pool sizes in FLIP-49 [1].
- Regarding the return value of TaskExecutorGateway#requestResource, I think you're right. We should avoid using null as the return value. I think we should probably throw an exception here.

Thank you~

Xintong Song

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
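As a small illustration of the requestResource point: rather than returning null for an unfulfillable request, the RPC can complete its future exceptionally (or with an explicit rejection value). The interface shape below is hypothetical and only sketches the idea; it is not Flink's actual TaskExecutorGateway.

import java.util.concurrent.CompletableFuture;

// Illustrative-only sketch: request a resource and fail explicitly instead of returning null.
public class RequestResourceSketch {

    // Hypothetical exception signalling that the task executor cannot fulfill the request.
    static class ResourceUnavailableException extends RuntimeException {
        ResourceUnavailableException(String message) {
            super(message);
        }
    }

    // Request a slot of the given size; never returns null, it either succeeds or fails the future.
    static CompletableFuture<String> requestResource(long requestedMB, long availableMB) {
        if (requestedMB <= availableMB) {
            return CompletableFuture.completedFuture("slot-1");
        }
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new ResourceUnavailableException(
                "requested " + requestedMB + " MB, only " + availableMB + " MB available"));
        return failed;
    }

    public static void main(String[] args) {
        requestResource(512, 1024).thenAccept(slot -> System.out.println("granted " + slot));
        requestResource(2048, 1024).exceptionally(t -> {
            System.out.println("rejected: " + t.getMessage());
            return null;
        });
    }
}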
On Fri, Aug 16, 2019 at 2:18 PM Till Rohrmann <trohrm...@apache.org> wrote:

Hi Xintong,

thanks for drafting this FLIP. I think your proposal helps to execute batch jobs more efficiently. Moreover, it enables the proper integration of the Blink planner, which is very important as well.

Overall, the FLIP looks good to me. I was wondering whether it wouldn't make sense to actually split it up into two FLIPs: operator resource management and dynamic slot allocation. I think these two FLIPs could be seen as orthogonal, and it would decrease the scope of each individual FLIP.

Some smaller comments:

- I'm not sure whether we should pass in the default slot size via an environment variable. Without having unified the way Flink components are configured [1], I think it would be better to pass it in as part of the configuration.
- I would avoid returning a null value from TaskExecutorGateway#requestResource if it cannot be fulfilled. Either we should introduce an explicit return value saying this, or throw an exception.

Concerning Yangze's comments: I think you are right that it would be helpful to make the selection strategy pluggable. Also, batching slot requests to the RM could be a good optimization.
For the sake of keeping the scope of this FLIP smaller, I would try to tackle these things after the initial version has been completed (without spoiling these optimization opportunities). In particular, batching the slot requests depends on the current scheduler refactoring and could also be realized on the RM side only.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration

Cheers,
Till

On Fri, Aug 16, 2019 at 11:11 AM Yangze Guo <karma...@gmail.com> wrote:

Hi Xintong,

Thanks for proposing this FLIP. The general design looks good to me, +1 for this feature.

Since slots in the same task executor could have different resource profiles, we will run into a resource fragmentation problem. Think about this case:
- Request A wants 1G memory while requests B & C want 0.5G memory each.
- There are two task executors T1 & T2 with 1G and 0.5G free memory respectively.
If B comes first and we cut a slot from T1 for B, A must wait for free resources from other tasks. But A could have been scheduled immediately if we had cut a slot from T2 for B.

The logic of findMatchingSlot now becomes finding a task executor which has enough resources and then cutting a slot from it. The current method could be seen as a "first-fit strategy", which works well in general but is sometimes not optimal.

Actually, this problem can be abstracted as the "Bin Packing Problem" [1]. Here are some common approximation algorithms:
- First fit
- Next fit
- Best fit

But it becomes a multi-dimensional bin packing problem if we take CPU into account. It is hard to define which one is the best fit then. Some research has addressed this problem, such as Tetris [2].

Here are some thoughts about it:
1. We could make the strategy of finding a matching task executor pluggable, and let users configure the best strategy for their scenario.
2. We could support a batch request interface in the RM, because we have opportunities to optimize if we have more information. If we know A, B and C at the same time, we can always make the best decision.

[1] http://www.or.deis.unibo.it/kp/Chapter8.pdf
[2] https://www.cs.cmu.edu/~xia/resources/Documents/grandl_sigcomm14.pdf

Best,
Yangze Guo
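Yangze's example can be made concrete with a small placement sketch comparing first fit and best fit; the classes and numbers below are invented for illustration and are not Flink's SlotManager logic. With first fit, B lands on T1 and A has to wait; with best fit, B lands on T2 and A can be scheduled immediately.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative-only sketch of first-fit vs. best-fit placement of slot requests.
public class SlotPlacementSketch {

    // Hypothetical task executor with some free memory (MB).
    static class TaskExecutor {
        final String name;
        long freeMemoryMB;
        TaskExecutor(String name, long freeMemoryMB) { this.name = name; this.freeMemoryMB = freeMemoryMB; }
    }

    // First fit: take the first task executor with enough free memory.
    static TaskExecutor firstFit(List<TaskExecutor> tms, long requestMB) {
        return tms.stream().filter(tm -> tm.freeMemoryMB >= requestMB).findFirst().orElse(null);
    }

    // Best fit: take the executor that leaves the least free memory behind.
    static TaskExecutor bestFit(List<TaskExecutor> tms, long requestMB) {
        return tms.stream().filter(tm -> tm.freeMemoryMB >= requestMB)
                .min(Comparator.comparingLong(tm -> tm.freeMemoryMB - requestMB)).orElse(null);
    }

    static void place(List<TaskExecutor> tms, long requestMB, boolean useBestFit, String label) {
        TaskExecutor chosen = useBestFit ? bestFit(tms, requestMB) : firstFit(tms, requestMB);
        if (chosen == null) {
            System.out.println(label + " (" + requestMB + " MB): no executor can fulfill the request");
        } else {
            chosen.freeMemoryMB -= requestMB;
            System.out.println(label + " (" + requestMB + " MB): placed on " + chosen.name);
        }
    }

    public static void main(String[] args) {
        for (boolean useBestFit : new boolean[] {false, true}) {
            List<TaskExecutor> tms = new ArrayList<>();
            tms.add(new TaskExecutor("T1", 1024));
            tms.add(new TaskExecutor("T2", 512));
            System.out.println(useBestFit ? "-- best fit --" : "-- first fit --");
            place(tms, 512, useBestFit, "B");   // B arrives first
            place(tms, 1024, useBestFit, "A");  // first fit left only 512 MB on T1, so A waits; best fit kept T1 whole
            place(tms, 512, useBestFit, "C");   // 2 GB of requests cannot all fit into 1.5 GB either way
        }
    }
}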
On Thu, Aug 15, 2019 at 10:40 PM Xintong Song <tonysong...@gmail.com> wrote:

Hi everyone,

We would like to start a discussion thread on "FLIP-53: Fine Grained Resource Management" [1], where we propose how to improve Flink resource management and scheduling.

This FLIP mainly discusses the following issues.

- How to support tasks with fine grained resource requirements.
- How to unify resource management for jobs with / without fine grained resource requirements.
- How to unify resource management for streaming / batch jobs.

Key changes proposed in the FLIP are as follows.

- Unify memory management for operators with / without fine grained resource requirements by applying a fraction based quota mechanism.
- Unify resource scheduling for streaming and batch jobs by setting slot sharing groups for pipelined regions during the compiling stage.
- Dynamically allocate slots from task executors' available resources.

Please find more details in the FLIP wiki document [1]. Looking forward to your feedback.

Thank you~

Xintong Song

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Resource+Management
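The second key change in the announcement (one slot sharing group per pipelined region) can be summarized with a very small sketch. The region assignment and the data structures are invented for the example; this is not Flink's actual StreamingJobGraphGenerator code, which computes regions from the blocking edges of the job graph.

import java.util.HashMap;
import java.util.Map;

// Illustrative-only sketch: give each pipelined region its own slot sharing group,
// so that only tasks of the same region end up sharing slots.
public class RegionSlotSharingSketch {

    public static void main(String[] args) {
        // Vertex name -> pipelined region id (assume regions were already derived from blocking edges).
        Map<String, Integer> regionOfVertex = new HashMap<>();
        regionOfVertex.put("source", 0);
        regionOfVertex.put("mapper", 0);   // pipelined with the source
        regionOfVertex.put("reducer", 1);  // separated from the mapper by a blocking edge
        regionOfVertex.put("sink", 1);

        // Assign one slot sharing group per region.
        Map<String, String> slotSharingGroupOfVertex = new HashMap<>();
        regionOfVertex.forEach((vertex, region) ->
                slotSharingGroupOfVertex.put(vertex, "region-" + region));

        slotSharingGroupOfVertex.forEach((vertex, group) ->
                System.out.println(vertex + " -> " + group));
    }
}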