Hi All,
Thank you for your attention and feedback.
Do you have any other comments? If there are no other questions, I'll start
the vote on FLIP-235 tomorrow.

Best regards,

Weijie


Aitozi <gjying1...@gmail.com> wrote on Fri, May 20, 2022 at 13:22:

> Hi Xintong,
>     Thanks for your detailed explanation. I misunderstood the spill
> behavior at first glance, but I get your point now. I think it will be a
> good addition to the current execution mode.
> Looking forward to it :)
>
> Best,
> Aitozi
>
> Xintong Song <tonysong...@gmail.com> wrote on Fri, May 20, 2022 at 12:26:
>
> > Hi Aitozi,
> >
> > > In which case we can use the hybrid shuffle mode
> >
> > Just to directly answer this question, in addition to Weijie's
> > explanations: for batch workloads, if you want the workload to take
> > advantage of as many resources as are available, ranging from a single
> > slot to as many slots as there are tasks in total, you may consider
> > hybrid shuffle mode. Admittedly, this may not always be wanted, e.g.,
> > users may not want to execute a job if there are too few resources
> > available, or may not want a job taking too many of the cluster
> > resources. That's why we propose hybrid shuffle as an additional option
> > for batch users, rather than a replacement for Pipelined or Blocking
> > mode.
> >
> > > So you mean the hybrid shuffle mode will limit its usage to bounded
> > > sources, right?
> > >
> > Yes.
> >
> > > One more question: with bounded data and part of the stages running
> > > in the Pipelined shuffle mode, what will be the behavior on task
> > > failure? Is checkpointing enabled for these running stages, or will
> > > they re-run after the failure?
> > >
> > There are no checkpoints. The failover behavior depends on the spilling
> > strategy.
> > - In the first version, we only consider a selective spilling strategy,
> > which spills as little data as possible to disk. This means that, in
> > case of failover, upstream tasks need to be restarted to reproduce the
> > complete intermediate results.
> > - An alternative strategy we may introduce in the future, if needed, is
> > to spill the complete intermediate results. That avoids restarting
> > upstream tasks in case of failover, because the produced intermediate
> > results can be re-consumed, at the cost of more disk IO load.
> > With both strategies, the trade-off between failover cost and IO load is
> > for the user to decide. This is also discussed in the MemoryDataManager
> > section of the FLIP.
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Fri, May 20, 2022 at 12:10 PM Aitozi <gjying1...@gmail.com> wrote:
> >
> > > Thanks Weijie for your answer. So you mean the hybrid shuffle mode will
> > > limit its usage to bounded sources, right?
> > > One more question: with bounded data and part of the stages running
> > > in the Pipelined shuffle mode, what will be the behavior on task
> > > failure? Is checkpointing enabled for these running stages, or will
> > > they re-run after the failure?
> > >
> > > Best,
> > > Aitozi
> > >
> > > weijie guo <guoweijieres...@gmail.com> wrote on Fri, May 20, 2022 at 10:45:
> > >
> > > > Hi, Aitozi:
> > > >
> > > > Thank you for the feedback!
> > > > Here are some of my thoughts on your question
> > > >
> > > > >>> 1. If there is an unbounded data source, but there are only
> > > > enough resources to schedule the first stage, will it put a big
> > > > burden on the disk/shuffle service, which I think would occupy all
> > > > the resources?
> > > > First of all, Hybrid Shuffle Mode is oriented to the batch job
> > > > scenario, so unbounded data sources are not a concern. Secondly, if
> > > > you consider the streaming scenario, I think Pipelined Shuffle should
> > > > still be the best choice at present. For an unbounded data stream, it
> > > > is not meaningful to run only some of the stages.
> > > >
> > > > >>> 2. Which kind of job will benefit from the hybrid shuffle mode? In
> > > > other words, in which case can we use the hybrid shuffle mode?
> > > > Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid
> > > > shuffle mode can effectively utilize cluster resources and avoid some
> > > > unnecessary disk IO overhead. For OLAP scenarios, which are
> > > > characterized by a large number of concurrently submitted short batch
> > > > jobs, hybrid shuffle can solve the scheduling deadlock problem of
> > > > pipelined shuffle while achieving similar performance.
> > > >
> > > > Best regards,
> > > >
> > > > Weijie
> > > >
> > > >
> > > > Aitozi <gjying1...@gmail.com> wrote on Fri, May 20, 2022 at 08:05:
> > > >
> > > > > Hi Weijie:
> > > > >
> > > > >      Thanks for the nice FLIP. I have a couple of questions about this:
> > > > >
> > > > > 1) In the hybrid shuffle mode, the shuffle mode is decided by the
> > > > > available resources. If there is an unbounded data source, but there
> > > > > are only enough resources to schedule the first stage, will it put a
> > > > > big burden on the disk/shuffle service, which I think would occupy
> > > > > all the resources?
> > > > >
> > > > > 2) Which kind of job will benefit from the hybrid shuffle mode? In
> > > > > other words, in which case can we use the hybrid shuffle mode:
> > > > > - For batch jobs that want to use more resources to reduce the e2e time?
> > > > > - Or for streaming jobs that may lack resources temporarily?
> > > > > - Or for OLAP jobs that will try to make the best use of available
> > > > > resources, as you mentioned, to finish the query?
> > > > > Just want to know the typical use case for the Hybrid shuffle mode :)
> > > > >
> > > > >
> > > > > Best,
> > > > > Aitozi.
> > > > >
> > > > > weijie guo <guoweijieres...@gmail.com> wrote on Thu, May 19, 2022 at 18:33:
> > > > >
> > > > > > Yangze, thank you for the feedback!
> > > > > > Here are my thoughts on your questions:
> > > > > >
> > > > > > >>> How do we decide the size of the buffer pool in MemoryDataManager
> > > > > > and the read buffers in FileDataManager?
> > > > > > The BufferPool in MemoryDataManager is the LocalBufferPool used by
> > > > > > ResultPartition, and its size is the same as in the current
> > > > > > implementation of sort-merge shuffle. In other words, the minimum size
> > > > > > of the BufferPool is a configurable fixed value, and the maximum size
> > > > > > is Math.max(min, 4 * numSubpartitions). The default value can be
> > > > > > determined by running the TPC-DS tests.
> > > > > > Read buffers in FileDataManager are requested from the
> > > > > > BatchShuffleReadBufferPool shared by the TaskManager. Its size is
> > > > > > controlled by *taskmanager.memory.framework.off-heap.batch-shuffle.size*;
> > > > > > the default value is 32M, which is consistent with the current
> > > > > > sort-merge shuffle logic.
> > > > > >
> > > > > > >>> Is there an upper limit for the sum of them? If there is, how do
> > > > > > MemoryDataManager and FileDataManager sync the memory usage?
> > > > > > The buffers of the MemoryDataManager are limited by the size of the
> > > > > > LocalBufferPool, and the upper limit is the size of the Network Memory.
> > > > > > The buffers of the FileDataManager are directly requested from
> > > > > > UnpooledOffHeapMemory, and are also limited by the size of the
> > > > > > framework off-heap memory. I think there should be no need for
> > > > > > additional synchronization mechanisms.
> > > > > >
> > > > > > >>> How do you disable the slot sharing? If a user configures both the
> > > > > > slot sharing group and hybrid shuffle, what will happen to that job?
> > > > > > I think we can print a warning log when Hybrid Shuffle is enabled and
> > > > > > an SSG is configured during the JobGraph compilation stage, and fall
> > > > > > back to the region slot sharing group by default. Of course, it will
> > > > > > be emphasized in the documentation that we do not currently support
> > > > > > SSG; if configured, it will fall back to the default.
> > > > > >
> > > > > >
> > > > > > Best regards,
> > > > > >
> > > > > > Weijie
> > > > > >
> > > > > >
> > > > > > Yangze Guo <karma...@gmail.com> wrote on Thu, May 19, 2022 at 16:25:
> > > > > >
> > > > > > > Thanks for driving this, Xintong and Weijie.
> > > > > > >
> > > > > > > I believe this feature will make Flink a better batch/OLAP engine.
> > > > > > > +1 for the overall design.
> > > > > > >
> > > > > > > Some questions:
> > > > > > > 1. How do we decide the size of the buffer pool in MemoryDataManager
> > > > > > > and the read buffers in FileDataManager?
> > > > > > > 2. Is there an upper limit for the sum of them? If there is, how do
> > > > > > > MemoryDataManager and FileDataManager sync the memory usage?
> > > > > > > 3. How do you disable the slot sharing? If a user configures both the
> > > > > > > slot sharing group and hybrid shuffle, what will happen to that job?
> > > > > > >
> > > > > > >
> > > > > > > Best,
> > > > > > > Yangze Guo
> > > > > > >
> > > > > > > On Thu, May 19, 2022 at 2:41 PM Xintong Song <tonysong...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > Thanks for preparing this FLIP, Weijie.
> > > > > > > >
> > > > > > > > I think this is a good improvement on batch resource elasticity.
> > > > > > > > Looking forward to the community feedback.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > >
> > > > > > > > Xintong
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, May 19, 2022 at 2:31 PM weijie guo <guoweijieres...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi all,
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > I’d like to start a discussion about FLIP-235[1], which
> > > > > > > > > introduces a new shuffle mode that can overcome some of the
> > > > > > > > > problems of Pipelined Shuffle and Blocking Shuffle in batch
> > > > > > > > > scenarios.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Currently in Flink, task scheduling is more or less constrained
> > > > > > > > > by the shuffle implementations.
> > > > > > > > > This will bring the following disadvantages:
> > > > > > > > >
> > > > > > > > >    1. Pipelined Shuffle:
> > > > > > > > >     For pipelined shuffle, the upstream and downstream
> tasks
> > > are
> > > > > > > required to be deployed at the same time, to avoid upstream
> tasks
> > > > being
> > > > > > > blocked forever. This is fine when there are enough resources
> for
> > > > both
> > > > > > > upstream and downstream tasks to run simultaneously, but will
> > cause
> > > > the
> > > > > > > following problems otherwise:
> > > > > > > > >    1.
> > > > > > > > >       Pipelined shuffle connected tasks (i.e., a pipelined
> > > > region)
> > > > > > > cannot be executed until obtaining resources for all of them,
> > > > resulting
> > > > > > in
> > > > > > > longer job finishing time and poorer resource efficiency due to
> > > > holding
> > > > > > > part of the resources idle while waiting for the rest.
> > > > > > > > >       2.
> > > > > > > > >       More severely, if multiple jobs each hold part of the
> > > > cluster
> > > > > > > resources and are waiting for more, a deadlock would occur. The
> > > > chance
> > > > > is
> > > > > > > not trivial, especially for scenarios such as OLAP where
> > concurrent
> > > > job
> > > > > > > submissions are frequent.
> > > > > > > > >       2. Blocking Shuffle:
> > > > > > > > >     For blocking shuffle, execution of downstream tasks
> must
> > > wait
> > > > > for
> > > > > > > all upstream tasks to finish, despite there might be more
> > resources
> > > > > > > available. The sequential execution of upstream and downstream
> > > tasks
> > > > > > > significantly increase the job finishing time, and the disk IO
> > > > workload
> > > > > > for
> > > > > > > spilling and loading full intermediate data also affects the
> > > > > performance.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > We believe the root cause of the above problems is that shuffle
> > > > > > > > > implementations put unnecessary constraints on task scheduling.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > To solve this problem, Xintong Song and I propose to introduce
> > > > > > > > > hybrid shuffle to minimize the scheduling constraints. With
> > > > > > > > > Hybrid Shuffle, Flink should:
> > > > > > > > >
> > > > > > > > >    1. Make best use of available resources.
> > > > > > > > >     Ideally, we want Flink to always make progress if
> > possible.
> > > > > That
> > > > > > > is to say, it should always execute a pending task if there are
> > > > > resources
> > > > > > > available for that task.
> > > > > > > > >    2. Minimize disk IO load.
> > > > > > > > >     In-flight data should be consumed directly from memory
> as
> > > > much
> > > > > as
> > > > > > > possible. Only data that is not consumed timely should be
> spilled
> > > to
> > > > > > disk.
> > > > > > > > >
> > > > > > > > > You can find more details in FLIP-235. Looking forward to your
> > > > > > > > > feedback.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > [1]
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Best regards,
> > > > > > > > >
> > > > > > > > > Weijie
> > > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
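
For readers of this thread: the buffer pool bounds described in the reply
to Yangze (a configurable fixed minimum, and a maximum of
Math.max(min, 4 * numSubpartitions)) can be illustrated with a small Java
sketch. Only the formula comes from the thread. The class name
BufferPoolBounds, its method names, and the minimum of 512 buffers used in
main are hypothetical and are not Flink APIs or Flink defaults.

```java
// Illustrative sketch only. BufferPoolBounds is a hypothetical name, not a
// Flink class; only the max formula is taken from the thread.
final class BufferPoolBounds {
    public final int min;
    public final int max;

    private BufferPoolBounds(int min, int max) {
        this.min = min;
        this.max = max;
    }

    /**
     * Computes the pool bounds as described in the thread.
     *
     * @param configuredMin the configurable fixed minimum number of buffers
     * @param numSubpartitions the number of subpartitions of the result partition
     */
    public static BufferPoolBounds of(int configuredMin, int numSubpartitions) {
        if (configuredMin <= 0 || numSubpartitions <= 0) {
            throw new IllegalArgumentException("arguments must be positive");
        }
        // Maximum as stated in the thread: Math.max(min, 4 * numSubpartitions).
        int max = Math.max(configuredMin, 4 * numSubpartitions);
        return new BufferPoolBounds(configuredMin, max);
    }

    public static void main(String[] args) {
        // 512 is an assumed minimum chosen for illustration only.
        BufferPoolBounds few = BufferPoolBounds.of(512, 100);
        BufferPoolBounds many = BufferPoolBounds.of(512, 1000);
        System.out.println(few.min + ".." + few.max);   // prints 512..512
        System.out.println(many.min + ".." + many.max); // prints 512..4000
    }
}
```

With few subpartitions the maximum collapses to the configured minimum, so
the pool has a fixed size. With many subpartitions the maximum grows to four
buffers per subpartition.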
