You can influence it to some extent via ExecutionConfig#setExecutionMode.
You can, for example, force all shuffles to use blocking exchanges.
I'm not proposing an API that would allow this to be set per edge.

On 25/05/2022 15:23, Xintong Song wrote:
In general, I agree with you about targeting jobs with no/few blocking exchanges for fine-grained recovery. The only problem is, correct me if I'm wrong, that users currently cannot control the data exchange mode of a specific edge. I'm not aware of such APIs.
As a first step, I'd prefer excluding this from the scope of this FLIP.

Best,

Xintong



On Wed, May 25, 2022 at 8:54 PM Chesnay Schepler <ches...@apache.org> wrote:
    Yes; but that's also a limitation of the current fine-grained
    recovery.

    My suggestion was primarily aimed at jobs that have no/few blocking
    exchanges, where users would currently have to explicitly configure
    additional blocking exchanges to really get something out of
    fine-grained recovery (at the expense of e2e job duration).

    On 25/05/2022 14:47, Xintong Song wrote:
    >> Will this also allow spilling everything to disk while also
    forwarding
    >> data to the next task?
    >>
    > Yes, as long as the downstream task is started, this always forwards the
    > data, even while spilling everything.
    >
    > This would allow us to improve fine-grained recovery by no
    longer being
    >> constrained to pipelined regions.
    >
    > I think it helps prevent restarts of the upstream tasks of a failed task,
    > but not of the downstream tasks. There's no guarantee that a restarted task
    > will produce exactly the same data (in terms of order) as the previous
    > execution, so the downstream tasks cannot resume consuming the data.
    >
    >
    > Best,
    >
    > Xintong
    >
    >
    >
    > On Wed, May 25, 2022 at 3:05 PM Chesnay Schepler
    <ches...@apache.org> wrote:
    >
    >> Will this also allow spilling everything to disk while also
    forwarding
    >> data to the next task?
    >>
    >> This would allow us to improve fine-grained recovery by no
    longer being
    >> constrained to pipelined regions.
    >>
    >> On 25/05/2022 05:55, weijie guo wrote:
    >>> Hi All,
    >>> Thank you for your attention and feedback.
    >>> Do you have any other comments? If there are no other
    questions, I'll
    >> vote
    >>> on FLIP-235 tomorrow.
    >>>
    >>> Best regards,
    >>>
    >>> Weijie
    >>>
    >>>
    >>> On Fri, May 20, 2022 at 13:22, Aitozi <gjying1...@gmail.com> wrote:
    >>>
    >>>> Hi Xintong
    >>>> Thanks for your detailed explanation. I misunderstood the spill
    >>>> behavior at first glance, but I get your point now. I think it will be a
    >>>> good addition to the current execution mode.
    >>>> Looking forward to it :)
    >>>>
    >>>> Best,
    >>>> Aitozi
    >>>>
    >>>> On Fri, May 20, 2022 at 12:26, Xintong Song <tonysong...@gmail.com> wrote:
    >>>>
    >>>>> Hi Aitozi,
    >>>>>
    >>>>> In which case we can use the hybrid shuffle mode
    >>>>>
    >>>>> Just to directly answer this question, in addition to
    >>>>> Weijie's explanations. For batch workload, if you want the
    workload to
    >>>> take
    >>>>> advantage of as many resources as available, which ranges
    from a single
    >>>>> slot to as many slots as the total tasks, you may consider
    hybrid
    >> shuffle
    >>>>> mode. Admittedly, this may not always be wanted, e.g., users
    may not
    >> want
    >>>>> to execute a job if there are too few resources available, or
    may not
    >> want
    >>>> a
    >>>>> job taking too many of the cluster resources. That's why we
    propose
    >>>> hybrid
    >>>>> shuffle as an additional option for batch users, rather than a
    >>>> replacement
    >>>>> for Pipelined or Blocking mode.
    >>>>>
    >>>>> So you mean the hybrid shuffle mode will limit its usage to
    the bounded
    >>>>>> source, Right ?
    >>>>>>
    >>>>> Yes.
    >>>>>
    >>>>>> One more question: with bounded data and part of the stages running
    >>>>>> in the Pipelined shuffle mode, what is the behavior on task failure?
    >>>>>> Is checkpointing enabled for these running stages, or will they re-run
    >>>>>> after the failure?
    >>>>>>
    >>>>> There are no checkpoints. The failover behavior depends on the spilling
    >>>>> strategy.
    >>>>> - In the first version, we only consider a selective spilling strategy,
    >>>>> which spills as little data as possible to disk. This means that in
    >>>>> case of failover, upstream tasks need to be restarted to reproduce the
    >>>>> complete intermediate results.
    >>>>> - An alternative strategy we may introduce in the future, if needed, is to
    >>>>> spill the complete intermediate results. That avoids restarting upstream
    >>>>> tasks in case of failover, because the produced intermediate results can be
    >>>>> re-consumed, at the cost of more disk IO load.
    >>>>> With both strategies, the trade-off between failover cost and IO load is
    >>>>> for the user to decide. This is also discussed in the MemoryDataManager
    >>>>> section of the FLIP.
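
To make the trade-off between the two spilling strategies concrete, here is a minimal sketch of which tasks would need to be restarted when a consumer task fails. It is only an illustration of the reasoning above: the class `HybridFailoverSketch`, the enum `SpillingStrategy` and the method `tasksToRestart` are hypothetical names, not Flink APIs, and the sketch deliberately ignores the downstream tasks of the failed task.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Flink.
final class HybridFailoverSketch {

    enum SpillingStrategy {
        SELECTIVE, // spill as little as possible: results on disk may be incomplete
        FULL       // spill the complete intermediate results
    }

    /**
     * Returns the tasks to restart when failedTask fails.
     * Downstream tasks of failedTask are out of scope for this sketch.
     */
    static List<String> tasksToRestart(
            SpillingStrategy strategy, List<String> upstreamTasks, String failedTask) {
        List<String> restart = new ArrayList<>();
        if (strategy == SpillingStrategy.SELECTIVE) {
            // The intermediate results cannot be fully re-read, so the
            // upstream tasks have to reproduce them.
            restart.addAll(upstreamTasks);
        }
        // With FULL spilling the failed task can re-consume the spilled results.
        restart.add(failedTask);
        return restart;
    }

    public static void main(String[] args) {
        List<String> upstream = Arrays.asList("source", "map");
        System.out.println(tasksToRestart(SpillingStrategy.SELECTIVE, upstream, "reduce"));
        System.out.println(tasksToRestart(SpillingStrategy.FULL, upstream, "reduce"));
    }
}
```

With selective spilling the restart set grows with the number of upstream tasks, while full spilling keeps it to the failed task at the price of writing every buffer to disk.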
    >>>>>
    >>>>> Best,
    >>>>>
    >>>>> Xintong
    >>>>>
    >>>>>
    >>>>>
    >>>>> On Fri, May 20, 2022 at 12:10 PM Aitozi
    <gjying1...@gmail.com> wrote:
    >>>>>
    >>>>>> Thanks Weijie for your answer. So you mean the hybrid shuffle mode will
    >>>>>> be limited to bounded sources, right?
    >>>>>> One more question: with bounded data and part of the stages running
    >>>>>> in the Pipelined shuffle mode, what is the behavior on task failure?
    >>>>>> Is checkpointing enabled for these running stages, or will they re-run
    >>>>>> after the failure?
    >>>>>>
    >>>>>> Best,
    >>>>>> Aitozi
    >>>>>>
    >>>>>> On Fri, May 20, 2022 at 10:45, weijie guo <guoweijieres...@gmail.com> wrote:
    >>>>>>
    >>>>>>> Hi, Aitozi:
    >>>>>>>
    >>>>>>> Thank you for the feedback!
    >>>>>>> Here are some of my thoughts on your question
    >>>>>>>
    >>>>>>>>>> 1.If there is an unbounded data source, but only have
    resource to
    >>>>>>> schedule the first stage, will it bring the big burden to the
    >>>>>> disk/shuffle
    >>>>>>> service which will occupy all the resource I think.
    >>>>>>> First of all, Hybrid Shuffle Mode is oriented to the batch job
    >>>>> scenario,
    >>>>>> so
    >>>>>>> there is no problem of unbounded data sources. Secondly,
    if you
    >>>>> consider
    >>>>>>> the stream scenario, I think Pipelined Shuffle should
    still be the
    >>>> best
    >>>>>>> choice at present. For an unbounded data stream, it is not
    meaningful
    >>>>> to
    >>>>>>> only run some stages.
    >>>>>>>
    >>>>>>>>>> 2. Which kind of job will benefit from the hybrid
    shuffle mode.
    >>>> In
    >>>>>>> other words, In which case we can use the hybrid shuffle mode:
    >>>>>>> Both general batch jobs and OLAP jobs benefit. For batch
    jobs, hybrid
    >>>>>>> shuffle mode can effectively utilize cluster resources and
    avoid some
    >>>>>>> unnecessary disk IO overhead. For OLAP scenarios, which are
    >>>>> characterized
    >>>>>>> by a large number of concurrently submitted short batch
    jobs, hybrid
    >>>>>>> shuffle can solve the scheduling deadlock problem of pipelined
    >>>> shuffle
    >>>>>> and
    >>>>>>> achieve similar performance.
    >>>>>>>
    >>>>>>> Best regards,
    >>>>>>>
    >>>>>>> Weijie
    >>>>>>>
    >>>>>>>
    >>>>>>> On Fri, May 20, 2022 at 08:05, Aitozi <gjying1...@gmail.com> wrote:
    >>>>>>>
    >>>>>>>> Hi Weijie:
    >>>>>>>>
    >>>>>>>> Thanks for the nice FLIP. I have a couple of questions about this:
    >>>>>>>>
    >>>>>>>> 1) In the hybrid shuffle mode, the shuffle mode is decided by the
    >>>>>>>> available resources. If there is an unbounded data source but only
    >>>>>>>> enough resources to schedule the first stage, won't it put a big burden
    >>>>>>>> on the disk/shuffle service, which I think would occupy all the
    >>>>>>>> resources?
    >>>>>>>>
    >>>>>>>> 2) Which kinds of jobs will benefit from the hybrid shuffle mode? In
    >>>>>>>> other words, in which cases can we use the hybrid shuffle mode:
    >>>>>>>> - For batch jobs that want to use more resources to reduce the e2e time?
    >>>>>>>> - Or for streaming jobs that may temporarily lack resources?
    >>>>>>>> - Or for OLAP jobs that try to make the best use of available resources,
    >>>>>>>> as you mentioned, to finish the query?
    >>>>>>>> Just want to know the typical use case for the Hybrid shuffle mode :)
    >>>>>>>> Best,
    >>>>>>>> Aitozi.
    >>>>>>>>
    >>>>>>>> On Thu, May 19, 2022 at 18:33, weijie guo <guoweijieres...@gmail.com> wrote:
    >>>>>>>>
    >>>>>>>>> Yangze, Thank you for the feedback!
    >>>>>>>>> Here are my thoughts on your questions:
    >>>>>>>>>
    >>>>>>>>>>>> How do we decide the size of the buffer pool in
    >>>>> MemoryDataManager
    >>>>>>> and
    >>>>>>>>> the read buffers in FileDataManager?
    >>>>>>>>> The BufferPool in MemoryDataManager is the LocalBufferPool used by
    >>>>>>>>> ResultPartition, and the size is the same as the current implementation
    >>>>>>>>> of sort-merge shuffle. In other words, the minimum value of BufferPool
    >>>>>>>>> is a configurable fixed value, and the maximum value is
    >>>>>>>>> Math.max(min, 4 * numSubpartitions). The default value can be determined
    >>>>>>>>> by running the TPC-DS tests.
    >>>>>>>>> Read buffers in FileDataManager are requested from the
    >>>>>>>>> BatchShuffleReadBufferPool shared by the TaskManager. Its size is
    >>>>>>>>> controlled by
    >>>>>>>>> *taskmanager.memory.framework.off-heap.batch-shuffle.size*; the default
    >>>>>>>>> value is 32M, which is consistent with the current sort-merge shuffle
    >>>>>>>>> logic.
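
As a small worked example of the sizing rule quoted above, the following sketch computes the upper bound of the buffer pool as Math.max(min, 4 * numSubpartitions). The class `BufferPoolSizing`, the method `maxBuffers` and the sample values (a minimum of 512 buffers, 100 or 1000 subpartitions) are assumptions made for illustration; they are not taken from the FLIP or from Flink's code.

```java
// Hypothetical helper that mirrors the rule described in the thread;
// it is not Flink's actual implementation.
final class BufferPoolSizing {

    /** Upper bound of the pool: max(minBuffers, 4 * numSubpartitions). */
    static int maxBuffers(int minBuffers, int numSubpartitions) {
        if (minBuffers <= 0 || numSubpartitions <= 0) {
            throw new IllegalArgumentException("both arguments must be positive");
        }
        return Math.max(minBuffers, 4 * numSubpartitions);
    }

    public static void main(String[] args) {
        // Few subpartitions: the configured minimum dominates.
        System.out.println(maxBuffers(512, 100));  // prints 512
        // Many subpartitions: 4 buffers per subpartition dominate.
        System.out.println(maxBuffers(512, 1000)); // prints 4000
    }
}
```

The rule keeps the pool at the configured minimum for narrow shuffles and lets it grow linearly with the number of subpartitions for wide ones.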
    >>>>>>>>>
    >>>>>>>>>>>> Is there an upper limit for the sum of them? If there
    is, how
    >>>>>> does
    >>>>>>>>> MemoryDataManager and FileDataManager sync the memory usage?
    >>>>>>>>> The buffers of the MemoryDataManager are limited by the size of the
    >>>>>>>>> LocalBufferPool, and the upper limit is the size of the Network Memory.
    >>>>>>>>> The buffers of the FileDataManager are directly requested from
    >>>>>>>>> UnpooledOffHeapMemory, and are also limited by the size of the framework
    >>>>>>>>> off-heap memory. I think there should be no need for additional
    >>>>>>>>> synchronization mechanisms.
    >>>>>>>>>
    >>>>>>>>>>>> How do you disable the slot sharing? If user
    configures both
    >>>>> the
    >>>>>>> slot
    >>>>>>>>> sharing group and hybrid shuffle, what will happen to
    that job?
    >>>>>>>>> I think we can print a warning log when Hybrid Shuffle is enabled and
    >>>>>>>>> an SSG is configured during the JobGraph compilation stage, and fall
    >>>>>>>>> back to the region slot sharing group by default. Of course, the
    >>>>>>>>> documentation will emphasize that we do not currently support SSG; if
    >>>>>>>>> one is configured, it will fall back to the default.
    >>>>>>>>>
    >>>>>>>>>
    >>>>>>>>> Best regards,
    >>>>>>>>>
    >>>>>>>>> Weijie
    >>>>>>>>>
    >>>>>>>>>
    >>>>>>>>> On Thu, May 19, 2022 at 16:25, Yangze Guo <karma...@gmail.com> wrote:
    >>>>>>>>>
    >>>>>>>>>> Thanks for driving this, Xintong and Weijie.
    >>>>>>>>>>
    >>>>>>>>>> I believe this feature will make Flink a better batch/OLAP
    >>>>> engine.
    >>>>>> +1
    >>>>>>>>>> for the overall design.
    >>>>>>>>>>
    >>>>>>>>>> Some questions:
    >>>>>>>>>> 1. How do we decide the size of the buffer pool in
    >>>>>> MemoryDataManager
    >>>>>>>>>> and the read buffers in FileDataManager?
    >>>>>>>>>> 2. Is there an upper limit for the sum of them? If there is, how do
    >>>>>>>>>> MemoryDataManager and FileDataManager sync the memory usage?
    >>>>>>>>>> 3. How do you disable the slot sharing? If user
    configures both
    >>>>> the
    >>>>>>>>>> slot sharing group and hybrid shuffle, what will happen
    to that
    >>>>>> job?
    >>>>>>>>>> Best,
    >>>>>>>>>> Yangze Guo
    >>>>>>>>>>
    >>>>>>>>>> On Thu, May 19, 2022 at 2:41 PM Xintong Song <
    >>>>>> tonysong...@gmail.com>
    >>>>>>>>>> wrote:
    >>>>>>>>>>> Thanks for preparing this FLIP, Weijie.
    >>>>>>>>>>>
    >>>>>>>>>>> I think this is a good improvement on batch resource
    >>>>> elasticity.
    >>>>>>>>> Looking
    >>>>>>>>>>> forward to the community feedback.
    >>>>>>>>>>>
    >>>>>>>>>>> Best,
    >>>>>>>>>>>
    >>>>>>>>>>> Xintong
    >>>>>>>>>>>
    >>>>>>>>>>>
    >>>>>>>>>>>
    >>>>>>>>>>> On Thu, May 19, 2022 at 2:31 PM weijie guo <
    >>>>>>>> guoweijieres...@gmail.com>
    >>>>>>>>>>> wrote:
    >>>>>>>>>>>
    >>>>>>>>>>>> Hi all,
    >>>>>>>>>>>>
    >>>>>>>>>>>>
    >>>>>>>>>>>> I’d like to start a discussion about FLIP-235[1], which introduces a
    >>>>>>>>>>>> new shuffle mode that can overcome some of the problems of Pipelined
    >>>>>>>>>>>> Shuffle and Blocking Shuffle in batch scenarios.
    >>>>>>>>>>>> Currently in Flink, task scheduling is more or less
    >>>>> constrained
    >>>>>>> by
    >>>>>>>>> the
    >>>>>>>>>> shuffle implementations.
    >>>>>>>>>>>> This will bring the following disadvantages:
    >>>>>>>>>>>>
    >>>>>>>>>>>> 1. Pipelined Shuffle:
    >>>>>>>>>>>>    For pipelined shuffle, the upstream and downstream tasks are
    >>>>>>>>>>>>    required to be deployed at the same time, to avoid upstream tasks
    >>>>>>>>>>>>    being blocked forever. This is fine when there are enough resources
    >>>>>>>>>>>>    for both upstream and downstream tasks to run simultaneously, but
    >>>>>>>>>>>>    will cause the following problems otherwise:
    >>>>>>>>>>>>    1. Pipelined shuffle connected tasks (i.e., a pipelined region)
    >>>>>>>>>>>>       cannot be executed until resources are obtained for all of them,
    >>>>>>>>>>>>       resulting in longer job finishing time and poorer resource
    >>>>>>>>>>>>       efficiency due to holding part of the resources idle while
    >>>>>>>>>>>>       waiting for the rest.
    >>>>>>>>>>>>    2. More severely, if multiple jobs each hold part of the cluster
    >>>>>>>>>>>>       resources and are waiting for more, a deadlock would occur. The
    >>>>>>>>>>>>       chance is not trivial, especially for scenarios such as OLAP
    >>>>>>>>>>>>       where concurrent job submissions are frequent.
    >>>>>>>>>>>> 2. Blocking Shuffle:
    >>>>>>>>>>>>    For blocking shuffle, execution of downstream tasks must wait for
    >>>>>>>>>>>>    all upstream tasks to finish, even though more resources might be
    >>>>>>>>>>>>    available. The sequential execution of upstream and downstream
    >>>>>>>>>>>>    tasks significantly increases the job finishing time, and the disk
    >>>>>>>>>>>>    IO workload for spilling and loading full intermediate data also
    >>>>>>>>>>>>    affects the performance.
    >>>>>>>>>>>> We believe the root cause of the above problems is that
    >>>>> shuffle
    >>>>>>>>>> implementations put unnecessary constraints on task
    scheduling.
    >>>>>>>>>>>> To solve this problem, Xintong Song and I propose to
    >>>>> introduce
    >>>>>>>> hybrid
    >>>>>>>>>> shuffle to minimize the scheduling constraints. With Hybrid
    >>>>>> Shuffle,
    >>>>>>>>> Flink
    >>>>>>>>>> should:
    >>>>>>>>>>>> 1. Make best use of available resources.
    >>>>>>>>>>>>    Ideally, we want Flink to always make progress if possible. That
    >>>>>>>>>>>>    is to say, it should always execute a pending task if there are
    >>>>>>>>>>>>    resources available for that task.
    >>>>>>>>>>>> 2. Minimize disk IO load.
    >>>>>>>>>>>>    In-flight data should be consumed directly from memory as much as
    >>>>>>>>>>>>    possible. Only data that is not consumed in time should be spilled
    >>>>>>>>>>>>    to disk.
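
The second goal (consume from memory where possible, spill only data that is not consumed in time) can be sketched with a toy model. This is an illustration of the idea, not the design in FLIP-235: the class `MemoryFirstResultSketch`, its methods and the fixed capacity are hypothetical, and the "disk" is just a second in-memory queue standing in for a spill file.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch, not Flink code: buffers stay in memory while the
// consumer keeps up and are moved to "disk" only under memory pressure.
final class MemoryFirstResultSketch {
    private final int memoryCapacity;
    private final Deque<String> inMemory = new ArrayDeque<>();
    private final Deque<String> onDisk = new ArrayDeque<>(); // stand-in for a spill file
    private int spilledCount = 0;

    MemoryFirstResultSketch(int memoryCapacity) {
        if (memoryCapacity <= 0) {
            throw new IllegalArgumentException("memoryCapacity must be positive");
        }
        this.memoryCapacity = memoryCapacity;
    }

    /** Producer side: spill the oldest unconsumed buffer only if memory is full. */
    void produce(String buffer) {
        if (inMemory.size() == memoryCapacity) {
            onDisk.addLast(inMemory.removeFirst());
            spilledCount++;
        }
        inMemory.addLast(buffer);
    }

    /** Consumer side: next buffer in production order, or null if none is available. */
    String consume() {
        // Spilled buffers are always older than in-memory ones, so read them first.
        if (!onDisk.isEmpty()) {
            return onDisk.removeFirst();
        }
        return inMemory.pollFirst();
    }

    int spilledCount() {
        return spilledCount;
    }

    public static void main(String[] args) {
        MemoryFirstResultSketch fast = new MemoryFirstResultSketch(2);
        fast.produce("a");
        fast.consume();
        fast.produce("b");
        fast.consume();
        System.out.println(fast.spilledCount()); // consumer keeps up: prints 0

        MemoryFirstResultSketch slow = new MemoryFirstResultSketch(2);
        slow.produce("a");
        slow.produce("b");
        slow.produce("c");
        System.out.println(slow.spilledCount()); // consumer lags: prints 1
    }
}
```

When the consumer keeps pace, nothing touches the disk; when it lags or has not started yet, the producer can still make progress because the oldest buffers are spilled instead of blocking.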
    >>>>>>>>>>>> You can find more details in FLIP-235. Looking forward to
    >>>>> your
    >>>>>>>>>> feedback.
    >>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
    >>>>>>>>>>>>
    >>>>>>>>>>>> Best regards,
    >>>>>>>>>>>>
    >>>>>>>>>>>> Weijie
    >>>>>>>>>>>>