Thanks for the correction, Till.

Regarding your comments:
- You are right, we should not change the edge type for streaming jobs.
Then I think we can change the option 'allSourcesInSamePipelinedRegion' in
step 2 to 'isStreamingJob', and implement the current step 2 before the
current step 1 so we can use this option to decide whether should change
the edge type. What do you think?
- Agree. It should be easier to make the default value of
'allSourcesInSamePipelinedRegion' (or 'isStreamingJob') 'true', and set it
to 'false' when using DataSet API or blink planner.

Thank you~

Xintong Song



On Tue, Aug 27, 2019 at 8:59 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Thanks for creating the implementation plan Xintong. Overall, the
> implementation plan looks good. I had a couple of comments:
>
> - What will happen if a user has defined a streaming job with two slot
> sharing groups? Would the code insert a blocking data exchange between
> these two groups? If yes, then this breaks existing Flink streaming jobs.
> - How do we detect unbounded streaming jobs to set
> the allSourcesInSamePipelinedRegion to `true`? Wouldn't it be easier to set
> it false if we are using the DataSet API or the Blink planner with a
> bounded job?
>
> Cheers,
> Till
>
> On Tue, Aug 27, 2019 at 2:16 PM Till Rohrmann <trohrm...@apache.org>
> wrote:
>
> > I guess there is a typo since the link to the FLIP-53 is
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
> >
> > Cheers,
> > Till
> >
> > On Tue, Aug 27, 2019 at 1:42 PM Xintong Song <tonysong...@gmail.com>
> > wrote:
> >
> >> Added implementation steps for this FLIP on the wiki page [1].
> >>
> >>
> >> Thank you~
> >>
> >> Xintong Song
> >>
> >>
> >> [1]
> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
> >>
> >> On Mon, Aug 19, 2019 at 10:29 PM Xintong Song <tonysong...@gmail.com>
> >> wrote:
> >>
> >> > Hi everyone,
> >> >
> >> > As Till suggested, the original "FLIP-53: Fine Grained Resource
> >> > Management" splits into two separate FLIPs,
> >> >
> >> >    - FLIP-53: Fine Grained Operator Resource Management [1]
> >> >    - FLIP-56: Dynamic Slot Allocation [2]
> >> >
> >> > We'll continue using this discussion thread for FLIP-53. For FLIP-56,
> I
> >> > just started a new discussion thread [3].
> >> >
> >> > Thank you~
> >> >
> >> > Xintong Song
> >> >
> >> >
> >> > [1]
> >> >
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
> >> >
> >> > [2]
> >> >
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
> >> >
> >> > [3]
> >> >
> >>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-56-Dynamic-Slot-Allocation-td31960.html
> >> >
> >> > On Mon, Aug 19, 2019 at 2:55 PM Xintong Song <tonysong...@gmail.com>
> >> > wrote:
> >> >
> >> >> Thinks for the comments, Yang.
> >> >>
> >> >> Regarding your questions:
> >> >>
> >> >>    1. How to calculate the resource specification of TaskManagers? Do
> >> they
> >> >>>    have them same resource spec calculated based on the
> >> configuration? I
> >> >>> think
> >> >>>    we still have wasted resources in this situation. Or we could
> start
> >> >>>    TaskManagers with different spec.
> >> >>>
> >> >> I agree with you that we can further improve the resource utility by
> >> >> customizing task executors with different resource specifications.
> >> However,
> >> >> I'm in favor of limiting the scope of this FLIP and leave it as a
> >> future
> >> >> optimization. The plan for that part is to move the logic of deciding
> >> task
> >> >> executor specifications into the slot manager and make slot manager
> >> >> pluggable, so inside the slot manager plugin we can have different
> >> logics
> >> >> for deciding the task executor specifications.
> >> >>
> >> >>
> >> >>>    2. If a slot is released and returned to SlotPool, does it could
> be
> >> >>>    reused by other SlotRequest that the request resource is smaller
> >> than
> >> >>> it?
> >> >>>
> >> >> No, I think slot pool should always return slots if they do not
> exactly
> >> >> match the pending requests, so that resource manager can deal with
> the
> >> >> extra resources.
> >> >>
> >> >>>       - If it is yes, what happens to the available resource in the
> >> >>
> >> >>       TaskManager.
> >> >>>       - What is the SlotStatus of the cached slot in SlotPool? The
> >> >>>       AllocationId is null?
> >> >>>
> >> >> The allocation id does not change as long as the slot is not returned
> >> >> from the job master, no matter its occupied or available in the slot
> >> pool.
> >> >> I think we have the same behavior currently. No matter how many tasks
> >> the
> >> >> job master deploy into the slot, concurrently or sequentially, it is
> >> one
> >> >> allocation from the cluster to the job until the slot is freed from
> >> the job
> >> >> master.
> >> >>
> >> >>>    3. In a session cluster, some jobs are configured with operator
> >> >>>    resources, meanwhile other jobs are using UNKNOWN. How to deal
> with
> >> >>> this
> >> >>>    situation?
> >> >>
> >> >> As long as we do not mix unknown / specified resource profiles within
> >> the
> >> >> same job / slot, there shouldn't be a problem. Resource manager
> >> converts
> >> >> unknown resource profiles in slot requests to specified default
> >> resource
> >> >> profiles, so they can be dynamically allocated from task executors'
> >> >> available resources just as other slot requests with specified
> resource
> >> >> profiles.
> >> >>
> >> >> Thank you~
> >> >>
> >> >> Xintong Song
> >> >>
> >> >>
> >> >>
> >> >> On Mon, Aug 19, 2019 at 11:39 AM Yang Wang <danrtsey...@gmail.com>
> >> wrote:
> >> >>
> >> >>> Hi Xintong,
> >> >>>
> >> >>>
> >> >>> Thanks for your detailed proposal. I think many users are suffering
> >> from
> >> >>> waste of resources. The resource spec of all task managers are same
> >> and
> >> >>> we
> >> >>> have to increase all task managers to make the heavy one more
> stable.
> >> So
> >> >>> we
> >> >>> will benefit from the fine grained resource management a lot. We
> could
> >> >>> get
> >> >>> better resource utilization and stability.
> >> >>>
> >> >>>
> >> >>> Just to share some thoughts.
> >> >>>
> >> >>>
> >> >>>
> >> >>>    1. How to calculate the resource specification of TaskManagers?
> Do
> >> >>> they
> >> >>>    have them same resource spec calculated based on the
> >> configuration? I
> >> >>> think
> >> >>>    we still have wasted resources in this situation. Or we could
> start
> >> >>>    TaskManagers with different spec.
> >> >>>    2. If a slot is released and returned to SlotPool, does it could
> be
> >> >>>    reused by other SlotRequest that the request resource is smaller
> >> than
> >> >>> it?
> >> >>>       - If it is yes, what happens to the available resource in the
> >> >>>       TaskManager.
> >> >>>       - What is the SlotStatus of the cached slot in SlotPool? The
> >> >>>       AllocationId is null?
> >> >>>    3. In a session cluster, some jobs are configured with operator
> >> >>>    resources, meanwhile other jobs are using UNKNOWN. How to deal
> with
> >> >>> this
> >> >>>    situation?
> >> >>>
> >> >>>
> >> >>>
> >> >>> Best,
> >> >>> Yang
> >> >>>
> >> >>> Xintong Song <tonysong...@gmail.com> 于2019年8月16日周五 下午8:57写道:
> >> >>>
> >> >>> > Thanks for the feedbacks, Yangze and Till.
> >> >>> >
> >> >>> > Yangze,
> >> >>> >
> >> >>> > I agree with you that we should make scheduling strategy pluggable
> >> and
> >> >>> > optimize the strategy to reduce the memory fragmentation problem,
> >> and
> >> >>> > thanks for the inputs on the potential algorithmic solutions.
> >> However,
> >> >>> I'm
> >> >>> > in favor of keep this FLIP focusing on the overall mechanism
> design
> >> >>> rather
> >> >>> > than strategies. Solving the fragmentation issue should be
> >> considered
> >> >>> as an
> >> >>> > optimization, and I agree with Till that we probably should tackle
> >> this
> >> >>> > afterwards.
> >> >>> >
> >> >>> > Till,
> >> >>> >
> >> >>> > - Regarding splitting the FLIP, I think it makes sense. The
> operator
> >> >>> > resource management and dynamic slot allocation do not have much
> >> >>> dependency
> >> >>> > on each other.
> >> >>> >
> >> >>> > - Regarding the default slot size, I think this is similar to
> >> FLIP-49
> >> >>> [1]
> >> >>> > where we want all the deriving happens at one place. I think it
> >> would
> >> >>> be
> >> >>> > nice to pass the default slot size into the task executor in the
> >> same
> >> >>> way
> >> >>> > that we pass in the memory pool sizes in FLIP-49 [1].
> >> >>> >
> >> >>> > - Regarding the return value of
> >> TaskExecutorGateway#requestResource, I
> >> >>> > think you're right. We should avoid using null as the return
> value.
> >> I
> >> >>> think
> >> >>> > we probably should thrown an exception here.
> >> >>> >
> >> >>> > Thank you~
> >> >>> >
> >> >>> > Xintong Song
> >> >>> >
> >> >>> >
> >> >>> > [1]
> >> >>> >
> >> >>> >
> >> >>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
> >> >>> >
> >> >>> > On Fri, Aug 16, 2019 at 2:18 PM Till Rohrmann <
> trohrm...@apache.org
> >> >
> >> >>> > wrote:
> >> >>> >
> >> >>> > > Hi Xintong,
> >> >>> > >
> >> >>> > > thanks for drafting this FLIP. I think your proposal helps to
> >> >>> improve the
> >> >>> > > execution of batch jobs more efficiently. Moreover, it enables
> the
> >> >>> proper
> >> >>> > > integration of the Blink planner which is very important as
> well.
> >> >>> > >
> >> >>> > > Overall, the FLIP looks good to me. I was wondering whether it
> >> >>> wouldn't
> >> >>> > > make sense to actually split it up into two FLIPs: Operator
> >> resource
> >> >>> > > management and dynamic slot allocation. I think these two FLIPs
> >> >>> could be
> >> >>> > > seen as orthogonal and it would decrease the scope of each
> >> individual
> >> >>> > FLIP.
> >> >>> > >
> >> >>> > > Some smaller comments:
> >> >>> > >
> >> >>> > > - I'm not sure whether we should pass in the default slot size
> >> via an
> >> >>> > > environment variable. Without having unified the way how Flink
> >> >>> components
> >> >>> > > are configured [1], I think it would be better to pass it in as
> >> part
> >> >>> of
> >> >>> > the
> >> >>> > > configuration.
> >> >>> > > - I would avoid returning a null value from
> >> >>> > > TaskExecutorGateway#requestResource if it cannot be fulfilled.
> >> >>> Either we
> >> >>> > > should introduce an explicit return value saying this or throw
> an
> >> >>> > > exception.
> >> >>> > >
> >> >>> > > Concerning Yangze's comments: I think you are right that it
> would
> >> be
> >> >>> > > helpful to make the selection strategy pluggable. Also batching
> >> slot
> >> >>> > > requests to the RM could be a good optimization. For the sake of
> >> >>> keeping
> >> >>> > > the scope of this FLIP smaller I would try to tackle these
> things
> >> >>> after
> >> >>> > the
> >> >>> > > initial version has been completed (without spoiling these
> >> >>> optimization
> >> >>> > > opportunities). In particular batching the slot requests depends
> >> on
> >> >>> the
> >> >>> > > current scheduler refactoring and could also be realized on the
> RM
> >> >>> side
> >> >>> > > only.
> >> >>> > >
> >> >>> > > [1]
> >> >>> > >
> >> >>> > >
> >> >>> >
> >> >>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
> >> >>> > >
> >> >>> > > Cheers,
> >> >>> > > Till
> >> >>> > >
> >> >>> > >
> >> >>> > >
> >> >>> > > On Fri, Aug 16, 2019 at 11:11 AM Yangze Guo <karma...@gmail.com
> >
> >> >>> wrote:
> >> >>> > >
> >> >>> > > > Hi, Xintong
> >> >>> > > >
> >> >>> > > > Thanks to propose this FLIP. The general design looks good to
> >> me,
> >> >>> +1
> >> >>> > > > for this feature.
> >> >>> > > >
> >> >>> > > > Since slots in the same task executor could have different
> >> resource
> >> >>> > > > profile, we will
> >> >>> > > > meet resource fragment problem. Think about this case:
> >> >>> > > >  - request A want 1G memory while request B & C want 0.5G
> memory
> >> >>> > > >  - There are two task executors T1 & T2 with 1G and 0.5G free
> >> >>> memory
> >> >>> > > > respectively
> >> >>> > > > If B come first and we cut a slot from T1 for B, A must wait
> for
> >> >>> the
> >> >>> > > > free resource from
> >> >>> > > > other task. But A could have been scheduled immediately if we
> >> cut a
> >> >>> > > > slot from T2 for B.
> >> >>> > > >
> >> >>> > > > The logic of findMatchingSlot now become finding a task
> executor
> >> >>> which
> >> >>> > > > has enough
> >> >>> > > > resource and then cut a slot from it. Current method could be
> >> seen
> >> >>> as
> >> >>> > > > "First-fit strategy",
> >> >>> > > > which works well in general but sometimes could not be the
> >> >>> optimization
> >> >>> > > > method.
> >> >>> > > >
> >> >>> > > > Actually, this problem could be abstracted as "Bin Packing
> >> >>> Problem"[1].
> >> >>> > > > Here are
> >> >>> > > > some common approximate algorithms:
> >> >>> > > > - First fit
> >> >>> > > > - Next fit
> >> >>> > > > - Best fit
> >> >>> > > >
> >> >>> > > > But it become multi-dimensional bin packing problem if we take
> >> CPU
> >> >>> > > > into account. It hard
> >> >>> > > > to define which one is best fit now. Some research addressed
> >> this
> >> >>> > > > problem, such like Tetris[2].
> >> >>> > > >
> >> >>> > > > Here are some thinking about it:
> >> >>> > > > 1. We could make the strategy of finding matching task
> executor
> >> >>> > > > pluginable. Let user to config the
> >> >>> > > > best strategy in their scenario.
> >> >>> > > > 2. We could support batch request interface in RM, because we
> >> have
> >> >>> > > > opportunities to optimize
> >> >>> > > > if we have more information. If we know the A, B, C at the
> same
> >> >>> time,
> >> >>> > > > we could always make the best decision.
> >> >>> > > >
> >> >>> > > > [1] http://www.or.deis.unibo.it/kp/Chapter8.pdf
> >> >>> > > > [2]
> >> >>> >
> >> https://www.cs.cmu.edu/~xia/resources/Documents/grandl_sigcomm14.pdf
> >> >>> > > >
> >> >>> > > > Best,
> >> >>> > > > Yangze Guo
> >> >>> > > >
> >> >>> > > > On Thu, Aug 15, 2019 at 10:40 PM Xintong Song <
> >> >>> tonysong...@gmail.com>
> >> >>> > > > wrote:
> >> >>> > > > >
> >> >>> > > > > Hi everyone,
> >> >>> > > > >
> >> >>> > > > > We would like to start a discussion thread on "FLIP-53: Fine
> >> >>> Grained
> >> >>> > > > > Resource Management"[1], where we propose how to improve
> Flink
> >> >>> > resource
> >> >>> > > > > management and scheduling.
> >> >>> > > > >
> >> >>> > > > > This FLIP mainly discusses the following issues.
> >> >>> > > > >
> >> >>> > > > >    - How to support tasks with fine grained resource
> >> >>> requirements.
> >> >>> > > > >    - How to unify resource management for jobs with /
> without
> >> >>> fine
> >> >>> > > > grained
> >> >>> > > > >    resource requirements.
> >> >>> > > > >    - How to unify resource management for streaming / batch
> >> jobs.
> >> >>> > > > >
> >> >>> > > > > Key changes proposed in the FLIP are as follows.
> >> >>> > > > >
> >> >>> > > > >    - Unify memory management for operators with / without
> fine
> >> >>> > grained
> >> >>> > > > >    resource requirements by applying a fraction based quota
> >> >>> > mechanism.
> >> >>> > > > >    - Unify resource scheduling for streaming and batch jobs
> by
> >> >>> > setting
> >> >>> > > > slot
> >> >>> > > > >    sharing groups for pipelined regions during compiling
> >> stage.
> >> >>> > > > >    - Dynamically allocate slots from task executors'
> available
> >> >>> > > resources.
> >> >>> > > > >
> >> >>> > > > > Please find more details in the FLIP wiki document [1].
> >> Looking
> >> >>> > forward
> >> >>> > > > to
> >> >>> > > > > your feedbacks.
> >> >>> > > > >
> >> >>> > > > > Thank you~
> >> >>> > > > >
> >> >>> > > > > Xintong Song
> >> >>> > > > >
> >> >>> > > > >
> >> >>> > > > > [1]
> >> >>> > > > >
> >> >>> > > >
> >> >>> > >
> >> >>> >
> >> >>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Resource+Management
> >> >>> > > >
> >> >>> > >
> >> >>> >
> >> >>>
> >> >>
> >>
> >
>

Reply via email to