Hi Aitozi,

> In which case we can use the hybrid shuffle mode
Just to directly answer this question, in addition to Weijie's explanations:
for a batch workload, if you want the job to take advantage of as many
resources as are available, ranging from a single slot up to as many slots
as there are tasks, you may consider hybrid shuffle mode. Admittedly, this
is not always wanted, e.g., users may not want to execute a job at all if
there are too few resources available, or may not want one job taking too
much of the cluster's resources. That's why we propose hybrid shuffle as an
additional option for batch users, rather than as a replacement for
Pipelined or Blocking mode.

> So you mean the hybrid shuffle mode will limit its usage to the bounded
> source, Right ?

Yes.

> One more question, with the bounded data and partly of the stage is running
> in the Pipelined shuffle mode, what will be the behavior of the task
> failure, Is the checkpoint enabled for these running stages or will it
> re-run after the failure?

There are no checkpoints. The failover behavior depends on the spilling
strategy.
- In the first version, we only consider a selective spilling strategy,
  which spills as little data as possible to disk. In case of failover,
  upstream tasks need to be restarted to reproduce the complete
  intermediate results.
- An alternative strategy we may introduce in the future, if needed, is to
  spill the complete intermediate results. That avoids restarting upstream
  tasks in case of failover, because the produced intermediate results can
  be re-consumed, at the cost of more disk IO load.
With both strategies, the trade-off between failover cost and IO load is
for the user to decide. This is also discussed in the MemoryDataManager
section of the FLIP.

Best,

Xintong

On Fri, May 20, 2022 at 12:10 PM Aitozi <gjying1...@gmail.com> wrote:

> Thanks Weijie for your answer. So you mean the hybrid shuffle mode will
> limit its usage to the bounded source, Right ?
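The failover trade-off between the two spilling strategies described above can be summed up in a minimal sketch (the enum and helper below are illustrative only, not Flink APIs):

```java
// Illustrative sketch only — these names are not Flink APIs.
enum SpillingStrategy {
    SELECTIVE, // spill as little as possible: less disk IO, costlier failover
    FULL       // spill complete intermediate results: more IO, cheaper failover
}

class FailoverSketch {
    // With FULL spilling, the complete intermediate result survives on disk
    // and can simply be re-consumed after a downstream failure. With
    // SELECTIVE spilling, part of the result existed only in memory, so the
    // upstream tasks must be restarted to reproduce it.
    static boolean upstreamRestartNeeded(SpillingStrategy strategy) {
        return strategy == SpillingStrategy.SELECTIVE;
    }
}
```

Either way, as noted above, the choice is a user-facing trade-off between failover cost and disk IO load.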
> One more question, with the bounded data and partly of the stage is running
> in the Pipelined shuffle mode, what will be the behavior of the task
> failure, Is the checkpoint enabled for these running stages or will it
> re-run after the failure?
>
> Best,
> Aitozi
>
> weijie guo <guoweijieres...@gmail.com> wrote on Fri, May 20, 2022 at 10:45:
>
> > Hi, Aitozi:
> >
> > Thank you for the feedback!
> > Here are some of my thoughts on your questions.
> >
> > >>> 1. If there is an unbounded data source, but only have resource to
> > schedule the first stage, will it bring the big burden to the
> > disk/shuffle service which will occupy all the resource I think.
> > First of all, hybrid shuffle mode is oriented to the batch job
> > scenario, so there is no problem of unbounded data sources. Secondly,
> > if you consider the stream scenario, I think Pipelined Shuffle should
> > still be the best choice at present. For an unbounded data stream, it
> > is not meaningful to only run some stages.
> >
> > >>> 2. Which kind of job will benefit from the hybrid shuffle mode. In
> > other words, In which case we can use the hybrid shuffle mode:
> > Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid
> > shuffle mode can effectively utilize cluster resources and avoid some
> > unnecessary disk IO overhead. For OLAP scenarios, which are
> > characterized by a large number of concurrently submitted short batch
> > jobs, hybrid shuffle can solve the scheduling deadlock problem of
> > pipelined shuffle and achieve similar performance.
> >
> > Best regards,
> >
> > Weijie
> >
> >
> > Aitozi <gjying1...@gmail.com> wrote on Fri, May 20, 2022 at 08:05:
> >
> > > Hi Weijie:
> > >
> > > Thanks for the nice FLIP, I have a couple of questions about this:
> > >
> > > 1) In the hybrid shuffle mode, the shuffle mode is decided by the
> > > resource.
> > > If there is an unbounded data source, but only have resource to
> > > schedule the first stage, will it bring the big burden to the
> > > disk/shuffle service which will occupy all the resource I think.
> > >
> > > 2) Which kind of job will benefit from the hybrid shuffle mode. In
> > > other words, In which case we can use the hybrid shuffle mode:
> > > - For batch job want to use more resource to reduce the e2e time ?
> > > - Or for streaming job which may lack of resource temporarily ?
> > > - Or for OLAP job which will try to make best use of available
> > > resources as you mentioned to finish the query?
> > > Just want to know the typical use case for the Hybrid shuffle mode :)
> > >
> > > Best,
> > > Aitozi.
> > >
> > > weijie guo <guoweijieres...@gmail.com> wrote on Thu, May 19, 2022 at
> > > 18:33:
> > >
> > > > Yangze, thank you for the feedback!
> > > > Here are my thoughts on your questions:
> > > >
> > > > >>> How do we decide the size of the buffer pool in
> > > > MemoryDataManager and the read buffers in FileDataManager?
> > > > The BufferPool in the MemoryDataManager is the LocalBufferPool used
> > > > by the ResultPartition, and its size is the same as in the current
> > > > implementation of sort-merge shuffle. In other words, the minimum
> > > > size of the BufferPool is a configurable fixed value, and the
> > > > maximum is Math.max(min, 4 * numSubpartitions). The default value
> > > > can be determined by running the TPC-DS tests.
> > > > Read buffers in the FileDataManager are requested from the
> > > > BatchShuffleReadBufferPool shared by the TaskManager; its size is
> > > > controlled by
> > > > *taskmanager.memory.framework.off-heap.batch-shuffle.size*, whose
> > > > default value is 32M, consistent with the current sort-merge
> > > > shuffle logic.
> > > >
> > > > >>> Is there an upper limit for the sum of them? If there is, how
> > > > does MemoryDataManager and FileDataManager sync the memory usage?
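The LocalBufferPool sizing rule quoted above can be written out as a tiny helper (the class and method names are hypothetical; only the formula comes from the thread):

```java
class BufferPoolSizing {
    // Hypothetical helper illustrating the sizing rule described in this
    // thread: the minimum is a configurable fixed value, and the maximum
    // is Math.max(min, 4 * numSubpartitions).
    static int maxPoolSize(int configuredMin, int numSubpartitions) {
        return Math.max(configuredMin, 4 * numSubpartitions);
    }
}
```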
> > > > The buffers of the MemoryDataManager are limited by the size of the
> > > > LocalBufferPool, whose upper limit is the size of the Network
> > > > Memory. The buffers of the FileDataManager are directly requested
> > > > from UnpooledOffHeapMemory, and are also limited by the size of the
> > > > framework off-heap memory. I think there should be no need for
> > > > additional synchronization mechanisms.
> > > >
> > > > >>> How do you disable the slot sharing? If user configures both
> > > > the slot sharing group and hybrid shuffle, what will happen to that
> > > > job?
> > > > I think we can print a warning log when hybrid shuffle is enabled
> > > > and an SSG is configured during the JobGraph compilation stage, and
> > > > fall back to the region slot sharing group by default. Of course,
> > > > it will be emphasized in the documentation that we do not currently
> > > > support SSG; if configured, it will fall back to the default.
> > > >
> > > > Best regards,
> > > >
> > > > Weijie
> > > >
> > > >
> > > > Yangze Guo <karma...@gmail.com> wrote on Thu, May 19, 2022 at 16:25:
> > > >
> > > > > Thanks for driving this, Xintong and Weijie.
> > > > >
> > > > > I believe this feature will make Flink a better batch/OLAP
> > > > > engine. +1 for the overall design.
> > > > >
> > > > > Some questions:
> > > > > 1. How do we decide the size of the buffer pool in
> > > > > MemoryDataManager and the read buffers in FileDataManager?
> > > > > 2. Is there an upper limit for the sum of them? If there is, how
> > > > > does MemoryDataManager and FileDataManager sync the memory usage?
> > > > > 3. How do you disable the slot sharing? If user configures both
> > > > > the slot sharing group and hybrid shuffle, what will happen to
> > > > > that job?
> > > > >
> > > > > Best,
> > > > > Yangze Guo
> > > > >
> > > > > On Thu, May 19, 2022 at 2:41 PM Xintong Song <tonysong...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > Thanks for preparing this FLIP, Weijie.
> > > > > >
> > > > > > I think this is a good improvement on batch resource
> > > > > > elasticity. Looking forward to the community feedback.
> > > > > >
> > > > > > Best,
> > > > > >
> > > > > > Xintong
> > > > > >
> > > > > >
> > > > > > On Thu, May 19, 2022 at 2:31 PM weijie guo
> > > > > > <guoweijieres...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I’d like to start a discussion about FLIP-235[1], which
> > > > > > > introduces a new shuffle mode that can overcome some of the
> > > > > > > problems of Pipelined Shuffle and Blocking Shuffle in batch
> > > > > > > scenarios.
> > > > > > >
> > > > > > > Currently in Flink, task scheduling is more or less
> > > > > > > constrained by the shuffle implementations. This brings the
> > > > > > > following disadvantages:
> > > > > > >
> > > > > > > 1. Pipelined Shuffle:
> > > > > > > For pipelined shuffle, the upstream and downstream tasks are
> > > > > > > required to be deployed at the same time, to avoid upstream
> > > > > > > tasks being blocked forever. This is fine when there are
> > > > > > > enough resources for both upstream and downstream tasks to
> > > > > > > run simultaneously, but will cause the following problems
> > > > > > > otherwise:
> > > > > > > 1. Pipelined shuffle connected tasks (i.e., a pipelined
> > > > > > > region) cannot be executed until obtaining resources for all
> > > > > > > of them, resulting in longer job finishing time and poorer
> > > > > > > resource efficiency due to holding part of the resources idle
> > > > > > > while waiting for the rest.
> > > > > > > 2.
> > > > > > > More severely, if multiple jobs each hold part of the
> > > > > > > cluster resources and are waiting for more, a deadlock would
> > > > > > > occur. The chance is not trivial, especially for scenarios
> > > > > > > such as OLAP where concurrent job submissions are frequent.
> > > > > > > 2. Blocking Shuffle:
> > > > > > > For blocking shuffle, execution of downstream tasks must wait
> > > > > > > for all upstream tasks to finish, even though there might be
> > > > > > > more resources available. The sequential execution of
> > > > > > > upstream and downstream tasks significantly increases the job
> > > > > > > finishing time, and the disk IO workload for spilling and
> > > > > > > loading the full intermediate data also affects the
> > > > > > > performance.
> > > > > > >
> > > > > > > We believe the root cause of the above problems is that
> > > > > > > shuffle implementations put unnecessary constraints on task
> > > > > > > scheduling.
> > > > > > >
> > > > > > > To solve this problem, Xintong Song and I propose to
> > > > > > > introduce hybrid shuffle to minimize the scheduling
> > > > > > > constraints. With hybrid shuffle, Flink should:
> > > > > > >
> > > > > > > 1. Make best use of available resources.
> > > > > > > Ideally, we want Flink to always make progress if possible.
> > > > > > > That is to say, it should always execute a pending task if
> > > > > > > there are resources available for that task.
> > > > > > > 2. Minimize disk IO load.
> > > > > > > In-flight data should be consumed directly from memory as
> > > > > > > much as possible. Only data that is not consumed timely
> > > > > > > should be spilled to disk.
> > > > > > >
> > > > > > > You can find more details in FLIP-235. Looking forward to
> > > > > > > your feedback.
> > > > > > >
> > > > > > > [1]
> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
> > > > > > >
> > > > > > > Best regards,
> > > > > > >
> > > > > > > Weijie
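For readers following up on this thread later: the shuffle modes discussed here map onto a single configuration switch. A sketch of enabling hybrid shuffle, assuming the option name and values that shipped with the FLIP's eventual implementation (treat these as assumptions and verify against the documentation for your Flink version):

```yaml
# flink-conf.yaml sketch; option name and values are assumptions based on
# the implementation that followed this FLIP — check your version's docs.
execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_SELECTIVE
# or ALL_EXCHANGES_HYBRID_FULL to spill complete intermediate results,
# trading extra disk IO for cheaper failover (no upstream restarts)
```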