In general, I agree with you about targeting fine-grained recovery at
jobs with no/few blocking exchanges. The only problem is, correct me
if I'm wrong, that users currently cannot control the data exchange
mode of a specific edge. I'm not aware of such APIs.
As a first step, I'd prefer excluding this from the scope of this FLIP.
Best,
Xintong
On Wed, May 25, 2022 at 8:54 PM Chesnay Schepler <ches...@apache.org>
wrote:
Yes; but that's also a limitation of the current fine-grained
recovery.
My suggestion was primarily aimed at jobs that have no/few blocking
exchanges, where users would currently have to explicitly configure
additional blocking exchanges to really get something out of
fine-grained recovery (at the expense of e2e job duration).
On 25/05/2022 14:47, Xintong Song wrote:
>> Will this also allow spilling everything to disk while also
>> forwarding data to the next task?
>>
> Yes, as long as the downstream task is started, this always
> forwards the data, even while spilling everything.
>
>> This would allow us to improve fine-grained recovery by no
>> longer being constrained to pipelined regions.
>
> I think it helps prevent restarts of the upstream tasks of a
> failed task, but not the downstream tasks, because there's no
> guarantee a restarted task will produce exactly the same data
> (in terms of order) as the previous execution, so the downstream
> tasks cannot resume consuming the data.
>
>
> Best,
>
> Xintong
>
>
>
> On Wed, May 25, 2022 at 3:05 PM Chesnay Schepler
> <ches...@apache.org> wrote:
>
>> Will this also allow spilling everything to disk while also
>> forwarding data to the next task?
>>
>> This would allow us to improve fine-grained recovery by no
>> longer being constrained to pipelined regions.
>>
>> On 25/05/2022 05:55, weijie guo wrote:
>>> Hi All,
>>> Thank you for your attention and feedback.
>>> Do you have any other comments? If there are no other
>>> questions, I'll vote on FLIP-235 tomorrow.
>>>
>>> Best regards,
>>>
>>> Weijie
>>>
>>>
>>> Aitozi <gjying1...@gmail.com> wrote on Fri, May 20, 2022 at 13:22:
>>>
>>>> Hi Xintong,
>>>> Thanks for your detailed explanation. I misunderstood the
>>>> spilling behavior at first glance, but I get your point now. I
>>>> think it will be a good addition to the current execution modes.
>>>> Looking forward to it :)
>>>>
>>>> Best,
>>>> Aitozi
>>>>
>>>> Xintong Song <tonysong...@gmail.com> wrote on Fri, May 20, 2022 at 12:26:
>>>>
>>>>> Hi Aitozi,
>>>>>
>>>>> In which case we can use the hybrid shuffle mode
>>>>>
>>>>> Just to directly answer this question, in addition to Weijie's
>>>>> explanations: for batch workloads, if you want the workload to
>>>>> take advantage of as many resources as are available, ranging
>>>>> from a single slot to as many slots as there are total tasks,
>>>>> you may consider hybrid shuffle mode. Admittedly, this may not
>>>>> always be wanted, e.g., users may not want to execute a job if
>>>>> there are too few resources available, or may not want a job
>>>>> taking too many of the cluster's resources. That's why we
>>>>> propose hybrid shuffle as an additional option for batch users,
>>>>> rather than a replacement for Pipelined or Blocking mode.
>>>>>
>>>>>> So you mean the hybrid shuffle mode will limit its usage to
>>>>>> bounded sources, right?
>>>>>>
>>>>> Yes.
>>>>>
>>>>>> One more question: with bounded data and part of the stages
>>>>>> running in pipelined shuffle mode, what will be the behavior
>>>>>> on task failure? Are checkpoints enabled for these running
>>>>>> stages, or will they re-run after the failure?
>>>>>>
>>>>> There are no checkpoints. The failover behavior depends on the
>>>>> spilling strategy.
>>>>> - In the first version, we only consider a selective spilling
>>>>> strategy, which spills as little data as possible to disk. This
>>>>> means that in case of failover, upstream tasks need to be
>>>>> restarted to reproduce the complete intermediate results.
>>>>> - An alternative strategy we may introduce in the future, if
>>>>> needed, is to spill the complete intermediate results. That
>>>>> avoids restarting upstream tasks in case of failover, because
>>>>> the produced intermediate results can be re-consumed, at the
>>>>> cost of more disk IO load.
>>>>> With both strategies, the trade-off between failover cost and
>>>>> IO load is for the user to decide. This is also discussed in
>>>>> the MemoryDataManager section of the FLIP.
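To make the trade-off concrete, here is a minimal, hypothetical sketch of the failover implication of the two spilling strategies described above. This is not Flink's actual API; the enum and method names are made up for illustration.

```java
public class SpillingFailover {
    // Hypothetical names for the two strategies discussed in this thread.
    enum SpillingStrategy { SELECTIVE, FULL }

    // Selective spilling keeps only unconsumed data on disk, so the full
    // intermediate result cannot be re-read after a downstream failure,
    // and upstream tasks must re-run to reproduce it. Full spilling keeps
    // the complete result on disk, so upstream tasks need not restart.
    static boolean upstreamRestartRequired(SpillingStrategy strategy) {
        return strategy == SpillingStrategy.SELECTIVE;
    }

    public static void main(String[] args) {
        System.out.println(upstreamRestartRequired(SpillingStrategy.SELECTIVE)); // true
        System.out.println(upstreamRestartRequired(SpillingStrategy.FULL));      // false
    }
}
```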
>>>>>
>>>>> Best,
>>>>>
>>>>> Xintong
>>>>>
>>>>>
>>>>>
>>>>> On Fri, May 20, 2022 at 12:10 PM Aitozi
>>>>> <gjying1...@gmail.com> wrote:
>>>>>
>>>>>> Thanks Weijie for your answer. So you mean the hybrid shuffle
>>>>>> mode will limit its usage to bounded sources, right?
>>>>>> One more question: with bounded data and part of the stages
>>>>>> running in pipelined shuffle mode, what will be the behavior
>>>>>> on task failure? Are checkpoints enabled for these running
>>>>>> stages, or will they re-run after the failure?
>>>>>>
>>>>>> Best,
>>>>>> Aitozi
>>>>>>
>>>>>> weijie guo <guoweijieres...@gmail.com> wrote on Fri, May 20, 2022 at 10:45:
>>>>>>
>>>>>>> Hi Aitozi,
>>>>>>>
>>>>>>> Thank you for the feedback!
>>>>>>> Here are some of my thoughts on your questions:
>>>>>>>
>>>>>>>>>> 1. If there is an unbounded data source, but only have
>>>>>>>>>> resources to schedule the first stage, will it bring a big
>>>>>>>>>> burden to the disk/shuffle service, which will occupy all
>>>>>>>>>> the resources I think.
>>>>>>> First of all, hybrid shuffle mode is oriented to the batch
>>>>>>> job scenario, so there is no problem of unbounded data
>>>>>>> sources. Secondly, if you consider the streaming scenario, I
>>>>>>> think pipelined shuffle should still be the best choice at
>>>>>>> present. For an unbounded data stream, it is not meaningful
>>>>>>> to run only some stages.
>>>>>>>
>>>>>>>>>> 2. Which kind of job will benefit from the hybrid shuffle
>>>>>>>>>> mode. In other words, in which case we can use the hybrid
>>>>>>>>>> shuffle mode:
>>>>>>> Both general batch jobs and OLAP jobs benefit. For batch
>>>>>>> jobs, hybrid shuffle mode can effectively utilize cluster
>>>>>>> resources and avoid some unnecessary disk IO overhead. For
>>>>>>> OLAP scenarios, which are characterized by a large number of
>>>>>>> concurrently submitted short batch jobs, hybrid shuffle can
>>>>>>> solve the scheduling deadlock problem of pipelined shuffle
>>>>>>> and achieve similar performance.
>>>>>>>
>>>>>>> Best regards,
>>>>>>>
>>>>>>> Weijie
>>>>>>>
>>>>>>>
>>>>>>> Aitozi <gjying1...@gmail.com> wrote on Fri, May 20, 2022 at 08:05:
>>>>>>>
>>>>>>>> Hi Weijie,
>>>>>>>>
>>>>>>>> Thanks for the nice FLIP. I have a couple of questions
>>>>>>>> about this:
>>>>>>>>
>>>>>>>> 1) In the hybrid shuffle mode, the shuffle mode is decided
>>>>>>>> by the resources. If there is an unbounded data source, but
>>>>>>>> we only have resources to schedule the first stage, will it
>>>>>>>> bring a big burden to the disk/shuffle service, which I
>>>>>>>> think will occupy all the resources?
>>>>>>>>
>>>>>>>> 2) Which kind of job will benefit from the hybrid shuffle
>>>>>>>> mode? In other words, in which case can we use the hybrid
>>>>>>>> shuffle mode:
>>>>>>>> - For batch jobs that want to use more resources to reduce
>>>>>>>> the e2e time?
>>>>>>>> - Or for streaming jobs which may temporarily lack
>>>>>>>> resources?
>>>>>>>> - Or for OLAP jobs which will try to make the best use of
>>>>>>>> available resources, as you mentioned, to finish the query?
>>>>>>>> Just want to know the typical use case for the hybrid
>>>>>>>> shuffle mode :)
>>>>>>>> Best,
>>>>>>>> Aitozi.
>>>>>>>>
>>>>>>>> weijie guo <guoweijieres...@gmail.com> wrote on Thu, May 19, 2022 at 18:33:
>>>>>>>>
>>>>>>>>> Yangze, thank you for the feedback!
>>>>>>>>> Here are my thoughts on your questions:
>>>>>>>>>
>>>>>>>>>>>> How do we decide the size of the buffer pool in
>>>>>>>>>>>> MemoryDataManager and the read buffers in FileDataManager?
>>>>>>>>> The buffer pool in MemoryDataManager is the LocalBufferPool
>>>>>>>>> used by ResultPartition, and its size is determined the
>>>>>>>>> same way as in the current implementation of sort-merge
>>>>>>>>> shuffle. In other words, the minimum size of the buffer
>>>>>>>>> pool is a configurable fixed value, and the maximum size is
>>>>>>>>> Math.max(min, 4 * numSubpartitions). The default value can
>>>>>>>>> be determined by running the TPC-DS tests.
>>>>>>>>> Read buffers in FileDataManager are requested from the
>>>>>>>>> BatchShuffleReadBufferPool shared by the TaskManager. Its
>>>>>>>>> size is controlled by
>>>>>>>>> *taskmanager.memory.framework.off-heap.batch-shuffle.size*;
>>>>>>>>> the default value is 32M, which is consistent with the
>>>>>>>>> current sort-merge shuffle logic.
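For reference, the option named above is set in the TaskManager configuration, e.g. in flink-conf.yaml. The value shown is just the 32M default mentioned in this mail:

```yaml
# Size of the read-buffer pool shared by all batch shuffle reads
# on a TaskManager (default: 32m, as described above).
taskmanager.memory.framework.off-heap.batch-shuffle.size: 32m
```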
>>>>>>>>>
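For illustration, the buffer pool sizing rule described earlier in this mail (a configured minimum, and a maximum of Math.max(min, 4 * numSubpartitions)) can be sketched as follows. The class and method names are made up; this is not the actual Flink code.

```java
public class BufferPoolSizing {
    // Maximum pool size per the rule quoted above: the configured
    // minimum, or 4 buffers per subpartition, whichever is larger.
    static int maxPoolSize(int configuredMin, int numSubpartitions) {
        return Math.max(configuredMin, 4 * numSubpartitions);
    }

    public static void main(String[] args) {
        // With many subpartitions, the 4-per-subpartition term dominates.
        System.out.println(maxPoolSize(16, 100)); // 400
        // With few subpartitions, the configured minimum dominates.
        System.out.println(maxPoolSize(16, 2));   // 16
    }
}
```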
>>>>>>>>>>>> Is there an upper limit for the sum of them? If there
>>>>>>>>>>>> is, how do MemoryDataManager and FileDataManager sync
>>>>>>>>>>>> the memory usage?
>>>>>>>>> The buffers of the MemoryDataManager are limited by the
>>>>>>>>> size of the LocalBufferPool, whose upper limit is the size
>>>>>>>>> of the network memory. The buffers of the FileDataManager
>>>>>>>>> are directly requested from unpooled off-heap memory, and
>>>>>>>>> are limited by the size of the framework off-heap memory.
>>>>>>>>> I think there should be no need for additional
>>>>>>>>> synchronization mechanisms.
>>>>>>>>>
>>>>>>>>>>>> How do you disable the slot sharing? If a user
>>>>>>>>>>>> configures both the slot sharing group and hybrid
>>>>>>>>>>>> shuffle, what will happen to that job?
>>>>>>>>> I think we can print a warning log when hybrid shuffle is
>>>>>>>>> enabled and an SSG is configured during the JobGraph
>>>>>>>>> compilation stage, and fall back to the region slot sharing
>>>>>>>>> group by default. Of course, it will be emphasized in the
>>>>>>>>> documentation that we do not currently support SSG; if one
>>>>>>>>> is configured, it will fall back to the default.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Best regards,
>>>>>>>>>
>>>>>>>>> Weijie
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Yangze Guo <karma...@gmail.com> wrote on Thu, May 19, 2022 at 16:25:
>>>>>>>>>
>>>>>>>>>> Thanks for driving this, Xintong and Weijie.
>>>>>>>>>>
>>>>>>>>>> I believe this feature will make Flink a better batch/OLAP
>>>>>>>>>> engine. +1 for the overall design.
>>>>>>>>>>
>>>>>>>>>> Some questions:
>>>>>>>>>> 1. How do we decide the size of the buffer pool in
>>>>>>>>>> MemoryDataManager and the read buffers in FileDataManager?
>>>>>>>>>> 2. Is there an upper limit for the sum of them? If there
>>>>>>>>>> is, how do MemoryDataManager and FileDataManager sync the
>>>>>>>>>> memory usage?
>>>>>>>>>> 3. How do you disable the slot sharing? If a user
>>>>>>>>>> configures both the slot sharing group and hybrid shuffle,
>>>>>>>>>> what will happen to that job?
>>>>>>>>>> Best,
>>>>>>>>>> Yangze Guo
>>>>>>>>>>
>>>>>>>>>> On Thu, May 19, 2022 at 2:41 PM Xintong Song
>>>>>>>>>> <tonysong...@gmail.com> wrote:
>>>>>>>>>>> Thanks for preparing this FLIP, Weijie.
>>>>>>>>>>>
>>>>>>>>>>> I think this is a good improvement on batch resource
>>>>> elasticity.
>>>>>>>>> Looking
>>>>>>>>>>> forward to the community feedback.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>>
>>>>>>>>>>> Xintong
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, May 19, 2022 at 2:31 PM weijie guo
>>>>>>>>>>> <guoweijieres...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I’d like to start a discussion about FLIP-235[1], which
>>>>>>>>>>>> introduces a new shuffle mode that can overcome some of
>>>>>>>>>>>> the problems of Pipelined Shuffle and Blocking Shuffle
>>>>>>>>>>>> in batch scenarios.
>>>>>>>>>>>> Currently in Flink, task scheduling is more or less
>>>>>>>>>>>> constrained by the shuffle implementations. This brings
>>>>>>>>>>>> the following disadvantages:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. Pipelined Shuffle:
>>>>>>>>>>>> For pipelined shuffle, the upstream and downstream
>>>>>>>>>>>> tasks are required to be deployed at the same time, to
>>>>>>>>>>>> avoid upstream tasks being blocked forever. This is fine
>>>>>>>>>>>> when there are enough resources for both upstream and
>>>>>>>>>>>> downstream tasks to run simultaneously, but will cause
>>>>>>>>>>>> the following problems otherwise:
>>>>>>>>>>>> 1. Pipelined-shuffle-connected tasks (i.e., a pipelined
>>>>>>>>>>>> region) cannot be executed until resources are obtained
>>>>>>>>>>>> for all of them, resulting in longer job finishing time
>>>>>>>>>>>> and poorer resource efficiency, due to holding part of
>>>>>>>>>>>> the resources idle while waiting for the rest.
>>>>>>>>>>>> 2. More severely, if multiple jobs each hold part of
>>>>>>>>>>>> the cluster resources and are waiting for more, a
>>>>>>>>>>>> deadlock can occur. The chance is not trivial,
>>>>>>>>>>>> especially for scenarios such as OLAP where concurrent
>>>>>>>>>>>> job submissions are frequent.
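The deadlock scenario above can be illustrated with a tiny, hypothetical arithmetic sketch (all names and numbers are made up): two jobs each acquire part of the slots their pipelined region needs, leaving no free slots and neither job able to start.

```java
public class PipelinedRegionDeadlock {
    public static void main(String[] args) {
        int totalSlots = 6;
        int regionSizePerJob = 4;           // slots each job's pipelined region needs
        int heldByJobA = 3, heldByJobB = 3; // each job has acquired only part of them

        boolean aCanRun = heldByJobA >= regionSizePerJob;
        boolean bCanRun = heldByJobB >= regionSizePerJob;
        int freeSlots = totalSlots - heldByJobA - heldByJobB;

        // Neither pipelined region can start, no slots are free, and
        // neither job releases the slots it already holds: a deadlock.
        System.out.println(aCanRun || bCanRun); // false
        System.out.println(freeSlots);          // 0
    }
}
```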
>>>>>>>>>>>> 2. Blocking Shuffle:
>>>>>>>>>>>> For blocking shuffle, execution of downstream tasks
>>>>>>>>>>>> must wait for all upstream tasks to finish, even though
>>>>>>>>>>>> there might be more resources available. The sequential
>>>>>>>>>>>> execution of upstream and downstream tasks significantly
>>>>>>>>>>>> increases the job finishing time, and the disk IO
>>>>>>>>>>>> workload for spilling and loading the full intermediate
>>>>>>>>>>>> data also affects the performance.
>>>>>>>>>>>> We believe the root cause of the above problems is that
>>>>>>>>>>>> shuffle implementations put unnecessary constraints on
>>>>>>>>>>>> task scheduling.
>>>>>>>>>>>> To solve this problem, Xintong Song and I propose to
>>>>>>>>>>>> introduce hybrid shuffle to minimize the scheduling
>>>>>>>>>>>> constraints. With hybrid shuffle, Flink should:
>>>>>>>>>>>> 1. Make the best use of available resources.
>>>>>>>>>>>> Ideally, we want Flink to always make progress if
>>>>>>>>>>>> possible. That is to say, it should always execute a
>>>>>>>>>>>> pending task if there are resources available for that
>>>>>>>>>>>> task.
>>>>>>>>>>>> 2. Minimize disk IO load.
>>>>>>>>>>>> In-flight data should be consumed directly from memory
>>>>>>>>>>>> as much as possible. Only data that is not consumed in a
>>>>>>>>>>>> timely manner should be spilled to disk.
>>>>>>>>>>>> You can find more details in FLIP-235. Looking forward to
>>>>> your
>>>>>>>>>> feedback.
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
>>>>>>>>>>>>
>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>
>>>>>>>>>>>> Weijie
>>>>>>>>>>>>
>>