Thanks for updating the FLIP Xintong. It looks good to me. I would be ok to
start a vote for it.

Best,
Andrey

On Wed, Sep 4, 2019 at 10:03 AM Xintong Song <tonysong...@gmail.com> wrote:

> @all
>
> The FLIP document [1] has been updated.
>
> Thank you~
>
> Xintong Song
>
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
>
> On Tue, Sep 3, 2019 at 7:20 PM Zhu Zhu <reed...@gmail.com> wrote:
>
> > Thanks Xintong for the explanation.
> >
> > For question #1, I think it's good as long as DataSet job behaviors
> remains
> > the same.
> >
> > For question #2, agreed that the resource difference is small enough(at
> > most 1 edge diff) in current supported point-wise execution edge
> connection
> > patterns.
> >
> > Thanks,
> > Zhu Zhu
> >
> > Xintong Song <tonysong...@gmail.com> 于2019年9月3日周二 下午6:58写道:
> >
> > >  Thanks for the comments, Zhu & Kurt.
> > >
> > > Andrey and I also had some discussions offline, and I would like to
> first
> > > post a summary of our discussion:
> > >
> > >    1. The motivation of the fraction based approach is to unify
> resource
> > >    management for both operators with specified and unknown resource
> > >    requirements.
> > >    2. The fraction based approach proposed in this FLIP should only
> > affect
> > >    streaming jobs (both bounded and unbounded). For DataSet jobs, there
> > are
> > >    already some fraction based approach (in TaskConfig and
> > ChainedDriver),
> > > and
> > >    we do not make any change to the existing approach.
> > >    3. The scope of this FLIP does not include discussion of how to set
> > >    ResourceSpec for operators.
> > >       1. For blink jobs, the optimizer can set operator resources for
> the
> > >       users, according to their configurations (default: unknown)
> > >       2. For DataStream jobs, there are no method / interface to set
> > >       operator resources at the moment (1.10). We can have in the
> future.
> > >       3. For DataSet jobs, there are existing user interfaces to set
> > >       operator resources.
> > >    4. The FLIP should explain more about how ResourceSpecs works
> > >       1. PhysicalTransformations (deployed with operators into the
> > >       StreamTasks) get ResourceSpec: unknown by default or known (e.g.
> > > from the
> > >       Blink planner)
> > >       2. While generating stream graph, calculate fractions and set to
> > >       StreamConfig
> > >       3. While scheduling, convert ResourceSpec to ResourceProfile
> > >       (ResourceSpec + network memory), and deploy to slots / TMs
> matching
> > > the
> > >       resources
> > >       4. While starting Task in TM, each operator gets fraction
> converted
> > >       back to the original absolute value requested by user or fair
> > > unknown share
> > >       of the slot
> > >       5. We should not set `allSourcesInSamePipelinedRegion` to `false`
> > for
> > >    DataSet jobs. Behaviors of DataSet jobs should not be changed.
> > >    6. The FLIP document should differentiate works planed in this FLIP
> > and
> > >    the future follow-ups more clearly, by put the follow-ups in a
> > separate
> > >    section
> > >    7. Another limitation of the rejected alternative setting fractions
> at
> > >    scheduling time is that, the scheduler implementation does not know
> > > which
> > >    tasks will be deployed into the same slot in advance.
> > >
> > > Andrey, Please bring it up if there is anything I missed.
> > >
> > > Zhu, regarding your comments:
> > >
> > >    1. If we do not set `allSourcesInSamePipelinedRegion` to `false` for
> > >    DataSet jobs (point 5 in the discussion summary above), then there
> > >    shouldn't be any regression right?
> > >    2. I think it makes sense to set the max possible network memory for
> > the
> > >    JobVertex. When you say parallel instances of the same JobVertex may
> > > have
> > >    need different network memory, I guess you mean the rescale
> scenarios
> > > where
> > >    parallelisms of upstream / downstream vertex cannot be exactly
> divided
> > > by
> > >    parallelism of downstream / upstream vertex? I would say it's
> > > acceptable to
> > >    have slight difference between actually needed and allocated network
> > > memory.
> > >    3. Yes, by numOpsUseOnHeapManagedMemory I mean
> > >    numOpsUseOnHeapManagedMemoryInTheSameSharedGroup. I'll update the
> doc.
> > >    4. Yes, it should be StreamingJobGraphGenerator. Thanks for the
> > >    correction.
> > >
> > >
> > > Kurt, regarding your comments:
> > >
> > >    1. I think we don't have network memory in ResourceSpec, which is
> the
> > >    user facing API. We only have network memory in ResourceProfile,
> which
> > > is
> > >    used internally for scheduling. The reason we do not expose network
> > > memory
> > >    to the user is that, currently how many network buffers each task
> > needs
> > > is
> > >    decided by the topology of execution graph (how many input / output
> > >    channels it has).
> > >    2. In the section "Operator Resource Requirements": "For the first
> > >    version, we do not support mixing operators with specified / unknown
> > >    resource requirements in the same job. Either all or none of the
> > > operators
> > >    of the same job should specify their resource requirements.
> > >    StreamGraphGenerator should check this and throw an error when
> mixing
> > of
> > >    specified / unknown resource requirements is detected, during the
> > >    compilation stage."
> > >    3. If the user set a resource requirement, then it is guaranteed
> that
> > >    the task should get at least the much resource, otherwise there
> should
> > > be
> > >    an exception. That should be guaranteed by the "Dynamic Slot
> > Allocation"
> > >    approach (FLIP-56).
> > >
> > >
> > > I'll update the FLIP document addressing the comments ASAP.
> > >
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Tue, Sep 3, 2019 at 2:42 PM Kurt Young <ykt...@gmail.com> wrote:
> > >
> > > > Thanks Xingtong for driving this effort, I haven't finished the whole
> > > > document yet,
> > > > but have couple of questions:
> > > >
> > > > 1. Regarding to network memory, the document said it will be derived
> by
> > > > framework
> > > > automatically. I'm wondering whether we should delete this dimension
> > from
> > > > user-
> > > > facing API?
> > > >
> > > > 2. Regarding to fraction based quota, I don't quite get the meaning
> of
> > > > "slotSharingGroupOnHeapManagedMem" and
> > > "slotSharingGroupOffHeapManagedMem".
> > > > What if the sharing group is mixed with specified resource and
> UNKNOWN
> > > > resource
> > > > requirements.
> > > >
> > > > 3 IIUC, even user had set resource requirements, lets say 500MB
> > off-heap
> > > > managed
> > > > memory, during execution the operator may or may not have 500MB
> > off-heap
> > > > managed
> > > > memory, right?
> > > >
> > > > Best,
> > > > Kurt
> > > >
> > > >
> > > > On Mon, Sep 2, 2019 at 8:36 PM Zhu Zhu <reed...@gmail.com> wrote:
> > > >
> > > > > Thanks Xintong for proposing this improvement. Fine grained
> resources
> > > can
> > > > > be very helpful when user has good planning on resources.
> > > > >
> > > > > I have a few questions:
> > > > > 1. Currently in a batch job, vertices from different regions can
> run
> > at
> > > > the
> > > > > same time in slots from the same shared group, as long as they do
> not
> > > > have
> > > > > data dependency on each other and available slot count is not
> smaller
> > > > than
> > > > > the *max* of parallelism of all tasks.
> > > > > With changes in this FLIP however, tasks from different regions
> > cannot
> > > > > share slots anymore.
> > > > > Once available slot count is smaller than the *sum* of all
> > parallelism
> > > of
> > > > > tasks from all regions, tasks may need to be executed sequentially,
> > > which
> > > > > might result in a performance regression.
> > > > > Is this(performance regression to existing DataSet jobs) considered
> > as
> > > a
> > > > > necessary and accepted trade off in this FLIP?
> > > > >
> > > > > 2. The network memory depends on the input/output ExecutionEdge
> count
> > > and
> > > > > thus can be different even for parallel instances of the same
> > > JobVertex.
> > > > > Does this mean that when adding task resources to calculating the
> > slot
> > > > > resource for a shared group, the max possible network memory of the
> > > > vertex
> > > > > instance shall be used?
> > > > > This might result in larger resource required than actually needed.
> > > > >
> > > > > And some minor comments:
> > > > > 1. Regarding "fracManagedMemOnHeap = 1 /
> > > numOpsUseOnHeapManagedMemory", I
> > > > > guess you mean numOpsUseOnHeapManagedMemoryInTheSameSharedGroup ?
> > > > > 2. I think the *StreamGraphGenerator* in the #Slot Sharing section
> > and
> > > > > implementation step 4 should be *StreamingJobGraphGenerator*, as
> > > > > *StreamGraphGenerator* is not aware of JobGraph and pipelined
> region.
> > > > >
> > > > >
> > > > > Thanks,
> > > > > Zhu Zhu
> > > > >
> > > > > Xintong Song <tonysong...@gmail.com> 于2019年9月2日周一 上午11:59写道:
> > > > >
> > > > > > Updated the FLIP wiki page [1], with the following changes.
> > > > > >
> > > > > >    - Remove the step of converting pipelined edges between
> > different
> > > > slot
> > > > > >    sharing groups into blocking edges.
> > > > > >    - Set `allSourcesInSamePipelinedRegion` to true by default.
> > > > > >
> > > > > > Thank you~
> > > > > >
> > > > > > Xintong Song
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Mon, Sep 2, 2019 at 11:50 AM Xintong Song <
> > tonysong...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Regarding changing edge type, I think actually we don't need to
> > do
> > > > this
> > > > > > > for batch jobs neither, because we don't have public interfaces
> > for
> > > > > users
> > > > > > > to explicitly set slot sharing groups in DataSet API and
> > SQL/Table
> > > > API.
> > > > > > We
> > > > > > > have such interfaces in DataStream API only.
> > > > > > >
> > > > > > > Thank you~
> > > > > > >
> > > > > > > Xintong Song
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Aug 27, 2019 at 10:16 PM Xintong Song <
> > > tonysong...@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Thanks for the correction, Till.
> > > > > > >>
> > > > > > >> Regarding your comments:
> > > > > > >> - You are right, we should not change the edge type for
> > streaming
> > > > > jobs.
> > > > > > >> Then I think we can change the option
> > > > > 'allSourcesInSamePipelinedRegion'
> > > > > > in
> > > > > > >> step 2 to 'isStreamingJob', and implement the current step 2
> > > before
> > > > > the
> > > > > > >> current step 1 so we can use this option to decide whether
> > should
> > > > > change
> > > > > > >> the edge type. What do you think?
> > > > > > >> - Agree. It should be easier to make the default value of
> > > > > > >> 'allSourcesInSamePipelinedRegion' (or 'isStreamingJob')
> 'true',
> > > and
> > > > > set
> > > > > > it
> > > > > > >> to 'false' when using DataSet API or blink planner.
> > > > > > >>
> > > > > > >> Thank you~
> > > > > > >>
> > > > > > >> Xintong Song
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >> On Tue, Aug 27, 2019 at 8:59 PM Till Rohrmann <
> > > trohrm...@apache.org
> > > > >
> > > > > > >> wrote:
> > > > > > >>
> > > > > > >>> Thanks for creating the implementation plan Xintong. Overall,
> > the
> > > > > > >>> implementation plan looks good. I had a couple of comments:
> > > > > > >>>
> > > > > > >>> - What will happen if a user has defined a streaming job with
> > two
> > > > > slot
> > > > > > >>> sharing groups? Would the code insert a blocking data
> exchange
> > > > > between
> > > > > > >>> these two groups? If yes, then this breaks existing Flink
> > > streaming
> > > > > > jobs.
> > > > > > >>> - How do we detect unbounded streaming jobs to set
> > > > > > >>> the allSourcesInSamePipelinedRegion to `true`? Wouldn't it be
> > > > easier
> > > > > to
> > > > > > >>> set
> > > > > > >>> it false if we are using the DataSet API or the Blink planner
> > > with
> > > > a
> > > > > > >>> bounded job?
> > > > > > >>>
> > > > > > >>> Cheers,
> > > > > > >>> Till
> > > > > > >>>
> > > > > > >>> On Tue, Aug 27, 2019 at 2:16 PM Till Rohrmann <
> > > > trohrm...@apache.org>
> > > > > > >>> wrote:
> > > > > > >>>
> > > > > > >>> > I guess there is a typo since the link to the FLIP-53 is
> > > > > > >>> >
> > > > > > >>>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
> > > > > > >>> >
> > > > > > >>> > Cheers,
> > > > > > >>> > Till
> > > > > > >>> >
> > > > > > >>> > On Tue, Aug 27, 2019 at 1:42 PM Xintong Song <
> > > > > tonysong...@gmail.com>
> > > > > > >>> > wrote:
> > > > > > >>> >
> > > > > > >>> >> Added implementation steps for this FLIP on the wiki page
> > [1].
> > > > > > >>> >>
> > > > > > >>> >>
> > > > > > >>> >> Thank you~
> > > > > > >>> >>
> > > > > > >>> >> Xintong Song
> > > > > > >>> >>
> > > > > > >>> >>
> > > > > > >>> >> [1]
> > > > > > >>> >>
> > > > > > >>> >>
> > > > > > >>>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
> > > > > > >>> >>
> > > > > > >>> >> On Mon, Aug 19, 2019 at 10:29 PM Xintong Song <
> > > > > > tonysong...@gmail.com>
> > > > > > >>> >> wrote:
> > > > > > >>> >>
> > > > > > >>> >> > Hi everyone,
> > > > > > >>> >> >
> > > > > > >>> >> > As Till suggested, the original "FLIP-53: Fine Grained
> > > > Resource
> > > > > > >>> >> > Management" splits into two separate FLIPs,
> > > > > > >>> >> >
> > > > > > >>> >> >    - FLIP-53: Fine Grained Operator Resource Management
> > [1]
> > > > > > >>> >> >    - FLIP-56: Dynamic Slot Allocation [2]
> > > > > > >>> >> >
> > > > > > >>> >> > We'll continue using this discussion thread for FLIP-53.
> > For
> > > > > > >>> FLIP-56, I
> > > > > > >>> >> > just started a new discussion thread [3].
> > > > > > >>> >> >
> > > > > > >>> >> > Thank you~
> > > > > > >>> >> >
> > > > > > >>> >> > Xintong Song
> > > > > > >>> >> >
> > > > > > >>> >> >
> > > > > > >>> >> > [1]
> > > > > > >>> >> >
> > > > > > >>> >>
> > > > > > >>>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
> > > > > > >>> >> >
> > > > > > >>> >> > [2]
> > > > > > >>> >> >
> > > > > > >>> >>
> > > > > > >>>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
> > > > > > >>> >> >
> > > > > > >>> >> > [3]
> > > > > > >>> >> >
> > > > > > >>> >>
> > > > > > >>>
> > > > > >
> > > > >
> > > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-56-Dynamic-Slot-Allocation-td31960.html
> > > > > > >>> >> >
> > > > > > >>> >> > On Mon, Aug 19, 2019 at 2:55 PM Xintong Song <
> > > > > > tonysong...@gmail.com
> > > > > > >>> >
> > > > > > >>> >> > wrote:
> > > > > > >>> >> >
> > > > > > >>> >> >> Thinks for the comments, Yang.
> > > > > > >>> >> >>
> > > > > > >>> >> >> Regarding your questions:
> > > > > > >>> >> >>
> > > > > > >>> >> >>    1. How to calculate the resource specification of
> > > > > > TaskManagers?
> > > > > > >>> Do
> > > > > > >>> >> they
> > > > > > >>> >> >>>    have them same resource spec calculated based on
> the
> > > > > > >>> >> configuration? I
> > > > > > >>> >> >>> think
> > > > > > >>> >> >>>    we still have wasted resources in this situation.
> Or
> > we
> > > > > could
> > > > > > >>> start
> > > > > > >>> >> >>>    TaskManagers with different spec.
> > > > > > >>> >> >>>
> > > > > > >>> >> >> I agree with you that we can further improve the
> resource
> > > > > utility
> > > > > > >>> by
> > > > > > >>> >> >> customizing task executors with different resource
> > > > > > specifications.
> > > > > > >>> >> However,
> > > > > > >>> >> >> I'm in favor of limiting the scope of this FLIP and
> leave
> > > it
> > > > > as a
> > > > > > >>> >> future
> > > > > > >>> >> >> optimization. The plan for that part is to move the
> logic
> > > of
> > > > > > >>> deciding
> > > > > > >>> >> task
> > > > > > >>> >> >> executor specifications into the slot manager and make
> > slot
> > > > > > manager
> > > > > > >>> >> >> pluggable, so inside the slot manager plugin we can
> have
> > > > > > different
> > > > > > >>> >> logics
> > > > > > >>> >> >> for deciding the task executor specifications.
> > > > > > >>> >> >>
> > > > > > >>> >> >>
> > > > > > >>> >> >>>    2. If a slot is released and returned to SlotPool,
> > does
> > > > it
> > > > > > >>> could be
> > > > > > >>> >> >>>    reused by other SlotRequest that the request
> resource
> > > is
> > > > > > >>> smaller
> > > > > > >>> >> than
> > > > > > >>> >> >>> it?
> > > > > > >>> >> >>>
> > > > > > >>> >> >> No, I think slot pool should always return slots if
> they
> > do
> > > > not
> > > > > > >>> exactly
> > > > > > >>> >> >> match the pending requests, so that resource manager
> can
> > > deal
> > > > > > with
> > > > > > >>> the
> > > > > > >>> >> >> extra resources.
> > > > > > >>> >> >>
> > > > > > >>> >> >>>       - If it is yes, what happens to the available
> > > resource
> > > > > in
> > > > > > >>> the
> > > > > > >>> >> >>
> > > > > > >>> >> >>       TaskManager.
> > > > > > >>> >> >>>       - What is the SlotStatus of the cached slot in
> > > > SlotPool?
> > > > > > The
> > > > > > >>> >> >>>       AllocationId is null?
> > > > > > >>> >> >>>
> > > > > > >>> >> >> The allocation id does not change as long as the slot
> is
> > > not
> > > > > > >>> returned
> > > > > > >>> >> >> from the job master, no matter its occupied or
> available
> > in
> > > > the
> > > > > > >>> slot
> > > > > > >>> >> pool.
> > > > > > >>> >> >> I think we have the same behavior currently. No matter
> > how
> > > > many
> > > > > > >>> tasks
> > > > > > >>> >> the
> > > > > > >>> >> >> job master deploy into the slot, concurrently or
> > > > sequentially,
> > > > > it
> > > > > > >>> is
> > > > > > >>> >> one
> > > > > > >>> >> >> allocation from the cluster to the job until the slot
> is
> > > > freed
> > > > > > from
> > > > > > >>> >> the job
> > > > > > >>> >> >> master.
> > > > > > >>> >> >>
> > > > > > >>> >> >>>    3. In a session cluster, some jobs are configured
> > with
> > > > > > operator
> > > > > > >>> >> >>>    resources, meanwhile other jobs are using UNKNOWN.
> > How
> > > to
> > > > > > deal
> > > > > > >>> with
> > > > > > >>> >> >>> this
> > > > > > >>> >> >>>    situation?
> > > > > > >>> >> >>
> > > > > > >>> >> >> As long as we do not mix unknown / specified resource
> > > > profiles
> > > > > > >>> within
> > > > > > >>> >> the
> > > > > > >>> >> >> same job / slot, there shouldn't be a problem. Resource
> > > > manager
> > > > > > >>> >> converts
> > > > > > >>> >> >> unknown resource profiles in slot requests to specified
> > > > default
> > > > > > >>> >> resource
> > > > > > >>> >> >> profiles, so they can be dynamically allocated from
> task
> > > > > > executors'
> > > > > > >>> >> >> available resources just as other slot requests with
> > > > specified
> > > > > > >>> resource
> > > > > > >>> >> >> profiles.
> > > > > > >>> >> >>
> > > > > > >>> >> >> Thank you~
> > > > > > >>> >> >>
> > > > > > >>> >> >> Xintong Song
> > > > > > >>> >> >>
> > > > > > >>> >> >>
> > > > > > >>> >> >>
> > > > > > >>> >> >> On Mon, Aug 19, 2019 at 11:39 AM Yang Wang <
> > > > > > danrtsey...@gmail.com>
> > > > > > >>> >> wrote:
> > > > > > >>> >> >>
> > > > > > >>> >> >>> Hi Xintong,
> > > > > > >>> >> >>>
> > > > > > >>> >> >>>
> > > > > > >>> >> >>> Thanks for your detailed proposal. I think many users
> > are
> > > > > > >>> suffering
> > > > > > >>> >> from
> > > > > > >>> >> >>> waste of resources. The resource spec of all task
> > managers
> > > > are
> > > > > > >>> same
> > > > > > >>> >> and
> > > > > > >>> >> >>> we
> > > > > > >>> >> >>> have to increase all task managers to make the heavy
> one
> > > > more
> > > > > > >>> stable.
> > > > > > >>> >> So
> > > > > > >>> >> >>> we
> > > > > > >>> >> >>> will benefit from the fine grained resource
> management a
> > > > lot.
> > > > > We
> > > > > > >>> could
> > > > > > >>> >> >>> get
> > > > > > >>> >> >>> better resource utilization and stability.
> > > > > > >>> >> >>>
> > > > > > >>> >> >>>
> > > > > > >>> >> >>> Just to share some thoughts.
> > > > > > >>> >> >>>
> > > > > > >>> >> >>>
> > > > > > >>> >> >>>
> > > > > > >>> >> >>>    1. How to calculate the resource specification of
> > > > > > >>> TaskManagers? Do
> > > > > > >>> >> >>> they
> > > > > > >>> >> >>>    have them same resource spec calculated based on
> the
> > > > > > >>> >> configuration? I
> > > > > > >>> >> >>> think
> > > > > > >>> >> >>>    we still have wasted resources in this situation.
> Or
> > we
> > > > > could
> > > > > > >>> start
> > > > > > >>> >> >>>    TaskManagers with different spec.
> > > > > > >>> >> >>>    2. If a slot is released and returned to SlotPool,
> > does
> > > > it
> > > > > > >>> could be
> > > > > > >>> >> >>>    reused by other SlotRequest that the request
> resource
> > > is
> > > > > > >>> smaller
> > > > > > >>> >> than
> > > > > > >>> >> >>> it?
> > > > > > >>> >> >>>       - If it is yes, what happens to the available
> > > resource
> > > > > in
> > > > > > >>> the
> > > > > > >>> >> >>>       TaskManager.
> > > > > > >>> >> >>>       - What is the SlotStatus of the cached slot in
> > > > SlotPool?
> > > > > > The
> > > > > > >>> >> >>>       AllocationId is null?
> > > > > > >>> >> >>>    3. In a session cluster, some jobs are configured
> > with
> > > > > > operator
> > > > > > >>> >> >>>    resources, meanwhile other jobs are using UNKNOWN.
> > How
> > > to
> > > > > > deal
> > > > > > >>> with
> > > > > > >>> >> >>> this
> > > > > > >>> >> >>>    situation?
> > > > > > >>> >> >>>
> > > > > > >>> >> >>>
> > > > > > >>> >> >>>
> > > > > > >>> >> >>> Best,
> > > > > > >>> >> >>> Yang
> > > > > > >>> >> >>>
> > > > > > >>> >> >>> Xintong Song <tonysong...@gmail.com> 于2019年8月16日周五
> > > > 下午8:57写道:
> > > > > > >>> >> >>>
> > > > > > >>> >> >>> > Thanks for the feedbacks, Yangze and Till.
> > > > > > >>> >> >>> >
> > > > > > >>> >> >>> > Yangze,
> > > > > > >>> >> >>> >
> > > > > > >>> >> >>> > I agree with you that we should make scheduling
> > strategy
> > > > > > >>> pluggable
> > > > > > >>> >> and
> > > > > > >>> >> >>> > optimize the strategy to reduce the memory
> > fragmentation
> > > > > > >>> problem,
> > > > > > >>> >> and
> > > > > > >>> >> >>> > thanks for the inputs on the potential algorithmic
> > > > > solutions.
> > > > > > >>> >> However,
> > > > > > >>> >> >>> I'm
> > > > > > >>> >> >>> > in favor of keep this FLIP focusing on the overall
> > > > mechanism
> > > > > > >>> design
> > > > > > >>> >> >>> rather
> > > > > > >>> >> >>> > than strategies. Solving the fragmentation issue
> > should
> > > be
> > > > > > >>> >> considered
> > > > > > >>> >> >>> as an
> > > > > > >>> >> >>> > optimization, and I agree with Till that we probably
> > > > should
> > > > > > >>> tackle
> > > > > > >>> >> this
> > > > > > >>> >> >>> > afterwards.
> > > > > > >>> >> >>> >
> > > > > > >>> >> >>> > Till,
> > > > > > >>> >> >>> >
> > > > > > >>> >> >>> > - Regarding splitting the FLIP, I think it makes
> > sense.
> > > > The
> > > > > > >>> operator
> > > > > > >>> >> >>> > resource management and dynamic slot allocation do
> not
> > > > have
> > > > > > much
> > > > > > >>> >> >>> dependency
> > > > > > >>> >> >>> > on each other.
> > > > > > >>> >> >>> >
> > > > > > >>> >> >>> > - Regarding the default slot size, I think this is
> > > similar
> > > > > to
> > > > > > >>> >> FLIP-49
> > > > > > >>> >> >>> [1]
> > > > > > >>> >> >>> > where we want all the deriving happens at one
> place. I
> > > > think
> > > > > > it
> > > > > > >>> >> would
> > > > > > >>> >> >>> be
> > > > > > >>> >> >>> > nice to pass the default slot size into the task
> > > executor
> > > > in
> > > > > > the
> > > > > > >>> >> same
> > > > > > >>> >> >>> way
> > > > > > >>> >> >>> > that we pass in the memory pool sizes in FLIP-49
> [1].
> > > > > > >>> >> >>> >
> > > > > > >>> >> >>> > - Regarding the return value of
> > > > > > >>> >> TaskExecutorGateway#requestResource, I
> > > > > > >>> >> >>> > think you're right. We should avoid using null as
> the
> > > > return
> > > > > > >>> value.
> > > > > > >>> >> I
> > > > > > >>> >> >>> think
> > > > > > >>> >> >>> > we probably should thrown an exception here.
> > > > > > >>> >> >>> >
> > > > > > >>> >> >>> > Thank you~
> > > > > > >>> >> >>> >
> > > > > > >>> >> >>> > Xintong Song
> > > > > > >>> >> >>> >
> > > > > > >>> >> >>> >
> > > > > > >>> >> >>> > [1]
> > > > > > >>> >> >>> >
> > > > > > >>> >> >>> >
> > > > > > >>> >> >>>
> > > > > > >>> >>
> > > > > > >>>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
> > > > > > >>> >> >>> >
> > > > > > >>> >> >>> > On Fri, Aug 16, 2019 at 2:18 PM Till Rohrmann <
> > > > > > >>> trohrm...@apache.org
> > > > > > >>> >> >
> > > > > > >>> >> >>> > wrote:
> > > > > > >>> >> >>> >
> > > > > > >>> >> >>> > > Hi Xintong,
> > > > > > >>> >> >>> > >
> > > > > > >>> >> >>> > > thanks for drafting this FLIP. I think your
> proposal
> > > > helps
> > > > > > to
> > > > > > >>> >> >>> improve the
> > > > > > >>> >> >>> > > execution of batch jobs more efficiently.
> Moreover,
> > it
> > > > > > >>> enables the
> > > > > > >>> >> >>> proper
> > > > > > >>> >> >>> > > integration of the Blink planner which is very
> > > important
> > > > > as
> > > > > > >>> well.
> > > > > > >>> >> >>> > >
> > > > > > >>> >> >>> > > Overall, the FLIP looks good to me. I was
> wondering
> > > > > whether
> > > > > > it
> > > > > > >>> >> >>> wouldn't
> > > > > > >>> >> >>> > > make sense to actually split it up into two FLIPs:
> > > > > Operator
> > > > > > >>> >> resource
> > > > > > >>> >> >>> > > management and dynamic slot allocation. I think
> > these
> > > > two
> > > > > > >>> FLIPs
> > > > > > >>> >> >>> could be
> > > > > > >>> >> >>> > > seen as orthogonal and it would decrease the scope
> > of
> > > > each
> > > > > > >>> >> individual
> > > > > > >>> >> >>> > FLIP.
> > > > > > >>> >> >>> > >
> > > > > > >>> >> >>> > > Some smaller comments:
> > > > > > >>> >> >>> > >
> > > > > > >>> >> >>> > > - I'm not sure whether we should pass in the
> default
> > > > slot
> > > > > > size
> > > > > > >>> >> via an
> > > > > > >>> >> >>> > > environment variable. Without having unified the
> way
> > > how
> > > > > > Flink
> > > > > > >>> >> >>> components
> > > > > > >>> >> >>> > > are configured [1], I think it would be better to
> > pass
> > > > it
> > > > > in
> > > > > > >>> as
> > > > > > >>> >> part
> > > > > > >>> >> >>> of
> > > > > > >>> >> >>> > the
> > > > > > >>> >> >>> > > configuration.
> > > > > > >>> >> >>> > > - I would avoid returning a null value from
> > > > > > >>> >> >>> > > TaskExecutorGateway#requestResource if it cannot
> be
> > > > > > fulfilled.
> > > > > > >>> >> >>> Either we
> > > > > > >>> >> >>> > > should introduce an explicit return value saying
> > this
> > > or
> > > > > > >>> throw an
> > > > > > >>> >> >>> > > exception.
> > > > > > >>> >> >>> > >
> > > > > > >>> >> >>> > > Concerning Yangze's comments: I think you are
> right
> > > that
> > > > > it
> > > > > > >>> would
> > > > > > >>> >> be
> > > > > > >>> >> >>> > > helpful to make the selection strategy pluggable.
> > Also
> > > > > > >>> batching
> > > > > > >>> >> slot
> > > > > > >>> >> >>> > > requests to the RM could be a good optimization.
> For
> > > the
> > > > > > sake
> > > > > > >>> of
> > > > > > >>> >> >>> keeping
> > > > > > >>> >> >>> > > the scope of this FLIP smaller I would try to
> tackle
> > > > these
> > > > > > >>> things
> > > > > > >>> >> >>> after
> > > > > > >>> >> >>> > the
> > > > > > >>> >> >>> > > initial version has been completed (without
> spoiling
> > > > these
> > > > > > >>> >> >>> optimization
> > > > > > >>> >> >>> > > opportunities). In particular batching the slot
> > > requests
> > > > > > >>> depends
> > > > > > >>> >> on
> > > > > > >>> >> >>> the
> > > > > > >>> >> >>> > > current scheduler refactoring and could also be
> > > realized
> > > > > on
> > > > > > >>> the RM
> > > > > > >>> >> >>> side
> > > > > > >>> >> >>> > > only.
> > > > > > >>> >> >>> > >
> > > > > > >>> >> >>> > > [1]
> > > > > > >>> >> >>> > >
> > > > > > >>> >> >>> > >
> > > > > > >>> >> >>> >
> > > > > > >>> >> >>>
> > > > > > >>> >>
> > > > > > >>>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
> > > > > > >>> >> >>> > >
> > > > > > >>> >> >>> > > Cheers,
> > > > > > >>> >> >>> > > Till
> > > > > > >>> >> >>> > >
> > > > > > >>> >> >>> > >
> > > > > > >>> >> >>> > >
> > > > > > >>> >> >>> > > On Fri, Aug 16, 2019 at 11:11 AM Yangze Guo <
> > > > > > >>> karma...@gmail.com>
> > > > > > >>> >> >>> wrote:
> > > > > > >>> >> >>> > >
> > > > > > >>> >> >>> > > > Hi, Xintong
> > > > > > >>> >> >>> > > >
> > > > > > >>> >> >>> > > > Thanks to propose this FLIP. The general design
> > > looks
> > > > > good
> > > > > > >>> to
> > > > > > >>> >> me,
> > > > > > >>> >> >>> +1
> > > > > > >>> >> >>> > > > for this feature.
> > > > > > >>> >> >>> > > >
> > > > > > >>> >> >>> > > > Since slots in the same task executor could have
> > > > > different
> > > > > > >>> >> resource
> > > > > > >>> >> >>> > > > profile, we will
> > > > > > >>> >> >>> > > > meet resource fragment problem. Think about this
> > > case:
> > > > > > >>> >> >>> > > >  - request A want 1G memory while request B & C
> > want
> > > > > 0.5G
> > > > > > >>> memory
> > > > > > >>> >> >>> > > >  - There are two task executors T1 & T2 with 1G
> > and
> > > > 0.5G
> > > > > > >>> free
> > > > > > >>> >> >>> memory
> > > > > > >>> >> >>> > > > respectively
> > > > > > >>> >> >>> > > > If B come first and we cut a slot from T1 for
> B, A
> > > > must
> > > > > > >>> wait for
> > > > > > >>> >> >>> the
> > > > > > >>> >> >>> > > > free resource from
> > > > > > >>> >> >>> > > > other task. But A could have been scheduled
> > > > immediately
> > > > > if
> > > > > > >>> we
> > > > > > >>> >> cut a
> > > > > > >>> >> >>> > > > slot from T2 for B.
> > > > > > >>> >> >>> > > >
> > > > > > >>> >> >>> > > > The logic of findMatchingSlot now become
> finding a
> > > > task
> > > > > > >>> executor
> > > > > > >>> >> >>> which
> > > > > > >>> >> >>> > > > has enough
> > > > > > >>> >> >>> > > > resource and then cut a slot from it. Current
> > method
> > > > > could
> > > > > > >>> be
> > > > > > >>> >> seen
> > > > > > >>> >> >>> as
> > > > > > >>> >> >>> > > > "First-fit strategy",
> > > > > > >>> >> >>> > > > which works well in general but sometimes could
> > not
> > > be
> > > > > the
> > > > > > >>> >> >>> optimization
> > > > > > >>> >> >>> > > > method.
> > > > > > >>> >> >>> > > >
> > > > > > >>> >> >>> > > > Actually, this problem could be abstracted as
> "Bin
> > > > > Packing
> > > > > > >>> >> >>> Problem"[1].
> > > > > > >>> >> >>> > > > Here are
> > > > > > >>> >> >>> > > > some common approximate algorithms:
> > > > > > >>> >> >>> > > > - First fit
> > > > > > >>> >> >>> > > > - Next fit
> > > > > > >>> >> >>> > > > - Best fit
> > > > > > >>> >> >>> > > >
> > > > > > >>> >> >>> > > > But it become multi-dimensional bin packing
> > problem
> > > if
> > > > > we
> > > > > > >>> take
> > > > > > >>> >> CPU
> > > > > > >>> >> >>> > > > into account. It hard
> > > > > > >>> >> >>> > > > to define which one is best fit now. Some
> research
> > > > > > addressed
> > > > > > >>> >> this
> > > > > > >>> >> >>> > > > problem, such like Tetris[2].
> > > > > > >>> >> >>> > > >
> > > > > > >>> >> >>> > > > Here are some thinking about it:
> > > > > > >>> >> >>> > > > 1. We could make the strategy of finding
> matching
> > > task
> > > > > > >>> executor
> > > > > > >>> >> >>> > > > pluginable. Let user to config the
> > > > > > >>> >> >>> > > > best strategy in their scenario.
> > > > > > >>> >> >>> > > > 2. We could support batch request interface in
> RM,
> > > > > because
> > > > > > >>> we
> > > > > > >>> >> have
> > > > > > >>> >> >>> > > > opportunities to optimize
> > > > > > >>> >> >>> > > > if we have more information. If we know the A,
> B,
> > C
> > > at
> > > > > the
> > > > > > >>> same
> > > > > > >>> >> >>> time,
> > > > > > >>> >> >>> > > > we could always make the best decision.
> > > > > > >>> >> >>> > > >
> > > > > > >>> >> >>> > > > [1] http://www.or.deis.unibo.it/kp/Chapter8.pdf
> > > > > > >>> >> >>> > > > [2]
> > > > > > >>> >> >>> >
> > > > > > >>> >>
> > > > > >
> > https://www.cs.cmu.edu/~xia/resources/Documents/grandl_sigcomm14.pdf
> > > > > > >>> >> >>> > > >
> > > > > > >>> >> >>> > > > Best,
> > > > > > >>> >> >>> > > > Yangze Guo
> > > > > > >>> >> >>> > > >
> > > > > > >>> >> >>> > > > On Thu, Aug 15, 2019 at 10:40 PM Xintong Song <
> > > > > > >>> >> >>> tonysong...@gmail.com>
> > > > > > >>> >> >>> > > > wrote:
> > > > > > >>> >> >>> > > > >
> > > > > > >>> >> >>> > > > > Hi everyone,
> > > > > > >>> >> >>> > > > >
> > > > > > >>> >> >>> > > > > We would like to start a discussion thread on
> > > > > "FLIP-53:
> > > > > > >>> Fine
> > > > > > >>> >> >>> Grained
> > > > > > >>> >> >>> > > > > Resource Management"[1], where we propose how
> to
> > > > > improve
> > > > > > >>> Flink
> > > > > > >>> >> >>> > resource
> > > > > > >>> >> >>> > > > > management and scheduling.
> > > > > > >>> >> >>> > > > >
> > > > > > >>> >> >>> > > > > This FLIP mainly discusses the following
> issues.
> > > > > > >>> >> >>> > > > >
> > > > > > >>> >> >>> > > > >    - How to support tasks with fine grained
> > > resource
> > > > > > >>> >> >>> requirements.
> > > > > > >>> >> >>> > > > >    - How to unify resource management for jobs
> > > with
> > > > /
> > > > > > >>> without
> > > > > > >>> >> >>> fine
> > > > > > >>> >> >>> > > > grained
> > > > > > >>> >> >>> > > > >    resource requirements.
> > > > > > >>> >> >>> > > > >    - How to unify resource management for
> > > streaming
> > > > /
> > > > > > >>> batch
> > > > > > >>> >> jobs.
> > > > > > >>> >> >>> > > > >
> > > > > > >>> >> >>> > > > > Key changes proposed in the FLIP are as
> follows.
> > > > > > >>> >> >>> > > > >
> > > > > > >>> >> >>> > > > >    - Unify memory management for operators
> with
> > /
> > > > > > without
> > > > > > >>> fine
> > > > > > >>> >> >>> > grained
> > > > > > >>> >> >>> > > > >    resource requirements by applying a
> fraction
> > > > based
> > > > > > >>> quota
> > > > > > >>> >> >>> > mechanism.
> > > > > > >>> >> >>> > > > >    - Unify resource scheduling for streaming
> and
> > > > batch
> > > > > > >>> jobs by
> > > > > > >>> >> >>> > setting
> > > > > > >>> >> >>> > > > slot
> > > > > > >>> >> >>> > > > >    sharing groups for pipelined regions during
> > > > > compiling
> > > > > > >>> >> stage.
> > > > > > >>> >> >>> > > > >    - Dynamically allocate slots from task
> > > executors'
> > > > > > >>> available
> > > > > > >>> >> >>> > > resources.
> > > > > > >>> >> >>> > > > >
> > > > > > >>> >> >>> > > > > Please find more details in the FLIP wiki
> > document
> > > > > [1].
> > > > > > >>> >> Looking
> > > > > > >>> >> >>> > forward
> > > > > > >>> >> >>> > > > to
> > > > > > >>> >> >>> > > > > your feedbacks.
> > > > > > >>> >> >>> > > > >
> > > > > > >>> >> >>> > > > > Thank you~
> > > > > > >>> >> >>> > > > >
> > > > > > >>> >> >>> > > > > Xintong Song
> > > > > > >>> >> >>> > > > >
> > > > > > >>> >> >>> > > > >
> > > > > > >>> >> >>> > > > > [1]
> > > > > > >>> >> >>> > > > >
> > > > > > >>> >> >>> > > >
> > > > > > >>> >> >>> > >
> > > > > > >>> >> >>> >
> > > > > > >>> >> >>>
> > > > > > >>> >>
> > > > > > >>>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Resource+Management
> > > > > > >>> >> >>> > > >
> > > > > > >>> >> >>> > >
> > > > > > >>> >> >>> >
> > > > > > >>> >> >>>
> > > > > > >>> >> >>
> > > > > > >>> >>
> > > > > > >>> >
> > > > > > >>>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to