Hi All,

Thanks for all the feedback about this FLIP.

Since there are no other concerns, this FLIP-235 discussion is over.  I will
open a vote today.

Best regards,

Weijie


On Wed, May 25, 2022 at 10:17 PM Xintong Song <tonysong...@gmail.com> wrote:

> Ok, I think we are on the same page. I'm aware of
> ExecutionConfig#setExecutionMode, which sets the data exchanging mode at
> the job scope.
>
> Best,
>
> Xintong
>
>
>
> On Wed, May 25, 2022 at 9:50 PM Chesnay Schepler <ches...@apache.org>
> wrote:
>
> > You can influence it to some extent via ExecutionConfig#setExecutionMode.
> > You can, for example, force all shuffles to use blocking exchanges.
> >
> > I'm not proposing an API that would allow this to be set per edge.
> >
> > On 25/05/2022 15:23, Xintong Song wrote:
> >
> > In general, I agree with you about targeting jobs with no/few blocking
> > exchanges for fine-grained recovery. The only problem is, correct me if I'm
> > wrong, that users currently cannot control the data exchanging mode of a
> > specific edge. I'm not aware of such APIs.
> >
> > As a first step, I'd prefer excluding this from the scope of this FLIP.
> >
> > Best,
> >
> > Xintong
> >
> >
> > On Wed, May 25, 2022 at 8:54 PM Chesnay Schepler <ches...@apache.org>
> > wrote:
> >
> >> Yes; but that's also a limitation of the current fine-grained recovery.
> >>
> >> My suggestion was primarily aimed at jobs that have no/few blocking
> >> exchanges, where users would currently have to explicitly configure
> >> additional blocking exchanges to really get something out of
> >> fine-grained recovery (at the expense of e2e job duration).
> >>
> >> On 25/05/2022 14:47, Xintong Song wrote:
> >> >> Will this also allow spilling everything to disk while also forwarding
> >> >> data to the next task?
> >> >>
> >> > Yes, as long as the downstream task is started, it always forwards the
> >> > data, even while spilling everything.
> >> >
> >> >> This would allow us to improve fine-grained recovery by no longer being
> >> >> constrained to pipelined regions.
> >> >
> >> > I think it helps prevent restarts of the upstream tasks of a failed task,
> >> > but not of the downstream tasks. There's no guarantee that a restarted
> >> > task will produce exactly the same data (in terms of order) as the
> >> > previous execution, so downstream tasks cannot resume consuming the data.
> >> >
> >> >
> >> > Best,
> >> >
> >> > Xintong
> >> >
> >> >
> >> >
> >> > On Wed, May 25, 2022 at 3:05 PM Chesnay Schepler <ches...@apache.org>
> >> wrote:
> >> >
> >> >> Will this also allow spilling everything to disk while also forwarding
> >> >> data to the next task?
> >> >>
> >> >> This would allow us to improve fine-grained recovery by no longer being
> >> >> constrained to pipelined regions.
> >> >>
> >> >> On 25/05/2022 05:55, weijie guo wrote:
> >> >>> Hi All,
> >> >>> Thank you for your attention and feedback.
> >> >>> Do you have any other comments? If there are no other questions, I'll
> >> >>> start the vote on FLIP-235 tomorrow.
> >> >>>
> >> >>> Best regards,
> >> >>>
> >> >>> Weijie
> >> >>>
> >> >>>
> >> >>> On Fri, May 20, 2022 at 1:22 PM Aitozi <gjying1...@gmail.com> wrote:
> >> >>>
> >> >>>> Hi Xintong,
> >> >>>>       Thanks for your detailed explanation. I misunderstood the spill
> >> >>>> behavior at first glance, but I get your point now. I think it will be a
> >> >>>> good addition to the current execution modes.
> >> >>>> Looking forward to it :)
> >> >>>>
> >> >>>> Best,
> >> >>>> Aitozi
> >> >>>>
> >> >>>> On Fri, May 20, 2022 at 12:26 PM Xintong Song <tonysong...@gmail.com> wrote:
> >> >>>>
> >> >>>>> Hi Aitozi,
> >> >>>>>
> >> >>>>> In which case we can use the hybrid shuffle mode
> >> >>>>>
> >> >>>>> Just to directly answer this question, in addition to Weijie's
> >> >>>>> explanations: for a batch workload, if you want the workload to take
> >> >>>>> advantage of as many resources as are available, ranging from a single
> >> >>>>> slot to as many slots as there are tasks, you may consider hybrid
> >> >>>>> shuffle mode. Admittedly, this may not always be wanted, e.g., users may
> >> >>>>> not want to execute a job if there are too few resources available, or
> >> >>>>> may not want a job to take too many of the cluster resources. That's why
> >> >>>>> we propose hybrid shuffle as an additional option for batch users,
> >> >>>>> rather than a replacement for Pipelined or Blocking mode.
> >> >>>>>
> >> >>>>>> So you mean the hybrid shuffle mode will limit its usage to bounded
> >> >>>>>> sources, right?
> >> >>>>>>
> >> >>>>> Yes.
> >> >>>>>
> >> >>>>>> One more question: with bounded data and part of the stages running
> >> >>>>>> in the Pipelined shuffle mode, what will the behavior be on task
> >> >>>>>> failure? Is checkpointing enabled for these running stages, or will
> >> >>>>>> they re-run after the failure?
> >> >>>>>>
> >> >>>>> There are no checkpoints. The failover behavior depends on the spilling
> >> >>>>> strategy.
> >> >>>>> - In the first version, we only consider a selective spilling strategy,
> >> >>>>> which spills as little data as possible to disk. This means that in case
> >> >>>>> of failover, upstream tasks need to be restarted to reproduce the
> >> >>>>> complete intermediate results.
> >> >>>>> - An alternative strategy we may introduce in the future, if needed, is
> >> >>>>> to spill the complete intermediate results. That avoids restarting
> >> >>>>> upstream tasks in case of failover, because the produced intermediate
> >> >>>>> results can be re-consumed, at the cost of more disk IO load.
> >> >>>>> With both strategies, the trade-off between failover cost and IO load is
> >> >>>>> for the user to decide. This is also discussed in the MemoryDataManager
> >> >>>>> section of the FLIP.
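
The trade-off between the two spilling strategies above can be sketched in a few lines of Java. This is only an illustrative model: the names SpillingFailoverSketch, SpillingStrategy, SELECTIVE, FULL and upstreamRestartNeededOnFailover are hypothetical and are not Flink classes or part of the FLIP. The only facts taken from the thread are that selective spilling requires restarting upstream tasks on failover, while spilling the complete result avoids it at the cost of more disk IO.

```java
// Hypothetical model of the failover consequence of each spilling strategy.
// None of these names exist in Flink; they only mirror the reply above.
final class SpillingFailoverSketch {

    enum SpillingStrategy {
        /** Spill as little data as possible to disk. */
        SELECTIVE(false),
        /** Spill the complete intermediate result (more disk IO). */
        FULL(true);

        private final boolean completeResultOnDisk;

        SpillingStrategy(boolean completeResultOnDisk) {
            this.completeResultOnDisk = completeResultOnDisk;
        }

        /**
         * Upstream tasks must be restarted on failover unless the complete
         * intermediate result is on disk and can be re-consumed.
         */
        boolean upstreamRestartNeededOnFailover() {
            return !completeResultOnDisk;
        }
    }

    public static void main(String[] args) {
        for (SpillingStrategy strategy : SpillingStrategy.values()) {
            System.out.println(strategy + ": restart upstream on failover = "
                    + strategy.upstreamRestartNeededOnFailover());
        }
    }
}
```

The sketch deliberately says nothing about downstream tasks; whether they can resume is a separate question raised later in the thread.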
> >> >>>>>
> >> >>>>> Best,
> >> >>>>>
> >> >>>>> Xintong
> >> >>>>>
> >> >>>>>
> >> >>>>>
> >> >>>>> On Fri, May 20, 2022 at 12:10 PM Aitozi <gjying1...@gmail.com> wrote:
> >> >>>>>
> >> >>>>>> Thanks Weijie for your answer. So you mean the hybrid shuffle mode will
> >> >>>>>> limit its usage to bounded sources, right?
> >> >>>>>> One more question: with bounded data and part of the stages running in
> >> >>>>>> the Pipelined shuffle mode, what will the behavior be on task failure?
> >> >>>>>> Is checkpointing enabled for these running stages, or will they re-run
> >> >>>>>> after the failure?
> >> >>>>>>
> >> >>>>>> Best,
> >> >>>>>> Aitozi
> >> >>>>>>
> >> >>>>>> On Fri, May 20, 2022 at 10:45 AM weijie guo <guoweijieres...@gmail.com> wrote:
> >> >>>>>>
> >> >>>>>>> Hi, Aitozi:
> >> >>>>>>>
> >> >>>>>>> Thank you for the feedback!
> >> >>>>>>> Here are some of my thoughts on your question
> >> >>>>>>>
> >> >>>>>>>>>> 1. If there is an unbounded data source, but only enough resources
> >> >>>>>>>>>> to schedule the first stage, will it put a heavy burden on the
> >> >>>>>>>>>> disk/shuffle service, which I think would occupy all the resources?
> >> >>>>>>> First of all, Hybrid Shuffle Mode is oriented to the batch job scenario,
> >> >>>>>>> so unbounded data sources are not a concern. Secondly, for the streaming
> >> >>>>>>> scenario, I think Pipelined Shuffle should still be the best choice at
> >> >>>>>>> present. For an unbounded data stream, it is not meaningful to run only
> >> >>>>>>> some of the stages.
> >> >>>>>>>
> >> >>>>>>>>>> 2. Which kind of job will benefit from the hybrid shuffle mode? In
> >> >>>>>>>>>> other words, in which case can we use the hybrid shuffle mode?
> >> >>>>>>> Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid
> >> >>>>>>> shuffle mode can utilize cluster resources effectively and avoid some
> >> >>>>>>> unnecessary disk IO overhead. For OLAP scenarios, which are
> >> >>>>>>> characterized by a large number of concurrently submitted short batch
> >> >>>>>>> jobs, hybrid shuffle can solve the scheduling deadlock problem of
> >> >>>>>>> pipelined shuffle and achieve similar performance.
> >> >>>>>>>
> >> >>>>>>> Best regards,
> >> >>>>>>>
> >> >>>>>>> Weijie
> >> >>>>>>>
> >> >>>>>>>
> >> >>>>>>> On Fri, May 20, 2022 at 8:05 AM Aitozi <gjying1...@gmail.com> wrote:
> >> >>>>>>>
> >> >>>>>>>> Hi Weijie:
> >> >>>>>>>>
> >> >>>>>>>>        Thanks for the nice FLIP. I have a couple of questions about this:
> >> >>>>>>>>
> >> >>>>>>>> 1) In the hybrid shuffle mode, the shuffle mode is decided by the
> >> >>>>>>>> available resources. If there is an unbounded data source, but only
> >> >>>>>>>> enough resources to schedule the first stage, will it put a heavy
> >> >>>>>>>> burden on the disk/shuffle service, which I think would occupy all the
> >> >>>>>>>> resources?
> >> >>>>>>>>
> >> >>>>>>>> 2) Which kind of job will benefit from the hybrid shuffle mode? In
> >> >>>>>>>> other words, in which case can we use the hybrid shuffle mode:
> >> >>>>>>>> - For batch jobs that want to use more resources to reduce the e2e time?
> >> >>>>>>>> - Or for streaming jobs that may temporarily lack resources?
> >> >>>>>>>> - Or for OLAP jobs that try to make the best use of available
> >> >>>>>>>> resources, as you mentioned, to finish the query?
> >> >>>>>>>> Just want to know the typical use case for the Hybrid shuffle mode :)
> >> >>>>>>>> Best,
> >> >>>>>>>> Aitozi.
> >> >>>>>>>>
> >> >>>>>>>> On Thu, May 19, 2022 at 6:33 PM weijie guo <guoweijieres...@gmail.com> wrote:
> >> >>>>>>>>
> >> >>>>>>>>> Yangze, thank you for the feedback!
> >> >>>>>>>>> Here are my thoughts on your questions:
> >> >>>>>>>>>
> >> >>>>>>>>>>>> How do we decide the size of the buffer pool in MemoryDataManager
> >> >>>>>>>>>>>> and the read buffers in FileDataManager?
> >> >>>>>>>>> The BufferPool in MemoryDataManager is the LocalBufferPool used by
> >> >>>>>>>>> ResultPartition, and its size is the same as in the current
> >> >>>>>>>>> implementation of sort-merge shuffle. In other words, the minimum size
> >> >>>>>>>>> of the BufferPool is a configurable fixed value, and the maximum size is
> >> >>>>>>>>> Math.max(min, 4 * numSubpartitions). The default value can be determined
> >> >>>>>>>>> by running the TPC-DS tests.
> >> >>>>>>>>> Read buffers in FileDataManager are requested from the
> >> >>>>>>>>> BatchShuffleReadBufferPool shared by the TaskManager. Its size is
> >> >>>>>>>>> controlled by
> >> >>>>>>>>> *taskmanager.memory.framework.off-heap.batch-shuffle.size*, and the
> >> >>>>>>>>> default value is 32M, which is consistent with the current sort-merge
> >> >>>>>>>>> shuffle logic.
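
As a rough illustration of the sizing rule quoted above, here is a minimal Java sketch. The class and method names (HybridBufferPoolSizing, maxBuffers) are hypothetical and not part of Flink; only the formula Math.max(min, 4 * numSubpartitions) comes from the reply, and the value 512 for the configurable minimum is an arbitrary number picked for the example.

```java
// Hypothetical helper illustrating the buffer pool upper bound from the thread.
// It is not Flink code; only the formula itself is taken from the discussion.
final class HybridBufferPoolSizing {

    private HybridBufferPoolSizing() {}

    /**
     * Returns the maximum pool size in buffers: the configured minimum, or
     * four buffers per subpartition, whichever is larger.
     */
    static int maxBuffers(int minBuffers, int numSubpartitions) {
        if (minBuffers <= 0 || numSubpartitions <= 0) {
            throw new IllegalArgumentException("both arguments must be positive");
        }
        return Math.max(minBuffers, 4 * numSubpartitions);
    }

    public static void main(String[] args) {
        // 4 * 100 = 400 is below the minimum, so the minimum wins: 512
        System.out.println(maxBuffers(512, 100));
        // 4 * 200 = 800 exceeds the minimum: 800
        System.out.println(maxBuffers(512, 200));
    }
}
```

With few subpartitions the configured minimum dominates, and the bound only grows once 4 * numSubpartitions exceeds it.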
> >> >>>>>>>>>
> >> >>>>>>>>>>>> Is there an upper limit for the sum of them? If there is, how do
> >> >>>>>>>>>>>> MemoryDataManager and FileDataManager sync the memory usage?
> >> >>>>>>>>> The buffers of the MemoryDataManager are limited by the size of the
> >> >>>>>>>>> LocalBufferPool, whose upper limit is the size of the Network Memory.
> >> >>>>>>>>> The buffers of the FileDataManager are directly requested from
> >> >>>>>>>>> UnpooledOffHeapMemory, and are also limited by the size of the framework
> >> >>>>>>>>> off-heap memory. I think there should be no need for additional
> >> >>>>>>>>> synchronization mechanisms.
> >> >>>>>>>>>
> >> >>>>>>>>>>>> How do you disable the slot sharing? If a user configures both the
> >> >>>>>>>>>>>> slot sharing group and hybrid shuffle, what will happen to that job?
> >> >>>>>>>>> I think we can print a warning log when Hybrid Shuffle is enabled and an
> >> >>>>>>>>> SSG is configured during the JobGraph compilation stage, and fall back
> >> >>>>>>>>> to the region slot sharing group by default. Of course, the
> >> >>>>>>>>> documentation will emphasize that we do not currently support SSG and
> >> >>>>>>>>> that, if one is configured, the job will fall back to the default.
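
The warn-and-fall-back behavior proposed above could look roughly like the following Java sketch. Everything here is an assumption made for illustration: SlotSharingFallbackSketch, resolveSlotSharingGroup and the REGION_DEFAULT_GROUP placeholder are hypothetical names, and the real check would live somewhere in JobGraph compilation rather than in a standalone helper.

```java
import java.util.logging.Logger;

// Hypothetical sketch of the proposed fallback; not Flink code.
final class SlotSharingFallbackSketch {

    private static final Logger LOG =
            Logger.getLogger(SlotSharingFallbackSketch.class.getName());

    /** Placeholder standing in for the default region slot sharing group. */
    static final String REGION_DEFAULT_GROUP = "region-default";

    private SlotSharingFallbackSketch() {}

    /**
     * Returns the slot sharing group to use. A user-configured group is
     * ignored, with a warning, when hybrid shuffle is enabled.
     */
    static String resolveSlotSharingGroup(boolean hybridShuffleEnabled, String userGroup) {
        if (userGroup == null) {
            return REGION_DEFAULT_GROUP;
        }
        if (hybridShuffleEnabled) {
            LOG.warning("Slot sharing group '" + userGroup
                    + "' is not supported with hybrid shuffle; falling back to the default.");
            return REGION_DEFAULT_GROUP;
        }
        return userGroup;
    }

    public static void main(String[] args) {
        System.out.println(resolveSlotSharingGroup(true, "my-group"));
        System.out.println(resolveSlotSharingGroup(false, "my-group"));
    }
}
```

Logging a warning instead of failing the job matches the reply's preference for a soft fallback; rejecting the configuration outright would be the stricter alternative.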
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>> Best regards,
> >> >>>>>>>>>
> >> >>>>>>>>> Weijie
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>> On Thu, May 19, 2022 at 4:25 PM Yangze Guo <karma...@gmail.com> wrote:
> >> >>>>>>>>>
> >> >>>>>>>>>> Thanks for driving this, Xintong and Weijie.
> >> >>>>>>>>>>
> >> >>>>>>>>>> I believe this feature will make Flink a better batch/OLAP engine. +1
> >> >>>>>>>>>> for the overall design.
> >> >>>>>>>>>>
> >> >>>>>>>>>> Some questions:
> >> >>>>>>>>>> 1. How do we decide the size of the buffer pool in MemoryDataManager
> >> >>>>>>>>>> and the read buffers in FileDataManager?
> >> >>>>>>>>>> 2. Is there an upper limit for the sum of them? If there is, how do
> >> >>>>>>>>>> MemoryDataManager and FileDataManager sync the memory usage?
> >> >>>>>>>>>> 3. How do you disable the slot sharing? If a user configures both the
> >> >>>>>>>>>> slot sharing group and hybrid shuffle, what will happen to that job?
> >> >>>>>>>>>> Best,
> >> >>>>>>>>>> Yangze Guo
> >> >>>>>>>>>>
> >> >>>>>>>>>> On Thu, May 19, 2022 at 2:41 PM Xintong Song <tonysong...@gmail.com> wrote:
> >> >>>>>>>>>>> Thanks for preparing this FLIP, Weijie.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> I think this is a good improvement on batch resource elasticity.
> >> >>>>>>>>>>> Looking forward to the community feedback.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Best,
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Xintong
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> On Thu, May 19, 2022 at 2:31 PM weijie guo <guoweijieres...@gmail.com> wrote:
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>> Hi all,
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> I’d like to start a discussion about FLIP-235[1], which introduces a new
> >> >>>>>>>>>>>> shuffle mode that can overcome some of the problems of Pipelined Shuffle
> >> >>>>>>>>>>>> and Blocking Shuffle in batch scenarios.
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> Currently in Flink, task scheduling is more or less constrained by the
> >> >>>>>>>>>>>> shuffle implementations. This brings the following disadvantages:
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>>    1. Pipelined Shuffle:
> >> >>>>>>>>>>>>       For pipelined shuffle, the upstream and downstream tasks are
> >> >>>>>>>>>>>>       required to be deployed at the same time, to avoid upstream tasks
> >> >>>>>>>>>>>>       being blocked forever. This is fine when there are enough
> >> >>>>>>>>>>>>       resources for both upstream and downstream tasks to run
> >> >>>>>>>>>>>>       simultaneously, but will cause the following problems otherwise:
> >> >>>>>>>>>>>>       1. Pipelined shuffle connected tasks (i.e., a pipelined region)
> >> >>>>>>>>>>>>          cannot be executed until resources are obtained for all of
> >> >>>>>>>>>>>>          them, resulting in longer job finishing time and poorer
> >> >>>>>>>>>>>>          resource efficiency due to holding part of the resources idle
> >> >>>>>>>>>>>>          while waiting for the rest.
> >> >>>>>>>>>>>>       2. More severely, if multiple jobs each hold part of the cluster
> >> >>>>>>>>>>>>          resources and are waiting for more, a deadlock would occur. The
> >> >>>>>>>>>>>>          chance is not trivial, especially for scenarios such as OLAP
> >> >>>>>>>>>>>>          where concurrent job submissions are frequent.
> >> >>>>>>>>>>>>    2. Blocking Shuffle:
> >> >>>>>>>>>>>>       For blocking shuffle, execution of downstream tasks must wait for
> >> >>>>>>>>>>>>       all upstream tasks to finish, even though there might be more
> >> >>>>>>>>>>>>       resources available. The sequential execution of upstream and
> >> >>>>>>>>>>>>       downstream tasks significantly increases the job finishing time,
> >> >>>>>>>>>>>>       and the disk IO workload for spilling and loading full
> >> >>>>>>>>>>>>       intermediate data also affects the performance.
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> We believe the root cause of the above problems is that shuffle
> >> >>>>>>>>>>>> implementations put unnecessary constraints on task scheduling.
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> To solve this problem, Xintong Song and I propose to introduce hybrid
> >> >>>>>>>>>>>> shuffle to minimize the scheduling constraints. With Hybrid Shuffle,
> >> >>>>>>>>>>>> Flink should:
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>>    1. Make best use of available resources.
> >> >>>>>>>>>>>>       Ideally, we want Flink to always make progress if possible. That
> >> >>>>>>>>>>>>       is to say, it should always execute a pending task if there are
> >> >>>>>>>>>>>>       resources available for that task.
> >> >>>>>>>>>>>>    2. Minimize disk IO load.
> >> >>>>>>>>>>>>       In-flight data should be consumed directly from memory as much as
> >> >>>>>>>>>>>>       possible. Only data that is not consumed in time should be
> >> >>>>>>>>>>>>       spilled to disk.
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> You can find more details in FLIP-235. Looking forward to your feedback.
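
The scheduling constraints described in the proposal can be contrasted with a small Java sketch. This is a deliberately simplified model under stated assumptions, not Flink's scheduler: ShuffleSchedulingSketch, ShuffleMode and canStartTask are hypothetical names, one task is assumed to need one slot, and a pipelined region is reduced to a count of tasks that must all be deployed together.

```java
// Simplified, hypothetical model of when a pending task may start under
// each shuffle mode. It is not Flink code and ignores many real constraints.
final class ShuffleSchedulingSketch {

    enum ShuffleMode { PIPELINED, BLOCKING, HYBRID }

    private ShuffleSchedulingSketch() {}

    /**
     * @param freeSlots        slots currently available (one task per slot assumed)
     * @param tasksInRegion    tasks connected by pipelined exchanges, including this one
     * @param upstreamFinished whether all upstream tasks have finished
     */
    static boolean canStartTask(
            ShuffleMode mode, int freeSlots, int tasksInRegion, boolean upstreamFinished) {
        if (freeSlots < 0 || tasksInRegion < 1) {
            throw new IllegalArgumentException("invalid slot or task count");
        }
        switch (mode) {
            case PIPELINED:
                // The whole region must be deployed at the same time.
                return freeSlots >= tasksInRegion;
            case BLOCKING:
                // Downstream waits for upstream to finish, even with free slots.
                return upstreamFinished && freeSlots >= 1;
            case HYBRID:
                // Goal of the proposal: run a pending task whenever a slot is free.
                return freeSlots >= 1;
            default:
                throw new IllegalStateException("unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        // One free slot, a two-task region, upstream still running.
        for (ShuffleMode mode : ShuffleMode.values()) {
            System.out.println(mode + " -> " + canStartTask(mode, 1, 2, false));
        }
    }
}
```

In the single-slot case used in main, only the hybrid mode lets the task start, which is the situation the proposal aims to improve.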
> >> >>>>>>>>>>>> [1]
> >> >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> Best regards,
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> Weijie
> >> >>>>>>>>>>>>
> >> >>
> >>
> >>
> >
>
