Updated the FLIP wiki page [1], with the following changes.

   - Remove the step of converting pipelined edges between different slot
   sharing groups into blocking edges.
   - Set `allSourcesInSamePipelinedRegion` to true by default.

Thank you~

Xintong Song



On Mon, Sep 2, 2019 at 11:50 AM Xintong Song <tonysong...@gmail.com> wrote:

> Regarding changing the edge type, I think we actually don't need to do this
> for batch jobs either, because we don't have public interfaces for users
> to explicitly set slot sharing groups in the DataSet API and SQL/Table API. We
> have such interfaces in the DataStream API only.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Aug 27, 2019 at 10:16 PM Xintong Song <tonysong...@gmail.com>
> wrote:
>
>> Thanks for the correction, Till.
>>
>> Regarding your comments:
>> - You are right, we should not change the edge type for streaming jobs.
>> Then I think we can change the option 'allSourcesInSamePipelinedRegion' in
>> step 2 to 'isStreamingJob', and implement the current step 2 before the
>> current step 1, so we can use this option to decide whether we should change
>> the edge type. What do you think?
>> - Agree. It should be easier to make the default value of
>> 'allSourcesInSamePipelinedRegion' (or 'isStreamingJob') 'true', and set it
>> to 'false' when using DataSet API or blink planner.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Tue, Aug 27, 2019 at 8:59 PM Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Thanks for creating the implementation plan Xintong. Overall, the
>>> implementation plan looks good. I had a couple of comments:
>>>
>>> - What will happen if a user has defined a streaming job with two slot
>>> sharing groups? Would the code insert a blocking data exchange between
>>> these two groups? If yes, then this breaks existing Flink streaming jobs.
>>> - How do we detect unbounded streaming jobs to set
>>> allSourcesInSamePipelinedRegion to `true`? Wouldn't it be easier to set
>>> it to `false` if we are using the DataSet API or the Blink planner with a
>>> bounded job?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Aug 27, 2019 at 2:16 PM Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>> > I guess there is a typo since the link to the FLIP-53 is
>>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
>>> >
>>> > Cheers,
>>> > Till
>>> >
>>> > On Tue, Aug 27, 2019 at 1:42 PM Xintong Song <tonysong...@gmail.com>
>>> > wrote:
>>> >
>>> >> Added implementation steps for this FLIP on the wiki page [1].
>>> >>
>>> >>
>>> >> Thank you~
>>> >>
>>> >> Xintong Song
>>> >>
>>> >>
>>> >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
>>> >>
>>> >> On Mon, Aug 19, 2019 at 10:29 PM Xintong Song <tonysong...@gmail.com>
>>> >> wrote:
>>> >>
>>> >> > Hi everyone,
>>> >> >
>>> >> > As Till suggested, the original "FLIP-53: Fine Grained Resource
>>> >> > Management" has been split into two separate FLIPs:
>>> >> >
>>> >> >    - FLIP-53: Fine Grained Operator Resource Management [1]
>>> >> >    - FLIP-56: Dynamic Slot Allocation [2]
>>> >> >
>>> >> > We'll continue using this discussion thread for FLIP-53. For FLIP-56, I
>>> >> > just started a new discussion thread [3].
>>> >> >
>>> >> > Thank you~
>>> >> >
>>> >> > Xintong Song
>>> >> >
>>> >> >
>>> >> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
>>> >> > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
>>> >> > [3] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-56-Dynamic-Slot-Allocation-td31960.html
>>> >> >
>>> >> > On Mon, Aug 19, 2019 at 2:55 PM Xintong Song <tonysong...@gmail.com>
>>> >> > wrote:
>>> >> >
>>> >> >> Thanks for the comments, Yang.
>>> >> >>
>>> >> >> Regarding your questions:
>>> >> >>
>>> >> >>>    1. How to calculate the resource specification of TaskManagers? Do they
>>> >> >>>    have the same resource spec calculated based on the configuration? I think
>>> >> >>>    we still have wasted resources in this situation. Or we could start
>>> >> >>>    TaskManagers with different specs.
>>> >> >>>
>>> >> >> I agree with you that we can further improve resource utilization by
>>> >> >> customizing task executors with different resource specifications. However,
>>> >> >> I'm in favor of limiting the scope of this FLIP and leaving that as a future
>>> >> >> optimization. The plan for that part is to move the logic of deciding task
>>> >> >> executor specifications into the slot manager and make the slot manager
>>> >> >> pluggable, so inside the slot manager plugin we can have different logic
>>> >> >> for deciding the task executor specifications.
>>> >> >>
>>> >> >>
>>> >> >>>    2. If a slot is released and returned to the SlotPool, could it be
>>> >> >>>    reused by another SlotRequest whose requested resource is smaller than
>>> >> >>>    the slot?
>>> >> >>>
>>> >> >> No, I think the slot pool should always return slots if they do not exactly
>>> >> >> match the pending requests, so that the resource manager can deal with the
>>> >> >> extra resources.
>>> >> >>
>>> >> >>>       - If yes, what happens to the available resource in the
>>> >> >>>       TaskManager?
>>> >> >>>       - What is the SlotStatus of the cached slot in the SlotPool? Is the
>>> >> >>>       AllocationId null?
>>> >> >>>
>>> >> >> The allocation id does not change as long as the slot is not returned
>>> >> >> by the job master, no matter whether it is occupied or available in the
>>> >> >> slot pool. I think we have the same behavior currently. No matter how many
>>> >> >> tasks the job master deploys into the slot, concurrently or sequentially,
>>> >> >> it is one allocation from the cluster to the job until the slot is freed
>>> >> >> by the job master.
>>> >> >>
>>> >> >>>    3. In a session cluster, some jobs are configured with operator
>>> >> >>>    resources, while other jobs are using UNKNOWN. How do we deal with
>>> >> >>>    this situation?
>>> >> >>
>>> >> >> As long as we do not mix unknown / specified resource profiles within the
>>> >> >> same job / slot, there shouldn't be a problem. The resource manager converts
>>> >> >> unknown resource profiles in slot requests to specified default resource
>>> >> >> profiles, so they can be dynamically allocated from task executors'
>>> >> >> available resources just like other slot requests with specified resource
>>> >> >> profiles.
>>> >> >>
>>> >> >> Thank you~
>>> >> >>
>>> >> >> Xintong Song
>>> >> >>
>>> >> >>
>>> >> >>
>>> >> >> On Mon, Aug 19, 2019 at 11:39 AM Yang Wang <danrtsey...@gmail.com> wrote:
>>> >> >>
>>> >> >>> Hi Xintong,
>>> >> >>>
>>> >> >>>
>>> >> >>> Thanks for your detailed proposal. I think many users are suffering from
>>> >> >>> wasted resources. The resource specs of all task managers are the same, so
>>> >> >>> we have to increase the resources of all task managers to make the heavy
>>> >> >>> one more stable. So we will benefit a lot from fine grained resource
>>> >> >>> management. We could get better resource utilization and stability.
>>> >> >>>
>>> >> >>>
>>> >> >>> Just to share some thoughts.
>>> >> >>>
>>> >> >>>
>>> >> >>>
>>> >> >>>    1. How to calculate the resource specification of TaskManagers? Do they
>>> >> >>>    have the same resource spec calculated based on the configuration? I think
>>> >> >>>    we still have wasted resources in this situation. Or we could start
>>> >> >>>    TaskManagers with different specs.
>>> >> >>>    2. If a slot is released and returned to the SlotPool, could it be
>>> >> >>>    reused by another SlotRequest whose requested resource is smaller than
>>> >> >>>    the slot?
>>> >> >>>       - If yes, what happens to the available resource in the
>>> >> >>>       TaskManager?
>>> >> >>>       - What is the SlotStatus of the cached slot in the SlotPool? Is the
>>> >> >>>       AllocationId null?
>>> >> >>>    3. In a session cluster, some jobs are configured with operator
>>> >> >>>    resources, while other jobs are using UNKNOWN. How do we deal with
>>> >> >>>    this situation?
>>> >> >>>
>>> >> >>>
>>> >> >>>
>>> >> >>> Best,
>>> >> >>> Yang
>>> >> >>>
>>> >> >>> On Fri, Aug 16, 2019 at 8:57 PM Xintong Song <tonysong...@gmail.com> wrote:
>>> >> >>>
>>> >> >>> > Thanks for the feedback, Yangze and Till.
>>> >> >>> >
>>> >> >>> > Yangze,
>>> >> >>> >
>>> >> >>> > I agree with you that we should make the scheduling strategy pluggable and
>>> >> >>> > optimize the strategy to reduce the memory fragmentation problem, and
>>> >> >>> > thanks for the input on the potential algorithmic solutions. However, I'm
>>> >> >>> > in favor of keeping this FLIP focused on the overall mechanism design rather
>>> >> >>> > than strategies. Solving the fragmentation issue should be considered an
>>> >> >>> > optimization, and I agree with Till that we probably should tackle this
>>> >> >>> > afterwards.
>>> >> >>> >
>>> >> >>> > Till,
>>> >> >>> >
>>> >> >>> > - Regarding splitting the FLIP, I think it makes sense. The operator
>>> >> >>> > resource management and dynamic slot allocation do not have much dependency
>>> >> >>> > on each other.
>>> >> >>> >
>>> >> >>> > - Regarding the default slot size, I think this is similar to FLIP-49 [1],
>>> >> >>> > where we want all the derivation to happen in one place. I think it would be
>>> >> >>> > nice to pass the default slot size into the task executor in the same way
>>> >> >>> > that we pass in the memory pool sizes in FLIP-49 [1].
>>> >> >>> >
>>> >> >>> > - Regarding the return value of TaskExecutorGateway#requestResource, I
>>> >> >>> > think you're right. We should avoid using null as the return value. I think
>>> >> >>> > we probably should throw an exception here.
>>> >> >>> >
>>> >> >>> > Thank you~
>>> >> >>> >
>>> >> >>> > Xintong Song
>>> >> >>> >
>>> >> >>> >
>>> >> >>> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
>>> >> >>> >
>>> >> >>> > On Fri, Aug 16, 2019 at 2:18 PM Till Rohrmann <trohrm...@apache.org>
>>> >> >>> > wrote:
>>> >> >>> >
>>> >> >>> > > Hi Xintong,
>>> >> >>> > >
>>> >> >>> > > thanks for drafting this FLIP. I think your proposal helps to
>>> >> >>> > > execute batch jobs more efficiently. Moreover, it enables the proper
>>> >> >>> > > integration of the Blink planner, which is very important as well.
>>> >> >>> > >
>>> >> >>> > > Overall, the FLIP looks good to me. I was wondering whether it wouldn't
>>> >> >>> > > make sense to actually split it up into two FLIPs: operator resource
>>> >> >>> > > management and dynamic slot allocation. I think these two FLIPs could be
>>> >> >>> > > seen as orthogonal and it would decrease the scope of each individual FLIP.
>>> >> >>> > >
>>> >> >>> > > Some smaller comments:
>>> >> >>> > >
>>> >> >>> > > - I'm not sure whether we should pass in the default slot size via an
>>> >> >>> > > environment variable. Without having unified the way Flink components
>>> >> >>> > > are configured [1], I think it would be better to pass it in as part of
>>> >> >>> > > the configuration.
>>> >> >>> > > - I would avoid returning a null value from
>>> >> >>> > > TaskExecutorGateway#requestResource if it cannot be fulfilled. Either we
>>> >> >>> > > should introduce an explicit return value saying this or throw an
>>> >> >>> > > exception.
>>> >> >>> > >
>>> >> >>> > > Concerning Yangze's comments: I think you are right that it would be
>>> >> >>> > > helpful to make the selection strategy pluggable. Also, batching slot
>>> >> >>> > > requests to the RM could be a good optimization. For the sake of keeping
>>> >> >>> > > the scope of this FLIP smaller, I would try to tackle these things after
>>> >> >>> > > the initial version has been completed (without spoiling these optimization
>>> >> >>> > > opportunities). In particular, batching the slot requests depends on the
>>> >> >>> > > current scheduler refactoring and could also be realized on the RM side
>>> >> >>> > > only.
>>> >> >>> > >
>>> >> >>> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
>>> >> >>> > >
>>> >> >>> > > Cheers,
>>> >> >>> > > Till
>>> >> >>> > >
>>> >> >>> > >
>>> >> >>> > >
>>> >> >>> > > On Fri, Aug 16, 2019 at 11:11 AM Yangze Guo <karma...@gmail.com> wrote:
>>> >> >>> > >
>>> >> >>> > > > Hi, Xintong
>>> >> >>> > > >
>>> >> >>> > > > Thanks for proposing this FLIP. The general design looks good to me, +1
>>> >> >>> > > > for this feature.
>>> >> >>> > > >
>>> >> >>> > > > Since slots in the same task executor could have different resource
>>> >> >>> > > > profiles, we will run into a resource fragmentation problem. Think about
>>> >> >>> > > > this case:
>>> >> >>> > > >  - Request A wants 1G of memory while requests B & C each want 0.5G
>>> >> >>> > > >  - There are two task executors T1 & T2 with 1G and 0.5G of free memory
>>> >> >>> > > > respectively
>>> >> >>> > > > If B comes first and we cut a slot from T1 for B, A must wait for
>>> >> >>> > > > resources to be freed by other tasks. But A could have been scheduled
>>> >> >>> > > > immediately if we had cut a slot from T2 for B.
>>> >> >>> > > >
>>> >> >>> > > > The logic of findMatchingSlot now becomes finding a task executor which
>>> >> >>> > > > has enough resources and then cutting a slot from it. The current method
>>> >> >>> > > > can be seen as a "first-fit strategy", which works well in general but is
>>> >> >>> > > > sometimes not the optimal method.
>>> >> >>> > > >
>>> >> >>> > > > Actually, this problem can be abstracted as the "Bin Packing Problem" [1].
>>> >> >>> > > > Here are some common approximation algorithms:
>>> >> >>> > > > - First fit
>>> >> >>> > > > - Next fit
>>> >> >>> > > > - Best fit
>>> >> >>> > > >
>>> >> >>> > > > But it becomes a multi-dimensional bin packing problem if we take CPU
>>> >> >>> > > > into account. It is then hard to define which one is the best fit. Some
>>> >> >>> > > > research has addressed this problem, such as Tetris [2].
>>> >> >>> > > >
>>> >> >>> > > > Here are some thoughts about it:
>>> >> >>> > > > 1. We could make the strategy for finding a matching task executor
>>> >> >>> > > > pluggable and let users configure the best strategy for their scenario.
>>> >> >>> > > > 2. We could support a batch request interface in the RM, because we have
>>> >> >>> > > > opportunities to optimize if we have more information. If we know A, B,
>>> >> >>> > > > and C at the same time, we could always make the best decision.
>>> >> >>> > > >
>>> >> >>> > > > [1] http://www.or.deis.unibo.it/kp/Chapter8.pdf
>>> >> >>> > > > [2] https://www.cs.cmu.edu/~xia/resources/Documents/grandl_sigcomm14.pdf
>>> >> >>> > > >
>>> >> >>> > > > Best,
>>> >> >>> > > > Yangze Guo
>>> >> >>> > > >
>>> >> >>> > > > On Thu, Aug 15, 2019 at 10:40 PM Xintong Song <tonysong...@gmail.com>
>>> >> >>> > > > wrote:
>>> >> >>> > > > >
>>> >> >>> > > > > Hi everyone,
>>> >> >>> > > > >
>>> >> >>> > > > > We would like to start a discussion thread on "FLIP-53: Fine Grained
>>> >> >>> > > > > Resource Management" [1], where we propose how to improve Flink resource
>>> >> >>> > > > > management and scheduling.
>>> >> >>> > > > >
>>> >> >>> > > > > This FLIP mainly discusses the following issues.
>>> >> >>> > > > >
>>> >> >>> > > > >    - How to support tasks with fine grained resource requirements.
>>> >> >>> > > > >    - How to unify resource management for jobs with / without fine grained
>>> >> >>> > > > >    resource requirements.
>>> >> >>> > > > >    - How to unify resource management for streaming / batch jobs.
>>> >> >>> > > > >
>>> >> >>> > > > > Key changes proposed in the FLIP are as follows.
>>> >> >>> > > > >
>>> >> >>> > > > >    - Unify memory management for operators with / without fine grained
>>> >> >>> > > > >    resource requirements by applying a fraction based quota mechanism.
>>> >> >>> > > > >    - Unify resource scheduling for streaming and batch jobs by setting slot
>>> >> >>> > > > >    sharing groups for pipelined regions during the compiling stage.
>>> >> >>> > > > >    - Dynamically allocate slots from task executors' available resources.
>>> >> >>> > > > >
>>> >> >>> > > > > Please find more details in the FLIP wiki document [1]. Looking forward
>>> >> >>> > > > > to your feedback.
>>> >> >>> > > > >
>>> >> >>> > > > > Thank you~
>>> >> >>> > > > >
>>> >> >>> > > > > Xintong Song
>>> >> >>> > > > >
>>> >> >>> > > > >
>>> >> >>> > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Resource+Management
>>> >> >>> > > >
>>> >> >>> > >
>>> >> >>> >
>>> >> >>>
>>> >> >>
>>> >>
>>> >
>>>
>>
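
The fragmentation case from Yangze's message (requests A and B against task
executors T1 and T2) can be sketched as follows. This is only a minimal Python
illustration under stated assumptions, not Flink code: the names first_fit,
best_fit and schedule are hypothetical, memory is counted in MB, only the
memory dimension is modeled, and B is assumed to arrive before A.

```python
from typing import Callable, Dict, List, Optional, Tuple

# Hypothetical strategy signature: (free memory per executor, request size) -> executor name
Strategy = Callable[[Dict[str, int], int], Optional[str]]


def first_fit(free: Dict[str, int], request_mb: int) -> Optional[str]:
    """Pick the first executor (in insertion order) with enough free memory."""
    for name, free_mb in free.items():
        if free_mb >= request_mb:
            return name
    return None


def best_fit(free: Dict[str, int], request_mb: int) -> Optional[str]:
    """Pick the executor that would have the least memory left over."""
    candidates = [
        (free_mb - request_mb, name)
        for name, free_mb in free.items()
        if free_mb >= request_mb
    ]
    return min(candidates)[1] if candidates else None


def schedule(
    strategy: Strategy, free: Dict[str, int], requests: List[Tuple[str, int]]
) -> Dict[str, Optional[str]]:
    """Place requests in arrival order; None means the request has to wait."""
    free = dict(free)  # work on a copy so the caller's state is untouched
    placement: Dict[str, Optional[str]] = {}
    for request_name, request_mb in requests:
        executor = strategy(free, request_mb)
        placement[request_name] = executor
        if executor is not None:
            free[executor] -= request_mb  # "cut" a slot from the executor
    return placement


# T1 has 1G free, T2 has 0.5G free; B (0.5G) arrives before A (1G).
EXECUTORS = {"T1": 1024, "T2": 512}
REQUESTS = [("B", 512), ("A", 1024)]

first_fit_result = schedule(first_fit, EXECUTORS, REQUESTS)
best_fit_result = schedule(best_fit, EXECUTORS, REQUESTS)
print(first_fit_result)
print(best_fit_result)
```

With first fit, B is cut from T1 and A has to wait; with best fit, B lands on
T2 and A can be scheduled on T1 immediately. As noted in the thread, once CPU
is taken into account the notion of "best" fit is no longer well defined, so
this sketch does not carry over to the multi-dimensional case.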
