>
> Will this also allow spilling everything to disk while also forwarding
> data to the next task?
>

Yes. As long as the downstream task has been started, the data is always
forwarded, even while everything is being spilled.
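
To make that concrete, here is a minimal sketch of the "spill everything
and still forward" behavior. It is only a toy model: the class and method
names are hypothetical and are not taken from the FLIP or from Flink's code.

```python
class FullSpillPartition:
    """Toy model of a result partition that spills every record.

    Hypothetical illustration only; none of these names exist in Flink.
    """

    def __init__(self):
        self.spilled = []            # stands in for the complete on-disk copy
        self.forwarded = []          # stands in for data sent downstream
        self.downstream_started = False

    def start_downstream(self):
        # Once the consumer is running, it first catches up on the backlog
        # that was spilled before it started.
        self.downstream_started = True
        self.forwarded.extend(self.spilled)

    def emit(self, record):
        self.spilled.append(record)          # always spill
        if self.downstream_started:
            self.forwarded.append(record)    # forward as well, if possible


p = FullSpillPartition()
p.emit("a")              # downstream not started yet: spilled only
p.start_downstream()     # backlog "a" is replayed to the consumer
p.emit("b")
p.emit("c")
```

In this model the spilled copy is always complete, and the consumer sees
every record regardless of when it was started.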

> This would allow us to improve fine-grained recovery by no longer being
> constrained to pipelined regions.


I think it helps prevent restarts of the upstream tasks of a failed task,
but not of the downstream tasks. There is no guarantee that a restarted
task will produce exactly the same data (in terms of order) as the previous
execution, so the downstream tasks cannot resume consuming the data.
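
A small sketch of the ordering concern, under the assumption that the
failed task merges two inputs and emits records in whatever order they
happen to arrive. The function and the record names are made up for
illustration.

```python
def run_task(arrival_order):
    """Emit records in the order in which their inputs happened to arrive.

    Hypothetical stand-in for a task reading from two input channels.
    """
    inputs = {"x": ["x1", "x2"], "y": ["y1", "y2"]}
    cursors = {"x": 0, "y": 0}
    out = []
    for src in arrival_order:
        out.append(inputs[src][cursors[src]])
        cursors[src] += 1
    return out


# Same records, different interleaving in the two attempts.
first_attempt = run_task(["x", "y", "x", "y"])
second_attempt = run_task(["y", "y", "x", "x"])

# The downstream task consumed two records before the failure and then
# naively resumes at offset 2 of the restarted task's output.
consumed = first_attempt[:2]
resumed = consumed + second_attempt[2:]
```

Here the resumed downstream sees x1 twice and never sees y2, although both
attempts produced the same set of records. That is why the downstream has
to be restarted as well.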


Best,

Xintong



On Wed, May 25, 2022 at 3:05 PM Chesnay Schepler <ches...@apache.org> wrote:

> Will this also allow spilling everything to disk while also forwarding
> data to the next task?
>
> This would allow us to improve fine-grained recovery by no longer being
> constrained to pipelined regions.
>
> On 25/05/2022 05:55, weijie guo wrote:
> > Hi All,
> > Thank you for your attention and feedback.
> > Do you have any other comments? If there are no other questions, I'll
> > vote on FLIP-235 tomorrow.
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > On Fri, May 20, 2022 at 1:22 PM Aitozi <gjying1...@gmail.com> wrote:
> >
> >> Hi Xintong
> >>      Thanks for your detailed explanation, I misunderstand the spill
> >> behavior at first glance,
> >> I get your point now. I think it will be a good addition to the current
> >> execution mode.
> >> Looking forward to it :)
> >>
> >> Best,
> >> Aitozi
> >>
> >> On Fri, May 20, 2022 at 12:26 PM Xintong Song <tonysong...@gmail.com> wrote:
> >>
> >>> Hi Aitozi,
> >>>
> >>> In which case we can use the hybrid shuffle mode
> >>>
> >>> Just to directly answer this question, in addition to Weijie's
> >>> explanations: for batch workloads, if you want the workload to take
> >>> advantage of as many resources as are available, which ranges from a
> >>> single slot to as many slots as the total tasks, you may consider
> >>> hybrid shuffle mode. Admittedly, this may not always be wanted, e.g.,
> >>> users may not want to execute a job if there are too few resources
> >>> available, or may not want a job taking too many of the cluster
> >>> resources. That's why we propose hybrid shuffle as an additional
> >>> option for batch users, rather than a replacement for Pipelined or
> >>> Blocking mode.
> >>>
> >>> So you mean the hybrid shuffle mode will limit its usage to the bounded
> >>>> source, Right ?
> >>>>
> >>> Yes.
> >>>
> >>> One more question, with the bounded data and partly of the stage is
> >> running
> >>>> in the Pipelined shuffle mode, what will be the behavior of the task
> >>>> failure, Is the checkpoint enabled for these running stages or will it
> >>>> re-run after the failure?
> >>>>
> >>> There are no checkpoints. The failover behavior depends on the
> >>> spilling strategy.
> >>> - In the first version, we only consider a selective spilling strategy,
> >>> which spills as little data as possible to disk. This means that in
> >>> case of failover, upstream tasks need to be restarted to reproduce the
> >>> complete intermediate results.
> >>> - An alternative strategy we may introduce in the future, if needed, is
> >>> to spill the complete intermediate results. That avoids restarting
> >>> upstream tasks in case of failover, because the produced intermediate
> >>> results can be re-consumed, at the cost of more disk IO load.
> >>> With both strategies, the trade-off between failover cost and IO load
> >>> is for the user to decide. This is also discussed in the
> >>> MemoryDataManager section of the FLIP.
> >>>
> >>> Best,
> >>>
> >>> Xintong
> >>>
> >>>
> >>>
> >>> On Fri, May 20, 2022 at 12:10 PM Aitozi <gjying1...@gmail.com> wrote:
> >>>
> >>>> Thanks Weijie for your answer. So you mean the hybrid shuffle mode
> will
> >>>> limit
> >>>> its usage to the bounded source, Right ?
> >>>> One more question, with the bounded data and partly of the stage is
> >>> running
> >>>> in the Pipelined shuffle mode, what will be the behavior of the task
> >>>> failure, Is the
> >>>> checkpoint enabled for these running stages or will it re-run after
> the
> >>>> failure?
> >>>>
> >>>> Best,
> >>>> Aitozi
> >>>>
> >>>> On Fri, May 20, 2022 at 10:45 AM weijie guo <guoweijieres...@gmail.com> wrote:
> >>>>
> >>>>> Hi, Aitozi:
> >>>>>
> >>>>> Thank you for the feedback!
> >>>>> Here are some of my thoughts on your question
> >>>>>
> >>>>>>>> 1.If there is an unbounded data source, but only have resource to
> >>>>> schedule the first stage, will it bring the big burden to the
> >>>> disk/shuffle
> >>>>> service which will occupy all the resource I think.
> >>>>> First of all, Hybrid Shuffle Mode is oriented to the batch job
> >>> scenario,
> >>>> so
> >>>>> there is no problem of unbounded data sources. Secondly, if you
> >>> consider
> >>>>> the stream scenario, I think Pipelined Shuffle should still be the
> >> best
> >>>>> choice at present. For an unbounded data stream, it is not meaningful
> >>> to
> >>>>> only run some stages.
> >>>>>
> >>>>>>>> 2. Which kind of job will benefit from the hybrid shuffle mode.
> >> In
> >>>>> other words, In which case we can use the hybrid shuffle mode:
> >>>>> Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid
> >>>>> shuffle mode can effectively utilize cluster resources and avoid some
> >>>>> unnecessary disk IO overhead. For OLAP scenarios, which are
> >>> characterized
> >>>>> by a large number of concurrently submitted short batch jobs, hybrid
> >>>>> shuffle can solve the scheduling deadlock problem of pipelined
> >> shuffle
> >>>> and
> >>>>> achieve similar performance.
> >>>>>
> >>>>> Best regards,
> >>>>>
> >>>>> Weijie
> >>>>>
> >>>>>
> >>>>> On Fri, May 20, 2022 at 8:05 AM Aitozi <gjying1...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi Weijie:
> >>>>>>
> >>>>>>       Thanks for the nice FLIP, I have couple questions about this:
> >>>>>>
> >>>>>> 1) In the hybrid shuffle mode, the shuffle mode is decided by the
> >>>>> resource.
> >>>>>> If there
> >>>>>> is an unbounded data source, but only have resource to schedule the
> >>>> first
> >>>>>> stage, will it
> >>>>>> bring the big burden to the disk/shuffle service which will occupy
> >>> all
> >>>>> the
> >>>>>> resource I think.
> >>>>>>
> >>>>>> 2) Which kind of job will benefit from the hybrid shuffle mode. In
> >>>> other
> >>>>>> words, In which
> >>>>>> case we can use the hybrid shuffle mode:
> >>>>>> - For batch job want to use more resource to reduce the e2e time ?
> >>>>>> - Or for streaming job which may lack of resource temporarily ?
> >>>>>> - Or for OLAP job which will try to make best use of available
> >>>> resources
> >>>>> as
> >>>>>> you mentioned to finish the query?
> >>>>>> Just want to know the typical use case for the Hybrid shuffle mode
> >> :)
> >>>>>>
> >>>>>> Best,
> >>>>>> Aitozi.
> >>>>>>
> >>>>>> On Thu, May 19, 2022 at 6:33 PM weijie guo <guoweijieres...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Yangze, Thank you for the feedback!
> >>>>>>> Here's my thoughts for your questions:
> >>>>>>>
> >>>>>>>>>> How do we decide the size of the buffer pool in
> >>> MemoryDataManager
> >>>>> and
> >>>>>>> the read buffers in FileDataManager?
> >>>>>>> The BufferPool in MemoryDataManager is the LocalBufferPool used by
> >>>>>>> ResultPartition, and its size is the same as in the current
> >>>>>>> implementation of sort-merge shuffle. In other words, the minimum
> >>>>>>> size of the BufferPool is a configurable fixed value, and the
> >>>>>>> maximum size is Math.max(min, 4 * numSubpartitions). The default
> >>>>>>> value can be determined by running the TPC-DS tests.
> >>>>>>> Read buffers in FileDataManager are requested from the
> >>>>>>> BatchShuffleReadBufferPool shared by the TaskManager. Its size is
> >>>>>>> controlled by
> >>>>>>> *taskmanager.memory.framework.off-heap.batch-shuffle.size*; the
> >>>>>>> default value is 32M, which is consistent with the current
> >>>>>>> sort-merge shuffle logic.
> >>>>>>>
> >>>>>>>>>> Is there an upper limit for the sum of them? If there is, how
> >>>> does
> >>>>>>> MemoryDataManager and FileDataManager sync the memory usage?
> >>>>>>> The buffers of the MemoryDataManager are limited by the size of the
> >>>>>>> LocalBufferPool, and the upper limit is the size of the Network
> >>>>>>> Memory. The buffers of the FileDataManager are directly requested
> >>>>>>> from UnpooledOffHeapMemory, and are also limited by the size of the
> >>>>>>> framework off-heap memory. I think there should be no need for
> >>>>>>> additional synchronization mechanisms.
> >>>>>>>
> >>>>>>>>>> How do you disable the slot sharing? If user configures both
> >>> the
> >>>>> slot
> >>>>>>> sharing group and hybrid shuffle, what will happen to that job?
> >>>>>>> I think we can print a warning log when Hybrid Shuffle is enabled
> >>>>>>> and an SSG is configured during the JobGraph compilation stage, and
> >>>>>>> fall back to the region slot sharing group by default. Of course,
> >>>>>>> the documentation will emphasize that we do not currently support
> >>>>>>> SSG; if one is configured, it will fall back to the default.
> >>>>>>>
> >>>>>>>
> >>>>>>> Best regards,
> >>>>>>>
> >>>>>>> Weijie
> >>>>>>>
> >>>>>>>
> >>>>>>> On Thu, May 19, 2022 at 4:25 PM Yangze Guo <karma...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> Thanks for driving this. Xintong and Weijie.
> >>>>>>>>
> >>>>>>>> I believe this feature will make Flink a better batch/OLAP
> >>> engine.
> >>>> +1
> >>>>>>>> for the overall design.
> >>>>>>>>
> >>>>>>>> Some questions:
> >>>>>>>> 1. How do we decide the size of the buffer pool in
> >>>> MemoryDataManager
> >>>>>>>> and the read buffers in FileDataManager?
> >>>>>>>> 2. Is there an upper limit for the sum of them? If there is,
> >> how
> >>>> does
> >>>>>>>> MemoryDataManager and FileDataManager sync the memory usage?
> >>>>>>>> 3. How do you disable the slot sharing? If user configures both
> >>> the
> >>>>>>>> slot sharing group and hybrid shuffle, what will happen to that
> >>>> job?
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Yangze Guo
> >>>>>>>>
> >>>>>>>> On Thu, May 19, 2022 at 2:41 PM Xintong Song <
> >>>> tonysong...@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>> Thanks for preparing this FLIP, Weijie.
> >>>>>>>>>
> >>>>>>>>> I think this is a good improvement on batch resource
> >>> elasticity.
> >>>>>>> Looking
> >>>>>>>>> forward to the community feedback.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>>
> >>>>>>>>> Xintong
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Thu, May 19, 2022 at 2:31 PM weijie guo <
> >>>>>> guoweijieres...@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi all,
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> I'd like to start a discussion about FLIP-235[1], which
> >>>>>>>>>> introduces a new shuffle mode that can overcome some of the
> >>>>>>>>>> problems of Pipelined Shuffle and Blocking Shuffle in batch
> >>>>>>>>>> scenarios.
> >>>>>>>>>>
> >>>>>>>>>> Currently in Flink, task scheduling is more or less constrained
> >>>>>>>>>> by the shuffle implementations. This brings the following
> >>>>>>>>>> disadvantages:
> >>>>>>>>>>
> >>>>>>>>>>     1. Pipelined Shuffle:
> >>>>>>>>>>        For pipelined shuffle, the upstream and downstream tasks
> >>>>>>>>>>        are required to be deployed at the same time, to avoid
> >>>>>>>>>>        upstream tasks being blocked forever. This is fine when
> >>>>>>>>>>        there are enough resources for both upstream and
> >>>>>>>>>>        downstream tasks to run simultaneously, but will cause the
> >>>>>>>>>>        following problems otherwise:
> >>>>>>>>>>        1. Pipelined shuffle connected tasks (i.e., a pipelined
> >>>>>>>>>>           region) cannot be executed until resources are obtained
> >>>>>>>>>>           for all of them, resulting in longer job finishing time
> >>>>>>>>>>           and poorer resource efficiency due to holding part of
> >>>>>>>>>>           the resources idle while waiting for the rest.
> >>>>>>>>>>        2. More severely, if multiple jobs each hold part of the
> >>>>>>>>>>           cluster resources and are waiting for more, a deadlock
> >>>>>>>>>>           would occur. The chance is not trivial, especially for
> >>>>>>>>>>           scenarios such as OLAP where concurrent job submissions
> >>>>>>>>>>           are frequent.
> >>>>>>>>>>     2. Blocking Shuffle:
> >>>>>>>>>>        For blocking shuffle, execution of downstream tasks must
> >>>>>>>>>>        wait for all upstream tasks to finish, even though more
> >>>>>>>>>>        resources might be available. The sequential execution of
> >>>>>>>>>>        upstream and downstream tasks significantly increases the
> >>>>>>>>>>        job finishing time, and the disk IO workload for spilling
> >>>>>>>>>>        and loading full intermediate data also affects the
> >>>>>>>>>>        performance.
> >>>>>>>>>>
> >>>>>>>>>> We believe the root cause of the above problems is that shuffle
> >>>>>>>>>> implementations put unnecessary constraints on task scheduling.
> >>>>>>>>>>
> >>>>>>>>>> To solve this problem, Xintong Song and I propose to introduce
> >>>>>>>>>> hybrid shuffle to minimize the scheduling constraints. With
> >>>>>>>>>> Hybrid Shuffle, Flink should:
> >>>>>>>>>>     1. Make best use of available resources.
> >>>>>>>>>>        Ideally, we want Flink to always make progress if
> >>>>>>>>>>        possible. That is to say, it should always execute a
> >>>>>>>>>>        pending task if there are resources available for that
> >>>>>>>>>>        task.
> >>>>>>>>>>     2. Minimize disk IO load.
> >>>>>>>>>>        In-flight data should be consumed directly from memory as
> >>>>>>>>>>        much as possible. Only data that is not consumed in a
> >>>>>>>>>>        timely manner should be spilled to disk.
> >>>>>>>>>>
> >>>>>>>>>> You can find more details in FLIP-235. Looking forward to your
> >>>>>>>>>> feedback.
> >>>>>>>>>>
> >>>>>>>>>> [1]
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Best regards,
> >>>>>>>>>>
> >>>>>>>>>> Weijie
> >>>>>>>>>>
>
>
