Hi Xintong
    Thanks for your detailed explanation, I misunderstand the spill
behavior at first glance,
I get your point now. I think it will be a good addition to the current
execution mode.
Looking forward to it :)

Best,
Aitozi

Xintong Song <tonysong...@gmail.com> 于2022年5月20日周五 12:26写道:

> Hi Aitozi,
>
> In which case we can use the hybrid shuffle mode
>
> Just to directly answer this question, in addition to
> Weijie's explanations. For batch workload, if you want the workload to take
> advantage of as many resources as available, which ranges from a single
> slot to as many slots as the total tasks, you may consider hybrid shuffle
> mode. Admittedly, this may not always be wanted, e.g., users may not want
> to execute a job if there's too few resources available, or may not want a
> job taking too many of the cluster resources. That's why we propose hybrid
> shuffle as an additional option for batch users, rather than a replacement
> for Pipelined or Blocking mode.
>
> So you mean the hybrid shuffle mode will limit its usage to the bounded
> > source, Right ?
> >
> Yes.
>
> One more question, with the bounded data and partly of the stage is running
> > in the Pipelined shuffle mode, what will be the behavior of the task
> > failure, Is the checkpoint enabled for these running stages or will it
> > re-run after the failure?
> >
> There's no checkpoints. The failover behavior depends on the spilling
> strategy.
> - In the first version, we only consider a selective spilling strategy,
> which means spill data as little as possible to the disk, which means in
> case of failover upstream tasks need to be restarted to reproduce the
> complete intermediate results.
> - An alternative strategy we may introduce in future if needed is to spill
> the complete intermediate results. That avoids restarting upstream tasks in
> case of failover, because the produced intermediate results can be
> re-consumed, at the cost of more disk IO load.
> With both strategies, the trade-off between failover cost and IO load is
> for the user to decide. This is also discussed in the MemoryDataManager
> section of the FLIP.
>
> Best,
>
> Xintong
>
>
>
> On Fri, May 20, 2022 at 12:10 PM Aitozi <gjying1...@gmail.com> wrote:
>
> > Thanks Weijie for your answer. So you mean the hybrid shuffle mode will
> > limit
> > its usage to the bounded source, Right ?
> > One more question, with the bounded data and partly of the stage is
> running
> > in the Pipelined shuffle mode, what will be the behavior of the task
> > failure, Is the
> > checkpoint enabled for these running stages or will it re-run after the
> > failure?
> >
> > Best,
> > Aitozi
> >
> > weijie guo <guoweijieres...@gmail.com> 于2022年5月20日周五 10:45写道:
> >
> > > Hi, Aitozi:
> > >
> > > Thank you for the feedback!
> > > Here are some of my thoughts on your question
> > >
> > > >>> 1.If there is an unbounded data source, but only have resource to
> > > schedule the first stage, will it bring the big burden to the
> > disk/shuffle
> > > service which will occupy all the resource I think.
> > > First of all, Hybrid Shuffle Mode is oriented to the batch job
> scenario,
> > so
> > > there is no problem of unbounded data sources. Secondly, if you
> consider
> > > the stream scenario, I think Pipelined Shuffle should still be the best
> > > choice at present. For an unbounded data stream, it is not meaningful
> to
> > > only run some stages.
> > >
> > > >>> 2. Which kind of job will benefit from the hybrid shuffle mode. In
> > > other words, In which case we can use the hybrid shuffle mode:
> > > Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid
> > > shuffle mode can effectively utilize cluster resources and avoid some
> > > unnecessary disk IO overhead. For OLAP scenarios, which are
> characterized
> > > by a large number of concurrently submitted short batch jobs, hybrid
> > > shuffle can solve the scheduling deadlock problem of pipelined shuffle
> > and
> > > achieve similar performance.
> > >
> > > Best regards,
> > >
> > > Weijie
> > >
> > >
> > > Aitozi <gjying1...@gmail.com> 于2022年5月20日周五 08:05写道:
> > >
> > > > Hi Weijie:
> > > >
> > > >      Thanks for the nice FLIP, I have couple questions about this:
> > > >
> > > > 1) In the hybrid shuffle mode, the shuffle mode is decided by the
> > > resource.
> > > > If there
> > > > is an unbounded data source, but only have resource to schedule the
> > first
> > > > stage, will it
> > > > bring the big burden to the disk/shuffle service which will occupy
> all
> > > the
> > > > resource I think.
> > > >
> > > > 2) Which kind of job will benefit from the hybrid shuffle mode. In
> > other
> > > > words, In which
> > > > case we can use the hybrid shuffle mode:
> > > > - For batch job want to use more resource to reduce the e2e time ?
> > > > - Or for streaming job which may lack of resource temporarily ?
> > > > - Or for OLAP job which will try to make best use of available
> > resources
> > > as
> > > > you mentioned to finish the query?
> > > > Just want to know the typical use case for the Hybrid shuffle mode :)
> > > >
> > > >
> > > > Best,
> > > > Aitozi.
> > > >
> > > > weijie guo <guoweijieres...@gmail.com> 于2022年5月19日周四 18:33写道:
> > > >
> > > > > Yangze, Thank you for the feedback!
> > > > > Here's my thoughts for your questions:
> > > > >
> > > > > >>> How do we decide the size of the buffer pool in
> MemoryDataManager
> > > and
> > > > > the read buffers in FileDataManager?
> > > > > The BufferPool in MemoryDataManager is the LocalBufferPool used by
> > > > > ResultPartition, and the size is the same as the current
> > implementation
> > > > of
> > > > > sort-merge shuffle. In other words, the minimum value of BufferPool
> > is
> > > a
> > > > > configurable fixed value, and the maximum value is Math.max(min, 4
> *
> > > > > numSubpartitions). The default value can be determined by running
> the
> > > > > TPC-DS tests.
> > > > > Read buffers in FileDataManager are requested from the
> > > > > BatchShuffleReadBufferPool shared by TaskManager, it's size
> > controlled
> > > by
> > > > > *taskmanager.memory.framework.off-heap.batch-shuffle.size*, the
> > default
> > > > > value is 32M, which is consistent with the current sort-merge
> shuffle
> > > > > logic.
> > > > >
> > > > > >>> Is there an upper limit for the sum of them? If there is, how
> > does
> > > > > MemoryDataManager and FileDataManager sync the memory usage?
> > > > > The buffers of the MemoryDataManager are limited by the size of the
> > > > > LocalBufferPool, and the upper limit is the size of the Network
> > Memory.
> > > > The
> > > > > buffers of the FileDataManager are directly requested from
> > > > > UnpooledOffHeapMemory, and are also limited by the size of the
> > > framework
> > > > > off-heap memory. I think there should be no need for additional
> > > > > synchronization mechanisms.
> > > > >
> > > > > >>> How do you disable the slot sharing? If user configures both
> the
> > > slot
> > > > > sharing group and hybrid shuffle, what will happen to that job?
> > > > > I think we can print a warning log when Hybrid Shuffle is enabled
> and
> > > SSG
> > > > > is configured during the JobGraph compilation stage, and fallback
> to
> > > the
> > > > > region slot sharing group by default. Of course, it will be
> > emphasized
> > > in
> > > > > the document that we do not currently support SSG, If configured,
> it
> > > will
> > > > > fall back to the default.
> > > > >
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Weijie
> > > > >
> > > > >
> > > > > Yangze Guo <karma...@gmail.com> 于2022年5月19日周四 16:25写道:
> > > > >
> > > > > > Thanks for driving this. Xintong and Weijie.
> > > > > >
> > > > > > I believe this feature will make Flink a better batch/OLAP
> engine.
> > +1
> > > > > > for the overall design.
> > > > > >
> > > > > > Some questions:
> > > > > > 1. How do we decide the size of the buffer pool in
> > MemoryDataManager
> > > > > > and the read buffers in FileDataManager?
> > > > > > 2. Is there an upper limit for the sum of them? If there is, how
> > does
> > > > > > MemoryDataManager and FileDataManager sync the memory usage?
> > > > > > 3. How do you disable the slot sharing? If user configures both
> the
> > > > > > slot sharing group and hybrid shuffle, what will happen to that
> > job?
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > > Yangze Guo
> > > > > >
> > > > > > On Thu, May 19, 2022 at 2:41 PM Xintong Song <
> > tonysong...@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > Thanks for preparing this FLIP, Weijie.
> > > > > > >
> > > > > > > I think this is a good improvement on batch resource
> elasticity.
> > > > > Looking
> > > > > > > forward to the community feedback.
> > > > > > >
> > > > > > > Best,
> > > > > > >
> > > > > > > Xintong
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Thu, May 19, 2022 at 2:31 PM weijie guo <
> > > > guoweijieres...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > >
> > > > > > > > I’d like to start a discussion about FLIP-235[1], which
> > > introduce a
> > > > > > new shuffle mode
> > > > > > > >  can overcome some of the problems of Pipelined Shuffle and
> > > > Blocking
> > > > > > Shuffle in batch scenarios.
> > > > > > > >
> > > > > > > >
> > > > > > > > Currently in Flink, task scheduling is more or less
> constrained
> > > by
> > > > > the
> > > > > > shuffle implementations.
> > > > > > > > This will bring the following disadvantages:
> > > > > > > >
> > > > > > > >    1. Pipelined Shuffle:
> > > > > > > >     For pipelined shuffle, the upstream and downstream tasks
> > are
> > > > > > required to be deployed at the same time, to avoid upstream tasks
> > > being
> > > > > > blocked forever. This is fine when there are enough resources for
> > > both
> > > > > > upstream and downstream tasks to run simultaneously, but will
> cause
> > > the
> > > > > > following problems otherwise:
> > > > > > > >    1.
> > > > > > > >       Pipelined shuffle connected tasks (i.e., a pipelined
> > > region)
> > > > > > cannot be executed until obtaining resources for all of them,
> > > resulting
> > > > > in
> > > > > > longer job finishing time and poorer resource efficiency due to
> > > holding
> > > > > > part of the resources idle while waiting for the rest.
> > > > > > > >       2.
> > > > > > > >       More severely, if multiple jobs each hold part of the
> > > cluster
> > > > > > resources and are waiting for more, a deadlock would occur. The
> > > chance
> > > > is
> > > > > > not trivial, especially for scenarios such as OLAP where
> concurrent
> > > job
> > > > > > submissions are frequent.
> > > > > > > >       2. Blocking Shuffle:
> > > > > > > >     For blocking shuffle, execution of downstream tasks must
> > wait
> > > > for
> > > > > > all upstream tasks to finish, despite there might be more
> resources
> > > > > > available. The sequential execution of upstream and downstream
> > tasks
> > > > > > significantly increase the job finishing time, and the disk IO
> > > workload
> > > > > for
> > > > > > spilling and loading full intermediate data also affects the
> > > > performance.
> > > > > > > >
> > > > > > > >
> > > > > > > > We believe the root cause of the above problems is that
> shuffle
> > > > > > implementations put unnecessary constraints on task scheduling.
> > > > > > > >
> > > > > > > >
> > > > > > > > To solve this problem, Xintong Song and I propose to
> introduce
> > > > hybrid
> > > > > > shuffle to minimize the scheduling constraints. With Hybrid
> > Shuffle,
> > > > > Flink
> > > > > > should:
> > > > > > > >
> > > > > > > >    1. Make best use of available resources.
> > > > > > > >     Ideally, we want Flink to always make progress if
> possible.
> > > > That
> > > > > > is to say, it should always execute a pending task if there are
> > > > resources
> > > > > > available for that task.
> > > > > > > >    2. Minimize disk IO load.
> > > > > > > >     In-flight data should be consumed directly from memory as
> > > much
> > > > as
> > > > > > possible. Only data that is not consumed timely should be spilled
> > to
> > > > > disk.
> > > > > > > >
> > > > > > > > You can find more details in FLIP-235. Looking forward to
> your
> > > > > > feedback.
> > > > > > > >
> > > > > > > >
> > > > > > > > [1]
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > >
> > > > > > > > Weijie
> > > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to