In general, I agree with you about targeting jobs with no/few blocking
exchanges for fine-grained recovery. The only problem is that, correct me if
I'm wrong, users currently cannot control the data exchange mode of a
specific edge. I'm not aware of any such API.

As a first step, I'd prefer excluding this from the scope of this FLIP.

Best,

Xintong



On Wed, May 25, 2022 at 8:54 PM Chesnay Schepler <ches...@apache.org> wrote:

> Yes; but that's also a limitation of the current fine-grained recovery.
>
> My suggestion was primarily aimed at jobs that have no/few blocking
> exchanges, where users would currently have to explicitly configure
> additional blocking exchanges to really get something out of
> fine-grained recovery (at the expense of e2e job duration).
>
> On 25/05/2022 14:47, Xintong Song wrote:
> >> Will this also allow spilling everything to disk while also forwarding
> >> data to the next task?
> >>
> > Yes, as long as the downstream task is started, this always forwards the
> > data, even while spilling everything.
> >
> >> This would allow us to improve fine-grained recovery by no longer being
> >> constrained to pipelined regions.
> >
> > I think it helps prevent restarts of the upstreams of a failed task, but
> > not of the downstreams. There's no guarantee that a restarted task will
> > produce exactly the same data (in terms of order) as the previous
> > execution, so downstreams cannot resume consuming the data.
> >
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Wed, May 25, 2022 at 3:05 PM Chesnay Schepler <ches...@apache.org> wrote:
> >
> >> Will this also allow spilling everything to disk while also forwarding
> >> data to the next task?
> >>
> >> This would allow us to improve fine-grained recovery by no longer being
> >> constrained to pipelined regions.
> >>
> >> On 25/05/2022 05:55, weijie guo wrote:
> >>> Hi All,
> >>> Thank you for your attention and feedback.
> >>> Do you have any other comments? If there are no other questions, I'll
> >>> vote on FLIP-235 tomorrow.
> >>>
> >>> Best regards,
> >>>
> >>> Weijie
> >>>
> >>>
> >>> Aitozi <gjying1...@gmail.com> wrote on Fri, May 20, 2022 at 13:22:
> >>>
> >>>> Hi Xintong,
> >>>>       Thanks for your detailed explanation. I misunderstood the spill
> >>>> behavior at first glance, but I get your point now. I think it will be a
> >>>> good addition to the current execution mode.
> >>>> Looking forward to it :)
> >>>>
> >>>> Best,
> >>>> Aitozi
> >>>>
> >>>> Xintong Song <tonysong...@gmail.com> wrote on Fri, May 20, 2022 at 12:26:
> >>>>
> >>>>> Hi Aitozi,
> >>>>>
> >>>>>> In which case we can use the hybrid shuffle mode
> >>>>>
> >>>>> Just to directly answer this question, in addition to Weijie's
> >>>>> explanations: for batch workloads, if you want the workload to take
> >>>>> advantage of as many resources as are available, ranging from a single
> >>>>> slot to as many slots as there are tasks in total, you may consider
> >>>>> hybrid shuffle mode. Admittedly, this may not always be wanted, e.g.,
> >>>>> users may not want to execute a job if there are too few resources
> >>>>> available, or may not want a job to take too many of the cluster
> >>>>> resources. That's why we propose hybrid shuffle as an additional option
> >>>>> for batch users, rather than a replacement for Pipelined or Blocking
> >>>>> mode.
> >>>>>
> >>>>>> So you mean the hybrid shuffle mode will limit its usage to the
> >>>>>> bounded source, right?
> >>>>>>
> >>>>> Yes.
> >>>>>
> >>>>>> One more question: with bounded data and part of the stages running
> >>>>>> in the Pipelined shuffle mode, what will be the behavior on task
> >>>>>> failure? Is checkpointing enabled for these running stages, or will
> >>>>>> they re-run after the failure?
> >>>>>>
> >>>>> There are no checkpoints. The failover behavior depends on the spilling
> >>>>> strategy.
> >>>>> - In the first version, we only consider a selective spilling strategy,
> >>>>> which spills as little data as possible to disk. As a consequence, in
> >>>>> case of failover, upstream tasks need to be restarted to reproduce the
> >>>>> complete intermediate results.
> >>>>> - An alternative strategy that we may introduce in the future, if
> >>>>> needed, is to spill the complete intermediate results. That avoids
> >>>>> restarting upstream tasks in case of failover, because the produced
> >>>>> intermediate results can be re-consumed, at the cost of more disk IO
> >>>>> load.
> >>>>> With both strategies available, the trade-off between failover cost and
> >>>>> IO load is for the user to decide. This is also discussed in the
> >>>>> MemoryDataManager section of the FLIP.
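
To make the trade-off between the two spilling strategies concrete, here is a minimal sketch of which tasks would have to be restarted under each of them. The class, enum, and method names are hypothetical and are not taken from the FLIP or from Flink. The sketch only models the failed task and its upstreams; how downstream tasks are handled is deliberately left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: these types are hypothetical and do not exist in Flink. */
final class FailoverSketch {

    enum SpillingStrategy { SELECTIVE, FULL }

    /**
     * Returns the tasks to restart when failedTask fails, given its
     * (transitive) upstream tasks. Downstream tasks are not modeled.
     */
    static List<String> tasksToRestart(
            SpillingStrategy strategy, String failedTask, List<String> upstreamTasks) {
        List<String> restart = new ArrayList<>();
        if (strategy == SpillingStrategy.SELECTIVE) {
            // Only part of the intermediate result is on disk, so the
            // upstream tasks must run again to reproduce all of it.
            restart.addAll(upstreamTasks);
        }
        // With FULL, the complete intermediate result was spilled and can be
        // re-consumed, so only the failed task itself is restarted.
        restart.add(failedTask);
        return restart;
    }

    public static void main(String[] args) {
        List<String> upstreams = Arrays.asList("source", "map");
        System.out.println(tasksToRestart(SpillingStrategy.SELECTIVE, "join", upstreams));
        System.out.println(tasksToRestart(SpillingStrategy.FULL, "join", upstreams));
    }
}
```

With the selective strategy the restart set grows with the depth of the upstream chain, while the full strategy pays for its smaller restart set with extra disk writes on every run, including runs that never fail.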
> >>>>>
> >>>>> Best,
> >>>>>
> >>>>> Xintong
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Fri, May 20, 2022 at 12:10 PM Aitozi <gjying1...@gmail.com> wrote:
> >>>>>
> >>>>>> Thanks Weijie for your answer. So you mean the hybrid shuffle mode
> >>>>>> will limit its usage to the bounded source, right?
> >>>>>> One more question: with bounded data and part of the stages running
> >>>>>> in the Pipelined shuffle mode, what will be the behavior on task
> >>>>>> failure? Is checkpointing enabled for these running stages, or will
> >>>>>> they re-run after the failure?
> >>>>>>
> >>>>>> Best,
> >>>>>> Aitozi
> >>>>>>
> >>>>>> weijie guo <guoweijieres...@gmail.com> wrote on Fri, May 20, 2022 at 10:45:
> >>>>>>
> >>>>>>> Hi, Aitozi:
> >>>>>>>
> >>>>>>> Thank you for the feedback!
> >>>>>>> Here are some of my thoughts on your question
> >>>>>>>
> >>>>>>>>>> 1. If there is an unbounded data source, but there are only enough
> >>>>>>>>>> resources to schedule the first stage, will it put a heavy burden on
> >>>>>>>>>> the disk/shuffle service, which I think will occupy all the resources?
> >>>>>>> First of all, Hybrid Shuffle Mode targets the batch job scenario, so
> >>>>>>> the problem of unbounded data sources does not arise. Secondly, for
> >>>>>>> the streaming scenario, I think Pipelined Shuffle is still the best
> >>>>>>> choice at present. For an unbounded data stream, it is not meaningful
> >>>>>>> to run only some of the stages.
> >>>>>>>
> >>>>>>>>>> 2. Which kind of job will benefit from the hybrid shuffle mode? In
> >>>>>>>>>> other words, in which cases can we use the hybrid shuffle mode?
> >>>>>>> Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid
> >>>>>>> shuffle mode can utilize cluster resources effectively and avoid some
> >>>>>>> unnecessary disk IO overhead. For OLAP scenarios, which are
> >>>>>>> characterized by a large number of concurrently submitted short batch
> >>>>>>> jobs, hybrid shuffle can solve the scheduling deadlock problem of
> >>>>>>> pipelined shuffle while achieving similar performance.
> >>>>>>>
> >>>>>>> Best regards,
> >>>>>>>
> >>>>>>> Weijie
> >>>>>>>
> >>>>>>>
> >>>>>>> Aitozi <gjying1...@gmail.com> wrote on Fri, May 20, 2022 at 08:05:
> >>>>>>>
> >>>>>>>> Hi Weijie:
> >>>>>>>>
> >>>>>>>>        Thanks for the nice FLIP. I have a couple of questions about
> >>>>>>>> this:
> >>>>>>>>
> >>>>>>>> 1) In the hybrid shuffle mode, the shuffle mode is decided by the
> >>>>>>>> available resources. If there is an unbounded data source, but there
> >>>>>>>> are only enough resources to schedule the first stage, will it put a
> >>>>>>>> heavy burden on the disk/shuffle service, which I think will occupy
> >>>>>>>> all the resources?
> >>>>>>>>
> >>>>>>>> 2) Which kind of job will benefit from the hybrid shuffle mode? In
> >>>>>>>> other words, in which cases can we use the hybrid shuffle mode:
> >>>>>>>> - For batch jobs that want to use more resources to reduce the e2e
> >>>>>>>> time?
> >>>>>>>> - Or for streaming jobs that may temporarily lack resources?
> >>>>>>>> - Or for OLAP jobs that, as you mentioned, try to make the best use of
> >>>>>>>> the available resources to finish the query?
> >>>>>>>> Just want to know the typical use case for the Hybrid shuffle mode :)
> >>>>>>>> Best,
> >>>>>>>> Aitozi.
> >>>>>>>>
> >>>>>>>> weijie guo <guoweijieres...@gmail.com> wrote on Thu, May 19, 2022 at 18:33:
> >>>>>>>>
> >>>>>>>>> Yangze, thank you for the feedback!
> >>>>>>>>> Here are my thoughts on your questions:
> >>>>>>>>>
> >>>>>>>>>>>> How do we decide the size of the buffer pool in MemoryDataManager
> >>>>>>>>>>>> and the read buffers in FileDataManager?
> >>>>>>>>> The BufferPool in MemoryDataManager is the LocalBufferPool used by
> >>>>>>>>> ResultPartition, and its size is the same as in the current
> >>>>>>>>> implementation of sort-merge shuffle. In other words, the minimum size
> >>>>>>>>> of the BufferPool is a configurable fixed value, and the maximum size
> >>>>>>>>> is Math.max(min, 4 * numSubpartitions). The default value can be
> >>>>>>>>> determined by running the TPC-DS tests.
> >>>>>>>>> Read buffers in FileDataManager are requested from the
> >>>>>>>>> BatchShuffleReadBufferPool shared by the TaskManager. Its size is
> >>>>>>>>> controlled by
> >>>>>>>>> *taskmanager.memory.framework.off-heap.batch-shuffle.size*, whose
> >>>>>>>>> default value is 32M, which is consistent with the current sort-merge
> >>>>>>>>> shuffle logic.
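
As a rough illustration of the sizing rule quoted above, the following sketch computes the maximum pool size as Math.max(min, 4 * numSubpartitions). The class and method names are made up for this example and are not Flink APIs, and the minimum of 512 buffers is an arbitrary sample value, not a Flink default.

```java
/** Illustrative only: class and method names are hypothetical, not Flink APIs. */
final class BufferPoolSizing {

    /** Upper bound of the pool as described above: max(min, 4 * numSubpartitions). */
    static int maxBuffers(int minBuffers, int numSubpartitions) {
        if (minBuffers <= 0 || numSubpartitions <= 0) {
            throw new IllegalArgumentException("both arguments must be positive");
        }
        // multiplyExact throws on int overflow instead of silently wrapping.
        return Math.max(minBuffers, Math.multiplyExact(4, numSubpartitions));
    }

    public static void main(String[] args) {
        // 512 is an arbitrary example minimum, not a Flink default.
        System.out.println(maxBuffers(512, 100));  // the configured minimum dominates
        System.out.println(maxBuffers(512, 1000)); // 4 * numSubpartitions dominates
    }
}
```

The effect of the rule is that the pool never shrinks below the configured minimum, while results with many subpartitions get room for roughly four buffers per subpartition.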
> >>>>>>>>>
> >>>>>>>>>>>> Is there an upper limit for the sum of them? If there is, how do
> >>>>>>>>>>>> MemoryDataManager and FileDataManager sync the memory usage?
> >>>>>>>>> The buffers of the MemoryDataManager are limited by the size of the
> >>>>>>>>> LocalBufferPool, and the upper limit is the size of the Network
> >>>>>>>>> Memory. The buffers of the FileDataManager are directly requested from
> >>>>>>>>> UnpooledOffHeapMemory, and are also limited by the size of the
> >>>>>>>>> framework off-heap memory. I think there should be no need for
> >>>>>>>>> additional synchronization mechanisms.
> >>>>>>>>>
> >>>>>>>>>>>> How do you disable slot sharing? If the user configures both the
> >>>>>>>>>>>> slot sharing group and hybrid shuffle, what will happen to that job?
> >>>>>>>>> I think we can print a warning log when Hybrid Shuffle is enabled and
> >>>>>>>>> an SSG is configured during the JobGraph compilation stage, and fall
> >>>>>>>>> back to the region slot sharing group by default. Of course, the
> >>>>>>>>> documentation will emphasize that we do not currently support SSG; if
> >>>>>>>>> one is configured, it will fall back to the default.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Best regards,
> >>>>>>>>>
> >>>>>>>>> Weijie
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Yangze Guo <karma...@gmail.com> wrote on Thu, May 19, 2022 at 16:25:
> >>>>>>>>>
> >>>>>>>>>> Thanks for driving this, Xintong and Weijie.
> >>>>>>>>>>
> >>>>>>>>>> I believe this feature will make Flink a better batch/OLAP engine. +1
> >>>>>>>>>> for the overall design.
> >>>>>>>>>>
> >>>>>>>>>> Some questions:
> >>>>>>>>>> 1. How do we decide the size of the buffer pool in MemoryDataManager
> >>>>>>>>>> and the read buffers in FileDataManager?
> >>>>>>>>>> 2. Is there an upper limit for the sum of them? If there is, how do
> >>>>>>>>>> MemoryDataManager and FileDataManager sync the memory usage?
> >>>>>>>>>> 3. How do you disable slot sharing? If the user configures both the
> >>>>>>>>>> slot sharing group and hybrid shuffle, what will happen to that job?
> >>>>>>>>>> Best,
> >>>>>>>>>> Yangze Guo
> >>>>>>>>>>
> >>>>>>>>>> On Thu, May 19, 2022 at 2:41 PM Xintong Song <tonysong...@gmail.com> wrote:
> >>>>>>>>>>> Thanks for preparing this FLIP, Weijie.
> >>>>>>>>>>>
> >>>>>>>>>>> I think this is a good improvement on batch resource elasticity.
> >>>>>>>>>>> Looking forward to the community feedback.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>>
> >>>>>>>>>>> Xintong
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, May 19, 2022 at 2:31 PM weijie guo <guoweijieres...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> I'd like to start a discussion about FLIP-235[1], which introduces a
> >>>>>>>>>>>> new shuffle mode that can overcome some of the problems of Pipelined
> >>>>>>>>>>>> Shuffle and Blocking Shuffle in batch scenarios.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Currently in Flink, task scheduling is more or less constrained by
> >>>>>>>>>>>> the shuffle implementations. This brings the following disadvantages:
> >>>>>>>>>>>>
> >>>>>>>>>>>>    1. Pipelined Shuffle:
> >>>>>>>>>>>>       For pipelined shuffle, the upstream and downstream tasks are
> >>>>>>>>>>>>       required to be deployed at the same time, to avoid upstream
> >>>>>>>>>>>>       tasks being blocked forever. This is fine when there are enough
> >>>>>>>>>>>>       resources for both upstream and downstream tasks to run
> >>>>>>>>>>>>       simultaneously, but causes the following problems otherwise:
> >>>>>>>>>>>>       1. Tasks connected by pipelined shuffle (i.e., a pipelined
> >>>>>>>>>>>>          region) cannot be executed until resources have been obtained
> >>>>>>>>>>>>          for all of them, resulting in longer job finishing time and
> >>>>>>>>>>>>          poorer resource efficiency, because part of the resources is
> >>>>>>>>>>>>          held idle while waiting for the rest.
> >>>>>>>>>>>>       2. More severely, if multiple jobs each hold part of the cluster
> >>>>>>>>>>>>          resources and are waiting for more, a deadlock would occur.
> >>>>>>>>>>>>          The chance is not trivial, especially for scenarios such as
> >>>>>>>>>>>>          OLAP where concurrent job submissions are frequent.
> >>>>>>>>>>>>    2. Blocking Shuffle:
> >>>>>>>>>>>>       For blocking shuffle, execution of downstream tasks must wait
> >>>>>>>>>>>>       for all upstream tasks to finish, even though more resources
> >>>>>>>>>>>>       might be available. The sequential execution of upstream and
> >>>>>>>>>>>>       downstream tasks significantly increases the job finishing time,
> >>>>>>>>>>>>       and the disk IO workload for spilling and loading the full
> >>>>>>>>>>>>       intermediate data also affects performance.
> >>>>>>>>>>>>
> >>>>>>>>>>>> We believe the root cause of the above problems is that shuffle
> >>>>>>>>>>>> implementations put unnecessary constraints on task scheduling.
> >>>>>>>>>>>>
> >>>>>>>>>>>> To solve this problem, Xintong Song and I propose to introduce hybrid
> >>>>>>>>>>>> shuffle to minimize the scheduling constraints. With Hybrid Shuffle,
> >>>>>>>>>>>> Flink should:
> >>>>>>>>>>>>
> >>>>>>>>>>>>    1. Make best use of available resources.
> >>>>>>>>>>>>       Ideally, we want Flink to always make progress if possible.
> >>>>>>>>>>>>       That is to say, it should always execute a pending task if
> >>>>>>>>>>>>       there are resources available for that task.
> >>>>>>>>>>>>    2. Minimize disk IO load.
> >>>>>>>>>>>>       In-flight data should be consumed directly from memory as much
> >>>>>>>>>>>>       as possible. Only data that is not consumed in a timely manner
> >>>>>>>>>>>>       should be spilled to disk.
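
The scheduling difference described above can be sketched with a toy model. This is only an illustration under simplifying assumptions: every task needs exactly one slot, tasks are deployed upstream-first, and all names below are hypothetical rather than part of Flink's scheduler.

```java
/** Illustrative only: a toy model, not Flink's scheduler. All names are hypothetical. */
final class SchedulingSketch {

    enum ShuffleMode { PIPELINED, HYBRID }

    /**
     * Number of tasks from a group of connected tasks that can be deployed
     * right now, assuming each task needs exactly one slot.
     */
    static int deployableTasks(ShuffleMode mode, int tasksInGroup, int freeSlots) {
        if (tasksInGroup < 0 || freeSlots < 0) {
            throw new IllegalArgumentException("arguments must be non-negative");
        }
        switch (mode) {
            case PIPELINED:
                // All-or-nothing: the whole pipelined region must fit at once.
                return freeSlots >= tasksInGroup ? tasksInGroup : 0;
            case HYBRID:
                // Make progress with whatever is available right now.
                return Math.min(tasksInGroup, freeSlots);
            default:
                throw new IllegalStateException("unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        System.out.println(deployableTasks(ShuffleMode.PIPELINED, 4, 3));
        System.out.println(deployableTasks(ShuffleMode.HYBRID, 4, 3));
    }
}
```

In this model, a region of four tasks with three free slots deploys nothing under pipelined shuffle but three tasks under hybrid shuffle, which is also why jobs that each hold part of the cluster no longer wait on each other indefinitely.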
> >>>>>>>>>>>> You can find more details in FLIP-235. Looking forward to your
> >>>>>>>>>>>> feedback.
> >>>>>>>>>>>> [1]
> >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best regards,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Weijie
> >>>>>>>>>>>>
> >>
>
>
