Thanks Xintong for the explanation.

For question #1, I think it's good as long as DataSet job behaviors remains
the same.

For question #2, agreed that the resource difference is small enough(at
most 1 edge diff) in current supported point-wise execution edge connection
patterns.

Thanks,
Zhu Zhu

Xintong Song <tonysong...@gmail.com> 于2019年9月3日周二 下午6:58写道:

>  Thanks for the comments, Zhu & Kurt.
>
> Andrey and I also had some discussions offline, and I would like to first
> post a summary of our discussion:
>
>    1. The motivation of the fraction based approach is to unify resource
>    management for both operators with specified and unknown resource
>    requirements.
>    2. The fraction based approach proposed in this FLIP should only affect
>    streaming jobs (both bounded and unbounded). For DataSet jobs, there are
>    already some fraction based approach (in TaskConfig and ChainedDriver),
> and
>    we do not make any change to the existing approach.
>    3. The scope of this FLIP does not include discussion of how to set
>    ResourceSpec for operators.
>       1. For blink jobs, the optimizer can set operator resources for the
>       users, according to their configurations (default: unknown)
>       2. For DataStream jobs, there are no method / interface to set
>       operator resources at the moment (1.10). We can have in the future.
>       3. For DataSet jobs, there are existing user interfaces to set
>       operator resources.
>    4. The FLIP should explain more about how ResourceSpecs works
>       1. PhysicalTransformations (deployed with operators into the
>       StreamTasks) get ResourceSpec: unknown by default or known (e.g.
> from the
>       Blink planner)
>       2. While generating stream graph, calculate fractions and set to
>       StreamConfig
>       3. While scheduling, convert ResourceSpec to ResourceProfile
>       (ResourceSpec + network memory), and deploy to slots / TMs matching
> the
>       resources
>       4. While starting Task in TM, each operator gets fraction converted
>       back to the original absolute value requested by user or fair
> unknown share
>       of the slot
>       5. We should not set `allSourcesInSamePipelinedRegion` to `false` for
>    DataSet jobs. Behaviors of DataSet jobs should not be changed.
>    6. The FLIP document should differentiate works planed in this FLIP and
>    the future follow-ups more clearly, by put the follow-ups in a separate
>    section
>    7. Another limitation of the rejected alternative setting fractions at
>    scheduling time is that, the scheduler implementation does not know
> which
>    tasks will be deployed into the same slot in advance.
>
> Andrey, Please bring it up if there is anything I missed.
>
> Zhu, regarding your comments:
>
>    1. If we do not set `allSourcesInSamePipelinedRegion` to `false` for
>    DataSet jobs (point 5 in the discussion summary above), then there
>    shouldn't be any regression right?
>    2. I think it makes sense to set the max possible network memory for the
>    JobVertex. When you say parallel instances of the same JobVertex may
> have
>    need different network memory, I guess you mean the rescale scenarios
> where
>    parallelisms of upstream / downstream vertex cannot be exactly divided
> by
>    parallelism of downstream / upstream vertex? I would say it's
> acceptable to
>    have slight difference between actually needed and allocated network
> memory.
>    3. Yes, by numOpsUseOnHeapManagedMemory I mean
>    numOpsUseOnHeapManagedMemoryInTheSameSharedGroup. I'll update the doc.
>    4. Yes, it should be StreamingJobGraphGenerator. Thanks for the
>    correction.
>
>
> Kurt, regarding your comments:
>
>    1. I think we don't have network memory in ResourceSpec, which is the
>    user facing API. We only have network memory in ResourceProfile, which
> is
>    used internally for scheduling. The reason we do not expose network
> memory
>    to the user is that, currently how many network buffers each task needs
> is
>    decided by the topology of execution graph (how many input / output
>    channels it has).
>    2. In the section "Operator Resource Requirements": "For the first
>    version, we do not support mixing operators with specified / unknown
>    resource requirements in the same job. Either all or none of the
> operators
>    of the same job should specify their resource requirements.
>    StreamGraphGenerator should check this and throw an error when mixing of
>    specified / unknown resource requirements is detected, during the
>    compilation stage."
>    3. If the user set a resource requirement, then it is guaranteed that
>    the task should get at least the much resource, otherwise there should
> be
>    an exception. That should be guaranteed by the "Dynamic Slot Allocation"
>    approach (FLIP-56).
>
>
> I'll update the FLIP document addressing the comments ASAP.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Sep 3, 2019 at 2:42 PM Kurt Young <ykt...@gmail.com> wrote:
>
> > Thanks Xingtong for driving this effort, I haven't finished the whole
> > document yet,
> > but have couple of questions:
> >
> > 1. Regarding to network memory, the document said it will be derived by
> > framework
> > automatically. I'm wondering whether we should delete this dimension from
> > user-
> > facing API?
> >
> > 2. Regarding to fraction based quota, I don't quite get the meaning of
> > "slotSharingGroupOnHeapManagedMem" and
> "slotSharingGroupOffHeapManagedMem".
> > What if the sharing group is mixed with specified resource and UNKNOWN
> > resource
> > requirements.
> >
> > 3 IIUC, even user had set resource requirements, lets say 500MB off-heap
> > managed
> > memory, during execution the operator may or may not have 500MB off-heap
> > managed
> > memory, right?
> >
> > Best,
> > Kurt
> >
> >
> > On Mon, Sep 2, 2019 at 8:36 PM Zhu Zhu <reed...@gmail.com> wrote:
> >
> > > Thanks Xintong for proposing this improvement. Fine grained resources
> can
> > > be very helpful when user has good planning on resources.
> > >
> > > I have a few questions:
> > > 1. Currently in a batch job, vertices from different regions can run at
> > the
> > > same time in slots from the same shared group, as long as they do not
> > have
> > > data dependency on each other and available slot count is not smaller
> > than
> > > the *max* of parallelism of all tasks.
> > > With changes in this FLIP however, tasks from different regions cannot
> > > share slots anymore.
> > > Once available slot count is smaller than the *sum* of all parallelism
> of
> > > tasks from all regions, tasks may need to be executed sequentially,
> which
> > > might result in a performance regression.
> > > Is this(performance regression to existing DataSet jobs) considered as
> a
> > > necessary and accepted trade off in this FLIP?
> > >
> > > 2. The network memory depends on the input/output ExecutionEdge count
> and
> > > thus can be different even for parallel instances of the same
> JobVertex.
> > > Does this mean that when adding task resources to calculating the slot
> > > resource for a shared group, the max possible network memory of the
> > vertex
> > > instance shall be used?
> > > This might result in larger resource required than actually needed.
> > >
> > > And some minor comments:
> > > 1. Regarding "fracManagedMemOnHeap = 1 /
> numOpsUseOnHeapManagedMemory", I
> > > guess you mean numOpsUseOnHeapManagedMemoryInTheSameSharedGroup ?
> > > 2. I think the *StreamGraphGenerator* in the #Slot Sharing section and
> > > implementation step 4 should be *StreamingJobGraphGenerator*, as
> > > *StreamGraphGenerator* is not aware of JobGraph and pipelined region.
> > >
> > >
> > > Thanks,
> > > Zhu Zhu
> > >
> > > Xintong Song <tonysong...@gmail.com> 于2019年9月2日周一 上午11:59写道:
> > >
> > > > Updated the FLIP wiki page [1], with the following changes.
> > > >
> > > >    - Remove the step of converting pipelined edges between different
> > slot
> > > >    sharing groups into blocking edges.
> > > >    - Set `allSourcesInSamePipelinedRegion` to true by default.
> > > >
> > > > Thank you~
> > > >
> > > > Xintong Song
> > > >
> > > >
> > > >
> > > > On Mon, Sep 2, 2019 at 11:50 AM Xintong Song <tonysong...@gmail.com>
> > > > wrote:
> > > >
> > > > > Regarding changing edge type, I think actually we don't need to do
> > this
> > > > > for batch jobs neither, because we don't have public interfaces for
> > > users
> > > > > to explicitly set slot sharing groups in DataSet API and SQL/Table
> > API.
> > > > We
> > > > > have such interfaces in DataStream API only.
> > > > >
> > > > > Thank you~
> > > > >
> > > > > Xintong Song
> > > > >
> > > > >
> > > > >
> > > > > On Tue, Aug 27, 2019 at 10:16 PM Xintong Song <
> tonysong...@gmail.com
> > >
> > > > > wrote:
> > > > >
> > > > >> Thanks for the correction, Till.
> > > > >>
> > > > >> Regarding your comments:
> > > > >> - You are right, we should not change the edge type for streaming
> > > jobs.
> > > > >> Then I think we can change the option
> > > 'allSourcesInSamePipelinedRegion'
> > > > in
> > > > >> step 2 to 'isStreamingJob', and implement the current step 2
> before
> > > the
> > > > >> current step 1 so we can use this option to decide whether should
> > > change
> > > > >> the edge type. What do you think?
> > > > >> - Agree. It should be easier to make the default value of
> > > > >> 'allSourcesInSamePipelinedRegion' (or 'isStreamingJob') 'true',
> and
> > > set
> > > > it
> > > > >> to 'false' when using DataSet API or blink planner.
> > > > >>
> > > > >> Thank you~
> > > > >>
> > > > >> Xintong Song
> > > > >>
> > > > >>
> > > > >>
> > > > >> On Tue, Aug 27, 2019 at 8:59 PM Till Rohrmann <
> trohrm...@apache.org
> > >
> > > > >> wrote:
> > > > >>
> > > > >>> Thanks for creating the implementation plan Xintong. Overall, the
> > > > >>> implementation plan looks good. I had a couple of comments:
> > > > >>>
> > > > >>> - What will happen if a user has defined a streaming job with two
> > > slot
> > > > >>> sharing groups? Would the code insert a blocking data exchange
> > > between
> > > > >>> these two groups? If yes, then this breaks existing Flink
> streaming
> > > > jobs.
> > > > >>> - How do we detect unbounded streaming jobs to set
> > > > >>> the allSourcesInSamePipelinedRegion to `true`? Wouldn't it be
> > easier
> > > to
> > > > >>> set
> > > > >>> it false if we are using the DataSet API or the Blink planner
> with
> > a
> > > > >>> bounded job?
> > > > >>>
> > > > >>> Cheers,
> > > > >>> Till
> > > > >>>
> > > > >>> On Tue, Aug 27, 2019 at 2:16 PM Till Rohrmann <
> > trohrm...@apache.org>
> > > > >>> wrote:
> > > > >>>
> > > > >>> > I guess there is a typo since the link to the FLIP-53 is
> > > > >>> >
> > > > >>>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
> > > > >>> >
> > > > >>> > Cheers,
> > > > >>> > Till
> > > > >>> >
> > > > >>> > On Tue, Aug 27, 2019 at 1:42 PM Xintong Song <
> > > tonysong...@gmail.com>
> > > > >>> > wrote:
> > > > >>> >
> > > > >>> >> Added implementation steps for this FLIP on the wiki page [1].
> > > > >>> >>
> > > > >>> >>
> > > > >>> >> Thank you~
> > > > >>> >>
> > > > >>> >> Xintong Song
> > > > >>> >>
> > > > >>> >>
> > > > >>> >> [1]
> > > > >>> >>
> > > > >>> >>
> > > > >>>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
> > > > >>> >>
> > > > >>> >> On Mon, Aug 19, 2019 at 10:29 PM Xintong Song <
> > > > tonysong...@gmail.com>
> > > > >>> >> wrote:
> > > > >>> >>
> > > > >>> >> > Hi everyone,
> > > > >>> >> >
> > > > >>> >> > As Till suggested, the original "FLIP-53: Fine Grained
> > Resource
> > > > >>> >> > Management" splits into two separate FLIPs,
> > > > >>> >> >
> > > > >>> >> >    - FLIP-53: Fine Grained Operator Resource Management [1]
> > > > >>> >> >    - FLIP-56: Dynamic Slot Allocation [2]
> > > > >>> >> >
> > > > >>> >> > We'll continue using this discussion thread for FLIP-53. For
> > > > >>> FLIP-56, I
> > > > >>> >> > just started a new discussion thread [3].
> > > > >>> >> >
> > > > >>> >> > Thank you~
> > > > >>> >> >
> > > > >>> >> > Xintong Song
> > > > >>> >> >
> > > > >>> >> >
> > > > >>> >> > [1]
> > > > >>> >> >
> > > > >>> >>
> > > > >>>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
> > > > >>> >> >
> > > > >>> >> > [2]
> > > > >>> >> >
> > > > >>> >>
> > > > >>>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
> > > > >>> >> >
> > > > >>> >> > [3]
> > > > >>> >> >
> > > > >>> >>
> > > > >>>
> > > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-56-Dynamic-Slot-Allocation-td31960.html
> > > > >>> >> >
> > > > >>> >> > On Mon, Aug 19, 2019 at 2:55 PM Xintong Song <
> > > > tonysong...@gmail.com
> > > > >>> >
> > > > >>> >> > wrote:
> > > > >>> >> >
> > > > >>> >> >> Thinks for the comments, Yang.
> > > > >>> >> >>
> > > > >>> >> >> Regarding your questions:
> > > > >>> >> >>
> > > > >>> >> >>    1. How to calculate the resource specification of
> > > > TaskManagers?
> > > > >>> Do
> > > > >>> >> they
> > > > >>> >> >>>    have them same resource spec calculated based on the
> > > > >>> >> configuration? I
> > > > >>> >> >>> think
> > > > >>> >> >>>    we still have wasted resources in this situation. Or we
> > > could
> > > > >>> start
> > > > >>> >> >>>    TaskManagers with different spec.
> > > > >>> >> >>>
> > > > >>> >> >> I agree with you that we can further improve the resource
> > > utility
> > > > >>> by
> > > > >>> >> >> customizing task executors with different resource
> > > > specifications.
> > > > >>> >> However,
> > > > >>> >> >> I'm in favor of limiting the scope of this FLIP and leave
> it
> > > as a
> > > > >>> >> future
> > > > >>> >> >> optimization. The plan for that part is to move the logic
> of
> > > > >>> deciding
> > > > >>> >> task
> > > > >>> >> >> executor specifications into the slot manager and make slot
> > > > manager
> > > > >>> >> >> pluggable, so inside the slot manager plugin we can have
> > > > different
> > > > >>> >> logics
> > > > >>> >> >> for deciding the task executor specifications.
> > > > >>> >> >>
> > > > >>> >> >>
> > > > >>> >> >>>    2. If a slot is released and returned to SlotPool, does
> > it
> > > > >>> could be
> > > > >>> >> >>>    reused by other SlotRequest that the request resource
> is
> > > > >>> smaller
> > > > >>> >> than
> > > > >>> >> >>> it?
> > > > >>> >> >>>
> > > > >>> >> >> No, I think slot pool should always return slots if they do
> > not
> > > > >>> exactly
> > > > >>> >> >> match the pending requests, so that resource manager can
> deal
> > > > with
> > > > >>> the
> > > > >>> >> >> extra resources.
> > > > >>> >> >>
> > > > >>> >> >>>       - If it is yes, what happens to the available
> resource
> > > in
> > > > >>> the
> > > > >>> >> >>
> > > > >>> >> >>       TaskManager.
> > > > >>> >> >>>       - What is the SlotStatus of the cached slot in
> > SlotPool?
> > > > The
> > > > >>> >> >>>       AllocationId is null?
> > > > >>> >> >>>
> > > > >>> >> >> The allocation id does not change as long as the slot is
> not
> > > > >>> returned
> > > > >>> >> >> from the job master, no matter its occupied or available in
> > the
> > > > >>> slot
> > > > >>> >> pool.
> > > > >>> >> >> I think we have the same behavior currently. No matter how
> > many
> > > > >>> tasks
> > > > >>> >> the
> > > > >>> >> >> job master deploy into the slot, concurrently or
> > sequentially,
> > > it
> > > > >>> is
> > > > >>> >> one
> > > > >>> >> >> allocation from the cluster to the job until the slot is
> > freed
> > > > from
> > > > >>> >> the job
> > > > >>> >> >> master.
> > > > >>> >> >>
> > > > >>> >> >>>    3. In a session cluster, some jobs are configured with
> > > > operator
> > > > >>> >> >>>    resources, meanwhile other jobs are using UNKNOWN. How
> to
> > > > deal
> > > > >>> with
> > > > >>> >> >>> this
> > > > >>> >> >>>    situation?
> > > > >>> >> >>
> > > > >>> >> >> As long as we do not mix unknown / specified resource
> > profiles
> > > > >>> within
> > > > >>> >> the
> > > > >>> >> >> same job / slot, there shouldn't be a problem. Resource
> > manager
> > > > >>> >> converts
> > > > >>> >> >> unknown resource profiles in slot requests to specified
> > default
> > > > >>> >> resource
> > > > >>> >> >> profiles, so they can be dynamically allocated from task
> > > > executors'
> > > > >>> >> >> available resources just as other slot requests with
> > specified
> > > > >>> resource
> > > > >>> >> >> profiles.
> > > > >>> >> >>
> > > > >>> >> >> Thank you~
> > > > >>> >> >>
> > > > >>> >> >> Xintong Song
> > > > >>> >> >>
> > > > >>> >> >>
> > > > >>> >> >>
> > > > >>> >> >> On Mon, Aug 19, 2019 at 11:39 AM Yang Wang <
> > > > danrtsey...@gmail.com>
> > > > >>> >> wrote:
> > > > >>> >> >>
> > > > >>> >> >>> Hi Xintong,
> > > > >>> >> >>>
> > > > >>> >> >>>
> > > > >>> >> >>> Thanks for your detailed proposal. I think many users are
> > > > >>> suffering
> > > > >>> >> from
> > > > >>> >> >>> waste of resources. The resource spec of all task managers
> > are
> > > > >>> same
> > > > >>> >> and
> > > > >>> >> >>> we
> > > > >>> >> >>> have to increase all task managers to make the heavy one
> > more
> > > > >>> stable.
> > > > >>> >> So
> > > > >>> >> >>> we
> > > > >>> >> >>> will benefit from the fine grained resource management a
> > lot.
> > > We
> > > > >>> could
> > > > >>> >> >>> get
> > > > >>> >> >>> better resource utilization and stability.
> > > > >>> >> >>>
> > > > >>> >> >>>
> > > > >>> >> >>> Just to share some thoughts.
> > > > >>> >> >>>
> > > > >>> >> >>>
> > > > >>> >> >>>
> > > > >>> >> >>>    1. How to calculate the resource specification of
> > > > >>> TaskManagers? Do
> > > > >>> >> >>> they
> > > > >>> >> >>>    have them same resource spec calculated based on the
> > > > >>> >> configuration? I
> > > > >>> >> >>> think
> > > > >>> >> >>>    we still have wasted resources in this situation. Or we
> > > could
> > > > >>> start
> > > > >>> >> >>>    TaskManagers with different spec.
> > > > >>> >> >>>    2. If a slot is released and returned to SlotPool, does
> > it
> > > > >>> could be
> > > > >>> >> >>>    reused by other SlotRequest that the request resource
> is
> > > > >>> smaller
> > > > >>> >> than
> > > > >>> >> >>> it?
> > > > >>> >> >>>       - If it is yes, what happens to the available
> resource
> > > in
> > > > >>> the
> > > > >>> >> >>>       TaskManager.
> > > > >>> >> >>>       - What is the SlotStatus of the cached slot in
> > SlotPool?
> > > > The
> > > > >>> >> >>>       AllocationId is null?
> > > > >>> >> >>>    3. In a session cluster, some jobs are configured with
> > > > operator
> > > > >>> >> >>>    resources, meanwhile other jobs are using UNKNOWN. How
> to
> > > > deal
> > > > >>> with
> > > > >>> >> >>> this
> > > > >>> >> >>>    situation?
> > > > >>> >> >>>
> > > > >>> >> >>>
> > > > >>> >> >>>
> > > > >>> >> >>> Best,
> > > > >>> >> >>> Yang
> > > > >>> >> >>>
> > > > >>> >> >>> Xintong Song <tonysong...@gmail.com> 于2019年8月16日周五
> > 下午8:57写道:
> > > > >>> >> >>>
> > > > >>> >> >>> > Thanks for the feedbacks, Yangze and Till.
> > > > >>> >> >>> >
> > > > >>> >> >>> > Yangze,
> > > > >>> >> >>> >
> > > > >>> >> >>> > I agree with you that we should make scheduling strategy
> > > > >>> pluggable
> > > > >>> >> and
> > > > >>> >> >>> > optimize the strategy to reduce the memory fragmentation
> > > > >>> problem,
> > > > >>> >> and
> > > > >>> >> >>> > thanks for the inputs on the potential algorithmic
> > > solutions.
> > > > >>> >> However,
> > > > >>> >> >>> I'm
> > > > >>> >> >>> > in favor of keep this FLIP focusing on the overall
> > mechanism
> > > > >>> design
> > > > >>> >> >>> rather
> > > > >>> >> >>> > than strategies. Solving the fragmentation issue should
> be
> > > > >>> >> considered
> > > > >>> >> >>> as an
> > > > >>> >> >>> > optimization, and I agree with Till that we probably
> > should
> > > > >>> tackle
> > > > >>> >> this
> > > > >>> >> >>> > afterwards.
> > > > >>> >> >>> >
> > > > >>> >> >>> > Till,
> > > > >>> >> >>> >
> > > > >>> >> >>> > - Regarding splitting the FLIP, I think it makes sense.
> > The
> > > > >>> operator
> > > > >>> >> >>> > resource management and dynamic slot allocation do not
> > have
> > > > much
> > > > >>> >> >>> dependency
> > > > >>> >> >>> > on each other.
> > > > >>> >> >>> >
> > > > >>> >> >>> > - Regarding the default slot size, I think this is
> similar
> > > to
> > > > >>> >> FLIP-49
> > > > >>> >> >>> [1]
> > > > >>> >> >>> > where we want all the deriving happens at one place. I
> > think
> > > > it
> > > > >>> >> would
> > > > >>> >> >>> be
> > > > >>> >> >>> > nice to pass the default slot size into the task
> executor
> > in
> > > > the
> > > > >>> >> same
> > > > >>> >> >>> way
> > > > >>> >> >>> > that we pass in the memory pool sizes in FLIP-49 [1].
> > > > >>> >> >>> >
> > > > >>> >> >>> > - Regarding the return value of
> > > > >>> >> TaskExecutorGateway#requestResource, I
> > > > >>> >> >>> > think you're right. We should avoid using null as the
> > return
> > > > >>> value.
> > > > >>> >> I
> > > > >>> >> >>> think
> > > > >>> >> >>> > we probably should thrown an exception here.
> > > > >>> >> >>> >
> > > > >>> >> >>> > Thank you~
> > > > >>> >> >>> >
> > > > >>> >> >>> > Xintong Song
> > > > >>> >> >>> >
> > > > >>> >> >>> >
> > > > >>> >> >>> > [1]
> > > > >>> >> >>> >
> > > > >>> >> >>> >
> > > > >>> >> >>>
> > > > >>> >>
> > > > >>>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
> > > > >>> >> >>> >
> > > > >>> >> >>> > On Fri, Aug 16, 2019 at 2:18 PM Till Rohrmann <
> > > > >>> trohrm...@apache.org
> > > > >>> >> >
> > > > >>> >> >>> > wrote:
> > > > >>> >> >>> >
> > > > >>> >> >>> > > Hi Xintong,
> > > > >>> >> >>> > >
> > > > >>> >> >>> > > thanks for drafting this FLIP. I think your proposal
> > helps
> > > > to
> > > > >>> >> >>> improve the
> > > > >>> >> >>> > > execution of batch jobs more efficiently. Moreover, it
> > > > >>> enables the
> > > > >>> >> >>> proper
> > > > >>> >> >>> > > integration of the Blink planner which is very
> important
> > > as
> > > > >>> well.
> > > > >>> >> >>> > >
> > > > >>> >> >>> > > Overall, the FLIP looks good to me. I was wondering
> > > whether
> > > > it
> > > > >>> >> >>> wouldn't
> > > > >>> >> >>> > > make sense to actually split it up into two FLIPs:
> > > Operator
> > > > >>> >> resource
> > > > >>> >> >>> > > management and dynamic slot allocation. I think these
> > two
> > > > >>> FLIPs
> > > > >>> >> >>> could be
> > > > >>> >> >>> > > seen as orthogonal and it would decrease the scope of
> > each
> > > > >>> >> individual
> > > > >>> >> >>> > FLIP.
> > > > >>> >> >>> > >
> > > > >>> >> >>> > > Some smaller comments:
> > > > >>> >> >>> > >
> > > > >>> >> >>> > > - I'm not sure whether we should pass in the default
> > slot
> > > > size
> > > > >>> >> via an
> > > > >>> >> >>> > > environment variable. Without having unified the way
> how
> > > > Flink
> > > > >>> >> >>> components
> > > > >>> >> >>> > > are configured [1], I think it would be better to pass
> > it
> > > in
> > > > >>> as
> > > > >>> >> part
> > > > >>> >> >>> of
> > > > >>> >> >>> > the
> > > > >>> >> >>> > > configuration.
> > > > >>> >> >>> > > - I would avoid returning a null value from
> > > > >>> >> >>> > > TaskExecutorGateway#requestResource if it cannot be
> > > > fulfilled.
> > > > >>> >> >>> Either we
> > > > >>> >> >>> > > should introduce an explicit return value saying this
> or
> > > > >>> throw an
> > > > >>> >> >>> > > exception.
> > > > >>> >> >>> > >
> > > > >>> >> >>> > > Concerning Yangze's comments: I think you are right
> that
> > > it
> > > > >>> would
> > > > >>> >> be
> > > > >>> >> >>> > > helpful to make the selection strategy pluggable. Also
> > > > >>> batching
> > > > >>> >> slot
> > > > >>> >> >>> > > requests to the RM could be a good optimization. For
> the
> > > > sake
> > > > >>> of
> > > > >>> >> >>> keeping
> > > > >>> >> >>> > > the scope of this FLIP smaller I would try to tackle
> > these
> > > > >>> things
> > > > >>> >> >>> after
> > > > >>> >> >>> > the
> > > > >>> >> >>> > > initial version has been completed (without spoiling
> > these
> > > > >>> >> >>> optimization
> > > > >>> >> >>> > > opportunities). In particular batching the slot
> requests
> > > > >>> depends
> > > > >>> >> on
> > > > >>> >> >>> the
> > > > >>> >> >>> > > current scheduler refactoring and could also be
> realized
> > > on
> > > > >>> the RM
> > > > >>> >> >>> side
> > > > >>> >> >>> > > only.
> > > > >>> >> >>> > >
> > > > >>> >> >>> > > [1]
> > > > >>> >> >>> > >
> > > > >>> >> >>> > >
> > > > >>> >> >>> >
> > > > >>> >> >>>
> > > > >>> >>
> > > > >>>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
> > > > >>> >> >>> > >
> > > > >>> >> >>> > > Cheers,
> > > > >>> >> >>> > > Till
> > > > >>> >> >>> > >
> > > > >>> >> >>> > >
> > > > >>> >> >>> > >
> > > > >>> >> >>> > > On Fri, Aug 16, 2019 at 11:11 AM Yangze Guo <
> > > > >>> karma...@gmail.com>
> > > > >>> >> >>> wrote:
> > > > >>> >> >>> > >
> > > > >>> >> >>> > > > Hi, Xintong
> > > > >>> >> >>> > > >
> > > > >>> >> >>> > > > Thanks to propose this FLIP. The general design
> looks
> > > good
> > > > >>> to
> > > > >>> >> me,
> > > > >>> >> >>> +1
> > > > >>> >> >>> > > > for this feature.
> > > > >>> >> >>> > > >
> > > > >>> >> >>> > > > Since slots in the same task executor could have
> > > different
> > > > >>> >> resource
> > > > >>> >> >>> > > > profile, we will
> > > > >>> >> >>> > > > meet resource fragment problem. Think about this
> case:
> > > > >>> >> >>> > > >  - request A want 1G memory while request B & C want
> > > 0.5G
> > > > >>> memory
> > > > >>> >> >>> > > >  - There are two task executors T1 & T2 with 1G and
> > 0.5G
> > > > >>> free
> > > > >>> >> >>> memory
> > > > >>> >> >>> > > > respectively
> > > > >>> >> >>> > > > If B come first and we cut a slot from T1 for B, A
> > must
> > > > >>> wait for
> > > > >>> >> >>> the
> > > > >>> >> >>> > > > free resource from
> > > > >>> >> >>> > > > other task. But A could have been scheduled
> > immediately
> > > if
> > > > >>> we
> > > > >>> >> cut a
> > > > >>> >> >>> > > > slot from T2 for B.
> > > > >>> >> >>> > > >
> > > > >>> >> >>> > > > The logic of findMatchingSlot now become finding a
> > task
> > > > >>> executor
> > > > >>> >> >>> which
> > > > >>> >> >>> > > > has enough
> > > > >>> >> >>> > > > resource and then cut a slot from it. Current method
> > > could
> > > > >>> be
> > > > >>> >> seen
> > > > >>> >> >>> as
> > > > >>> >> >>> > > > "First-fit strategy",
> > > > >>> >> >>> > > > which works well in general but sometimes could not
> be
> > > the
> > > > >>> >> >>> optimization
> > > > >>> >> >>> > > > method.
> > > > >>> >> >>> > > >
> > > > >>> >> >>> > > > Actually, this problem could be abstracted as "Bin
> > > Packing
> > > > >>> >> >>> Problem"[1].
> > > > >>> >> >>> > > > Here are
> > > > >>> >> >>> > > > some common approximate algorithms:
> > > > >>> >> >>> > > > - First fit
> > > > >>> >> >>> > > > - Next fit
> > > > >>> >> >>> > > > - Best fit
> > > > >>> >> >>> > > >
> > > > >>> >> >>> > > > But it become multi-dimensional bin packing problem
> if
> > > we
> > > > >>> take
> > > > >>> >> CPU
> > > > >>> >> >>> > > > into account. It hard
> > > > >>> >> >>> > > > to define which one is best fit now. Some research
> > > > addressed
> > > > >>> >> this
> > > > >>> >> >>> > > > problem, such like Tetris[2].
> > > > >>> >> >>> > > >
> > > > >>> >> >>> > > > Here are some thinking about it:
> > > > >>> >> >>> > > > 1. We could make the strategy of finding matching
> task
> > > > >>> executor
> > > > >>> >> >>> > > > pluginable. Let user to config the
> > > > >>> >> >>> > > > best strategy in their scenario.
> > > > >>> >> >>> > > > 2. We could support batch request interface in RM,
> > > because
> > > > >>> we
> > > > >>> >> have
> > > > >>> >> >>> > > > opportunities to optimize
> > > > >>> >> >>> > > > if we have more information. If we know the A, B, C
> at
> > > the
> > > > >>> same
> > > > >>> >> >>> time,
> > > > >>> >> >>> > > > we could always make the best decision.
> > > > >>> >> >>> > > >
> > > > >>> >> >>> > > > [1] http://www.or.deis.unibo.it/kp/Chapter8.pdf
> > > > >>> >> >>> > > > [2]
> > > > >>> >> >>> >
> > > > >>> >>
> > > > https://www.cs.cmu.edu/~xia/resources/Documents/grandl_sigcomm14.pdf
> > > > >>> >> >>> > > >
> > > > >>> >> >>> > > > Best,
> > > > >>> >> >>> > > > Yangze Guo
> > > > >>> >> >>> > > >
> > > > >>> >> >>> > > > On Thu, Aug 15, 2019 at 10:40 PM Xintong Song <
> > > > >>> >> >>> tonysong...@gmail.com>
> > > > >>> >> >>> > > > wrote:
> > > > >>> >> >>> > > > >
> > > > >>> >> >>> > > > > Hi everyone,
> > > > >>> >> >>> > > > >
> > > > >>> >> >>> > > > > We would like to start a discussion thread on
> > > "FLIP-53:
> > > > >>> Fine
> > > > >>> >> >>> Grained
> > > > >>> >> >>> > > > > Resource Management"[1], where we propose how to
> > > improve
> > > > >>> Flink
> > > > >>> >> >>> > resource
> > > > >>> >> >>> > > > > management and scheduling.
> > > > >>> >> >>> > > > >
> > > > >>> >> >>> > > > > This FLIP mainly discusses the following issues.
> > > > >>> >> >>> > > > >
> > > > >>> >> >>> > > > >    - How to support tasks with fine grained
> resource
> > > > >>> >> >>> requirements.
> > > > >>> >> >>> > > > >    - How to unify resource management for jobs
> with
> > /
> > > > >>> without
> > > > >>> >> >>> fine
> > > > >>> >> >>> > > > grained
> > > > >>> >> >>> > > > >    resource requirements.
> > > > >>> >> >>> > > > >    - How to unify resource management for
> streaming
> > /
> > > > >>> batch
> > > > >>> >> jobs.
> > > > >>> >> >>> > > > >
> > > > >>> >> >>> > > > > Key changes proposed in the FLIP are as follows.
> > > > >>> >> >>> > > > >
> > > > >>> >> >>> > > > >    - Unify memory management for operators with /
> > > > without
> > > > >>> fine
> > > > >>> >> >>> > grained
> > > > >>> >> >>> > > > >    resource requirements by applying a fraction
> > based
> > > > >>> quota
> > > > >>> >> >>> > mechanism.
> > > > >>> >> >>> > > > >    - Unify resource scheduling for streaming and
> > batch
> > > > >>> jobs by
> > > > >>> >> >>> > setting
> > > > >>> >> >>> > > > slot
> > > > >>> >> >>> > > > >    sharing groups for pipelined regions during
> > > compiling
> > > > >>> >> stage.
> > > > >>> >> >>> > > > >    - Dynamically allocate slots from task
> executors'
> > > > >>> available
> > > > >>> >> >>> > > resources.
> > > > >>> >> >>> > > > >
> > > > >>> >> >>> > > > > Please find more details in the FLIP wiki document
> > > [1].
> > > > >>> >> Looking
> > > > >>> >> >>> > forward
> > > > >>> >> >>> > > > to
> > > > >>> >> >>> > > > > your feedbacks.
> > > > >>> >> >>> > > > >
> > > > >>> >> >>> > > > > Thank you~
> > > > >>> >> >>> > > > >
> > > > >>> >> >>> > > > > Xintong Song
> > > > >>> >> >>> > > > >
> > > > >>> >> >>> > > > >
> > > > >>> >> >>> > > > > [1]
> > > > >>> >> >>> > > > >
> > > > >>> >> >>> > > >
> > > > >>> >> >>> > >
> > > > >>> >> >>> >
> > > > >>> >> >>>
> > > > >>> >>
> > > > >>>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Resource+Management
> > > > >>> >> >>> > > >
> > > > >>> >> >>> > >
> > > > >>> >> >>> >
> > > > >>> >> >>>
> > > > >>> >> >>
> > > > >>> >>
> > > > >>> >
> > > > >>>
> > > > >>
> > > >
> > >
> >
>

Reply via email to