Ok, I think we are on the same page. I'm aware of
ExecutionConfig#setExecutionMode, which sets the data exchanging mode at
the job scope.

Best,

Xintong



On Wed, May 25, 2022 at 9:50 PM Chesnay Schepler <ches...@apache.org> wrote:

> You can influence it to some extent via ExecutionConfig#setExecutionMode.
> You can, for example, force all shuffles to use blocking exchanges.
>
> I'm not proposing an API that would allow this to be set per edge.
>
> On 25/05/2022 15:23, Xintong Song wrote:
>
> In general, I agree with you about aiming at jobs with no/few blocking
> exchanges for fine-grained recovery. The only problem is, correct me if I'm
> wrong, users currently cannot control the data exchanging mode of a
> specific edge. I'm not aware of such APIs.
>
> As a first step, I'd prefer excluding this from the scope of this FLIP.
>
> Best,
>
> Xintong
>
>
> On Wed, May 25, 2022 at 8:54 PM Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> Yes; but that's also a limitation of the current fine-grained recovery.
>>
>> My suggestion was primarily aimed at jobs that have no/few blocking
>> exchanges, where users would currently have to explicitly configure
>> additional blocking exchanges to really get something out of
>> fine-grained recovery (at the expense of e2e job duration).
>>
>> On 25/05/2022 14:47, Xintong Song wrote:
>> >> Will this also allow spilling everything to disk while also forwarding
>> >> data to the next task?
>> >>
>> > Yes, as long as the downstream task is started, this always forwards the
>> > data, even while spilling everything.
>> >
>> >> This would allow us to improve fine-grained recovery by no longer being
>> >> constrained to pipelined regions.
>> >
>> > I think it helps prevent restarts of the upstream tasks of a failed task,
>> > but not of the downstream tasks, because there's no guarantee that a
>> > restarted task will produce exactly the same data (in terms of order) as
>> > the previous execution, so the downstream tasks cannot resume consuming
>> > the data.
>> >
>> >
>> > Best,
>> >
>> > Xintong
>> >
>> >
>> >
>> > On Wed, May 25, 2022 at 3:05 PM Chesnay Schepler <ches...@apache.org>
>> > wrote:
>> >
>> >> Will this also allow spilling everything to disk while also forwarding
>> >> data to the next task?
>> >>
>> >> This would allow us to improve fine-grained recovery by no longer being
>> >> constrained to pipelined regions.
>> >>
>> >> On 25/05/2022 05:55, weijie guo wrote:
>> >>> Hi All,
>> >>> Thank you for your attention and feedback.
>> >>> Do you have any other comments? If there are no other questions, I'll
>> >>> vote on FLIP-235 tomorrow.
>> >>>
>> >>> Best regards,
>> >>>
>> >>> Weijie
>> >>>
>> >>>
>> >>> Aitozi <gjying1...@gmail.com> wrote on Fri, May 20, 2022 at 13:22:
>> >>>
>> >>>> Hi Xintong,
>> >>>> Thanks for your detailed explanation. I misunderstood the spill behavior
>> >>>> at first glance, but I get your point now. I think it will be a good
>> >>>> addition to the current execution modes.
>> >>>> Looking forward to it :)
>> >>>>
>> >>>> Best,
>> >>>> Aitozi
>> >>>>
>> >>>> Xintong Song <tonysong...@gmail.com> wrote on Fri, May 20, 2022 at 12:26:
>> >>>>
>> >>>>> Hi Aitozi,
>> >>>>>
>> >>>>>> In which case we can use the hybrid shuffle mode
>> >>>>>
>> >>>>> Just to directly answer this question, in addition to Weijie's
>> >>>>> explanations: for batch workloads, if you want the workload to take
>> >>>>> advantage of as many resources as are available, ranging from a single
>> >>>>> slot to as many slots as there are tasks, you may consider hybrid shuffle
>> >>>>> mode. Admittedly, this may not always be wanted, e.g., users may not want
>> >>>>> to execute a job if there are too few resources available, or may not
>> >>>>> want a job to take too many of the cluster resources. That's why we
>> >>>>> propose hybrid shuffle as an additional option for batch users, rather
>> >>>>> than a replacement for Pipelined or Blocking mode.
>> >>>>>
>> >>>>>> So you mean the hybrid shuffle mode will limit its usage to the bounded
>> >>>>>> source, right?
>> >>>>>>
>> >>>>> Yes.
>> >>>>>
>> >>>>>> One more question: with bounded data and part of the stages running in
>> >>>>>> the Pipelined shuffle mode, what will be the behavior on task failure?
>> >>>>>> Is checkpointing enabled for these running stages, or will they re-run
>> >>>>>> after the failure?
>> >>>>>>
>> >>>>> There are no checkpoints. The failover behavior depends on the spilling
>> >>>>> strategy.
>> >>>>> - In the first version, we only consider a selective spilling strategy,
>> >>>>> which spills as little data as possible to disk. This means that in case
>> >>>>> of failover, upstream tasks need to be restarted to reproduce the
>> >>>>> complete intermediate results.
>> >>>>> - An alternative strategy we may introduce in the future, if needed, is
>> >>>>> to spill the complete intermediate results. That avoids restarting
>> >>>>> upstream tasks in case of failover, because the produced intermediate
>> >>>>> results can be re-consumed, at the cost of more disk IO load.
>> >>>>> With both strategies, the trade-off between failover cost and IO load is
>> >>>>> for the user to decide. This is also discussed in the MemoryDataManager
>> >>>>> section of the FLIP.
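
A minimal sketch of the failover difference between the two spilling strategies described above, written in Python purely for illustration. The function `tasks_to_restart`, the strategy labels "selective" and "full", and the sample task graph are all hypothetical and are not Flink APIs. The sketch also assumes that under selective spilling every transitive producer has to rerun, and it does not model downstream consumers at all.

```python
def tasks_to_restart(failed, upstream_of, strategy):
    """Return the producer-side tasks to rerun after `failed` fails.

    upstream_of maps each task to the tasks whose output it consumes.
    strategy is "selective" or "full" (hypothetical labels).
    Downstream consumers are deliberately not modelled here.
    """
    restart = {failed}
    if strategy == "full":
        # Complete intermediate results are on disk and can be re-consumed.
        return restart
    if strategy != "selective":
        raise ValueError(f"unknown strategy: {strategy}")
    # Selective spilling: results on disk may be incomplete, so (by
    # assumption) every transitive producer runs again to reproduce them.
    pending = [failed]
    while pending:
        task = pending.pop()
        for producer in upstream_of.get(task, ()):
            if producer not in restart:
                restart.add(producer)
                pending.append(producer)
    return restart


# Hypothetical job graph: source -> map_a/map_b -> join -> sink.
graph = {
    "sink": ["join"],
    "join": ["map_a", "map_b"],
    "map_a": ["source"],
    "map_b": ["source"],
}
print(sorted(tasks_to_restart("join", graph, "selective")))
print(sorted(tasks_to_restart("join", graph, "full")))
```

With the sample graph, the selective strategy reruns the failed task and all of its producers, while the full strategy reruns only the failed task, at the price of having written everything to disk.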
>> >>>>>
>> >>>>> Best,
>> >>>>>
>> >>>>> Xintong
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>> On Fri, May 20, 2022 at 12:10 PM Aitozi <gjying1...@gmail.com> wrote:
>> >>>>>
>> >>>>>> Thanks Weijie for your answer. So you mean the hybrid shuffle mode will
>> >>>>>> limit its usage to the bounded source, right?
>> >>>>>> One more question: with bounded data and part of the stages running in
>> >>>>>> the Pipelined shuffle mode, what will be the behavior on task failure?
>> >>>>>> Is checkpointing enabled for these running stages, or will they re-run
>> >>>>>> after the failure?
>> >>>>>>
>> >>>>>> Best,
>> >>>>>> Aitozi
>> >>>>>>
>> >>>>>> weijie guo <guoweijieres...@gmail.com> wrote on Fri, May 20, 2022 at 10:45:
>> >>>>>>
>> >>>>>>> Hi, Aitozi:
>> >>>>>>>
>> >>>>>>> Thank you for the feedback!
>> >>>>>>> Here are some of my thoughts on your questions:
>> >>>>>>>
>> >>>>>>>>>> 1. If there is an unbounded data source, but only enough resources to
>> >>>>>>> schedule the first stage, will it put a heavy burden on the disk/shuffle
>> >>>>>>> service, which I think will occupy all the resources?
>> >>>>>>> First of all, Hybrid Shuffle Mode is oriented to the batch job scenario,
>> >>>>>>> so there is no problem of unbounded data sources. Secondly, if you
>> >>>>>>> consider the streaming scenario, I think Pipelined Shuffle should still
>> >>>>>>> be the best choice at present. For an unbounded data stream, it is not
>> >>>>>>> meaningful to only run some stages.
>> >>>>>>>
>> >>>>>>>>>> 2. Which kind of job will benefit from the hybrid shuffle mode? In
>> >>>>>>> other words, in which cases can we use the hybrid shuffle mode?
>> >>>>>>> Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid
>> >>>>>>> shuffle mode can effectively utilize cluster resources and avoid some
>> >>>>>>> unnecessary disk IO overhead. For OLAP scenarios, which are
>> >>>>>>> characterized by a large number of concurrently submitted short batch
>> >>>>>>> jobs, hybrid shuffle can solve the scheduling deadlock problem of
>> >>>>>>> pipelined shuffle and achieve similar performance.
>> >>>>>>>
>> >>>>>>> Best regards,
>> >>>>>>>
>> >>>>>>> Weijie
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> Aitozi <gjying1...@gmail.com> wrote on Fri, May 20, 2022 at 08:05:
>> >>>>>>>
>> >>>>>>>> Hi Weijie:
>> >>>>>>>>
>> >>>>>>>> Thanks for the nice FLIP. I have a couple of questions about this:
>> >>>>>>>>
>> >>>>>>>> 1) In the hybrid shuffle mode, the shuffle mode is decided by the
>> >>>>>>>> available resources. If there is an unbounded data source, but only
>> >>>>>>>> enough resources to schedule the first stage, will it put a heavy
>> >>>>>>>> burden on the disk/shuffle service, which I think will occupy all the
>> >>>>>>>> resources?
>> >>>>>>>>
>> >>>>>>>> 2) Which kind of job will benefit from the hybrid shuffle mode? In
>> >>>>>>>> other words, in which cases can we use the hybrid shuffle mode:
>> >>>>>>>> - For batch jobs that want to use more resources to reduce the e2e time?
>> >>>>>>>> - Or for streaming jobs that may temporarily lack resources?
>> >>>>>>>> - Or for OLAP jobs that try to make the best use of available
>> >>>>>>>> resources, as you mentioned, to finish the query?
>> >>>>>>>> Just want to know the typical use case for the Hybrid shuffle mode :)
>> >>>>>>>> Best,
>> >>>>>>>> Aitozi.
>> >>>>>>>>
>> >>>>>>>> weijie guo <guoweijieres...@gmail.com> wrote on Thu, May 19, 2022 at 18:33:
>> >>>>>>>>
>> >>>>>>>>> Yangze, thank you for the feedback!
>> >>>>>>>>> Here are my thoughts on your questions:
>> >>>>>>>>>
>> >>>>>>>>>>>> How do we decide the size of the buffer pool in MemoryDataManager
>> >>>>>>>>> and the read buffers in FileDataManager?
>> >>>>>>>>> The BufferPool in MemoryDataManager is the LocalBufferPool used by
>> >>>>>>>>> ResultPartition, and its size is the same as in the current
>> >>>>>>>>> implementation of sort-merge shuffle. In other words, the minimum size
>> >>>>>>>>> of the BufferPool is a configurable fixed value, and the maximum size
>> >>>>>>>>> is Math.max(min, 4 * numSubpartitions). The default value can be
>> >>>>>>>>> determined by running the TPC-DS tests.
>> >>>>>>>>> Read buffers in FileDataManager are requested from the
>> >>>>>>>>> BatchShuffleReadBufferPool shared by the TaskManager. Its size is
>> >>>>>>>>> controlled by
>> >>>>>>>>> *taskmanager.memory.framework.off-heap.batch-shuffle.size*, and the
>> >>>>>>>>> default value is 32M, which is consistent with the current sort-merge
>> >>>>>>>>> shuffle logic.
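
A small sketch of the sizing rule quoted above, in Python for illustration only. The helper name `local_buffer_pool_bounds` and the sample numbers are hypothetical. Only the formula Math.max(min, 4 * numSubpartitions) is taken from the message.

```python
def local_buffer_pool_bounds(min_buffers, num_subpartitions):
    """Return (min, max) buffer counts for a result partition's pool.

    Hypothetical helper: only the max(min, 4 * numSubpartitions) rule is
    taken from the discussion; the name and signature are illustrative.
    """
    if min_buffers < 1 or num_subpartitions < 1:
        raise ValueError("both arguments must be positive")
    return min_buffers, max(min_buffers, 4 * num_subpartitions)


# Few subpartitions: the configured minimum is also the maximum.
print(local_buffer_pool_bounds(512, 100))   # (512, 512)
# Many subpartitions: the maximum grows with the subpartition count.
print(local_buffer_pool_bounds(512, 1000))  # (512, 4000)
```

The maximum only exceeds the configured minimum once 4 * numSubpartitions is larger than it, so small partitions keep a fixed-size pool.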
>> >>>>>>>>>
>> >>>>>>>>>>>> Is there an upper limit for the sum of them? If there is, how do
>> >>>>>>>>> MemoryDataManager and FileDataManager sync the memory usage?
>> >>>>>>>>> The buffers of the MemoryDataManager are limited by the size of the
>> >>>>>>>>> LocalBufferPool, and the upper limit is the size of the Network Memory.
>> >>>>>>>>> The buffers of the FileDataManager are directly requested from
>> >>>>>>>>> UnpooledOffHeapMemory, and are also limited by the size of the
>> >>>>>>>>> framework off-heap memory. I think there should be no need for
>> >>>>>>>>> additional synchronization mechanisms.
>> >>>>>>>>>
>> >>>>>>>>>>>> How do you disable the slot sharing? If the user configures both
>> >>>>>>>>> the slot sharing group and hybrid shuffle, what will happen to that job?
>> >>>>>>>>> I think we can print a warning log when Hybrid Shuffle is enabled and
>> >>>>>>>>> SSG is configured during the JobGraph compilation stage, and fall back
>> >>>>>>>>> to the region slot sharing group by default. Of course, it will be
>> >>>>>>>>> emphasized in the documentation that we do not currently support SSG;
>> >>>>>>>>> if configured, it will fall back to the default.
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> Best regards,
>> >>>>>>>>>
>> >>>>>>>>> Weijie
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> Yangze Guo <karma...@gmail.com> wrote on Thu, May 19, 2022 at 16:25:
>> >>>>>>>>>
>> >>>>>>>>>> Thanks for driving this, Xintong and Weijie.
>> >>>>>>>>>>
>> >>>>>>>>>> I believe this feature will make Flink a better batch/OLAP engine. +1
>> >>>>>>>>>> for the overall design.
>> >>>>>>>>>>
>> >>>>>>>>>> Some questions:
>> >>>>>>>>>> 1. How do we decide the size of the buffer pool in MemoryDataManager
>> >>>>>>>>>> and the read buffers in FileDataManager?
>> >>>>>>>>>> 2. Is there an upper limit for the sum of them? If there is, how do
>> >>>>>>>>>> MemoryDataManager and FileDataManager sync the memory usage?
>> >>>>>>>>>> 3. How do you disable the slot sharing? If the user configures both the
>> >>>>>>>>>> slot sharing group and hybrid shuffle, what will happen to that job?
>> >>>>>>>>>> Best,
>> >>>>>>>>>> Yangze Guo
>> >>>>>>>>>>
>> >>>>>>>>>> On Thu, May 19, 2022 at 2:41 PM Xintong Song <tonysong...@gmail.com>
>> >>>>>>>>>> wrote:
>> >>>>>>>>>>> Thanks for preparing this FLIP, Weijie.
>> >>>>>>>>>>>
>> >>>>>>>>>>> I think this is a good improvement on batch resource elasticity.
>> >>>>>>>>>>> Looking forward to the community feedback.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Best,
>> >>>>>>>>>>>
>> >>>>>>>>>>> Xintong
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Thu, May 19, 2022 at 2:31 PM weijie guo <guoweijieres...@gmail.com>
>> >>>>>>>>>>> wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>>> Hi all,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> I’d like to start a discussion about FLIP-235[1], which introduces a
>> >>>>>>>>>>>> new shuffle mode that can overcome some of the problems of Pipelined
>> >>>>>>>>>>>> Shuffle and Blocking Shuffle in batch scenarios.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Currently in Flink, task scheduling is more or less constrained by
>> >>>>>>>>>>>> the shuffle implementations.
>> >>>>>>>>>>>> This brings the following disadvantages:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>    1. Pipelined Shuffle:
>> >>>>>>>>>>>>       For pipelined shuffle, the upstream and downstream tasks are
>> >>>>>>>>>>>>       required to be deployed at the same time, to avoid upstream
>> >>>>>>>>>>>>       tasks being blocked forever. This is fine when there are enough
>> >>>>>>>>>>>>       resources for both upstream and downstream tasks to run
>> >>>>>>>>>>>>       simultaneously, but will cause the following problems otherwise:
>> >>>>>>>>>>>>       1. Pipelined shuffle connected tasks (i.e., a pipelined region)
>> >>>>>>>>>>>>          cannot be executed until resources are obtained for all of
>> >>>>>>>>>>>>          them, resulting in longer job finishing time and poorer
>> >>>>>>>>>>>>          resource efficiency due to holding part of the resources
>> >>>>>>>>>>>>          idle while waiting for the rest.
>> >>>>>>>>>>>>       2. More severely, if multiple jobs each hold part of the
>> >>>>>>>>>>>>          cluster resources and are waiting for more, a deadlock would
>> >>>>>>>>>>>>          occur. The chance is not trivial, especially for scenarios
>> >>>>>>>>>>>>          such as OLAP where concurrent job submissions are frequent.
>> >>>>>>>>>>>>    2. Blocking Shuffle:
>> >>>>>>>>>>>>       For blocking shuffle, execution of downstream tasks must wait
>> >>>>>>>>>>>>       for all upstream tasks to finish, even though there might be
>> >>>>>>>>>>>>       more resources available. The sequential execution of upstream
>> >>>>>>>>>>>>       and downstream tasks significantly increases the job finishing
>> >>>>>>>>>>>>       time, and the disk IO workload for spilling and loading full
>> >>>>>>>>>>>>       intermediate data also affects the performance.
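
The deadlock scenario described under Pipelined Shuffle above can be illustrated with a toy model, sketched here in Python. Everything in it is an assumption made for illustration: the function names, the slot counts, and the simplification that each task needs exactly one slot. It does not reflect how the Flink scheduler is implemented.

```python
def pipelined_can_progress(total_slots, held, needed):
    """True if at least one job can obtain all slots its pipelined region needs.

    held[i] is the number of slots job i already holds, needed[i] the number
    its pipelined region requires before any of its tasks may start.
    """
    free = total_slots - sum(held)
    return any(need - have <= free for have, need in zip(held, needed))


def per_task_can_progress(total_slots, held):
    """True if any task can run when each task only needs its own slot."""
    free = total_slots - sum(held)
    return free > 0 or any(have > 0 for have in held)


# Two jobs on a 4-slot cluster; each region needs 3 slots, each job holds 2.
held, needed = [2, 2], [3, 3]
print(pipelined_can_progress(4, held, needed))  # False: both jobs wait forever
print(per_task_can_progress(4, held))           # True: held slots can run tasks
```

In the pipelined case neither job can ever complete its allocation, because the slots the other job needs are held idle. Once tasks are allowed to run with whatever slots they already have, the same allocation makes progress.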
>> >>>>>>>>>>>> We believe the root cause of the above problems is that shuffle
>> >>>>>>>>>>>> implementations put unnecessary constraints on task scheduling.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> To solve this problem, Xintong Song and I propose to introduce hybrid
>> >>>>>>>>>>>> shuffle to minimize the scheduling constraints. With Hybrid Shuffle,
>> >>>>>>>>>>>> Flink should:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>    1. Make best use of available resources.
>> >>>>>>>>>>>>       Ideally, we want Flink to always make progress if possible. That
>> >>>>>>>>>>>>       is to say, it should always execute a pending task if there are
>> >>>>>>>>>>>>       resources available for that task.
>> >>>>>>>>>>>>    2. Minimize disk IO load.
>> >>>>>>>>>>>>       In-flight data should be consumed directly from memory as much
>> >>>>>>>>>>>>       as possible. Only data that is not consumed in a timely manner
>> >>>>>>>>>>>>       should be spilled to disk.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> You can find more details in FLIP-235. Looking forward to your
>> >>>>>>>>>>>> feedback.
>> >>>>>>>>>>>> [1]
>> >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Best regards,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Weijie
>> >>>>>>>>>>>>
>> >>
>>
>>
>
