Hi All,

Thank you for your attention and feedback. Do you have any other comments? If there are no further questions, I'll start the vote on FLIP-235 tomorrow.
Best regards,

Weijie

On Fri, May 20, 2022 at 13:22, Aitozi <gjying1...@gmail.com> wrote:

Hi Xintong,

Thanks for your detailed explanation. I misunderstood the spill behavior at first glance; I get your point now. I think it will be a good addition to the current execution modes. Looking forward to it :)

Best,
Aitozi

On Fri, May 20, 2022 at 12:26, Xintong Song <tonysong...@gmail.com> wrote:

Hi Aitozi,

> In which case we can use the hybrid shuffle mode

Just to directly answer this question, in addition to Weijie's explanations: for batch workloads, if you want the job to take advantage of as many resources as are available, anywhere from a single slot up to as many slots as there are tasks, you may consider hybrid shuffle mode. Admittedly, this may not always be wanted; e.g., users may not want to execute a job at all if too few resources are available, or may not want one job to take up too much of the cluster's resources. That's why we propose hybrid shuffle as an additional option for batch users, rather than a replacement for Pipelined or Blocking mode.

> So you mean the hybrid shuffle mode will limit its usage to the bounded source, right?

Yes.

> One more question: with bounded data and part of the stages running in pipelined shuffle mode, what will be the behavior on task failure? Is checkpointing enabled for these running stages, or will they re-run after the failure?

There are no checkpoints. The failover behavior depends on the spilling strategy.
- In the first version, we only consider a selective spilling strategy, which spills as little data as possible to disk. In case of failover, upstream tasks therefore need to be restarted to reproduce the complete intermediate results.
- An alternative strategy, which we may introduce in the future if needed, is to spill the complete intermediate results. That avoids restarting upstream tasks in case of failover, because the produced intermediate results can be re-consumed, at the cost of more disk IO load.

With both strategies, the trade-off between failover cost and IO load is for the user to decide. This is also discussed in the MemoryDataManager section of the FLIP.

Best,

Xintong
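A minimal sketch of the two spilling strategies Xintong describes, to make the failover trade-off concrete. All names here are hypothetical illustrations, not the FLIP's actual classes:

    /** Hypothetical illustration of the two spilling strategies discussed above. */
    enum SpillStrategy {
        /** Spill as little as possible; downstream failover restarts upstream tasks. */
        SELECTIVE,
        /** Spill the complete results; failover re-consumes them from disk. */
        FULL
    }

    final class SpillDecision {
        /**
         * Decides whether a finished in-memory buffer must go to disk.
         *
         * @param consumerRunning     a downstream task is currently draining memory
         * @param memoryPoolExhausted the producer can no longer obtain fresh buffers
         */
        static boolean shouldSpill(
                SpillStrategy strategy, boolean consumerRunning, boolean memoryPoolExhausted) {
            if (strategy == SpillStrategy.FULL) {
                // Complete intermediate results end up on disk, so upstream
                // tasks need not restart when a downstream task fails.
                return true;
            }
            // SELECTIVE: only spill data that is not consumed in time, i.e.
            // when memory runs out and nobody is draining it.
            return memoryPoolExhausted && !consumerRunning;
        }
    }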
On Fri, May 20, 2022 at 12:10 PM, Aitozi <gjying1...@gmail.com> wrote:

Thanks Weijie for your answer. So you mean the hybrid shuffle mode will limit its usage to bounded sources, right?

One more question: with bounded data and part of the stages running in pipelined shuffle mode, what will be the behavior on task failure? Is checkpointing enabled for these running stages, or will they re-run after the failure?

Best,
Aitozi

On Fri, May 20, 2022 at 10:45, weijie guo <guoweijieres...@gmail.com> wrote:

Hi, Aitozi:

Thank you for the feedback! Here are some of my thoughts on your questions.

>>> 1. If there is an unbounded data source, but only resources to schedule the first stage, will it put a big burden on the disk/shuffle service, which will occupy all the resources?

First of all, hybrid shuffle mode targets batch job scenarios, so there is no problem of unbounded data sources. Secondly, if you consider the streaming scenario, I think pipelined shuffle is still the best choice at present: for an unbounded data stream, it is not meaningful to run only some of the stages.

>>> 2. Which kind of job will benefit from the hybrid shuffle mode? In other words, in which case can we use the hybrid shuffle mode?

Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid shuffle mode can effectively utilize cluster resources and avoid some unnecessary disk IO overhead. For OLAP scenarios, which are characterized by a large number of concurrently submitted short batch jobs, hybrid shuffle can solve the scheduling-deadlock problem of pipelined shuffle while achieving similar performance.

Best regards,

Weijie

On Fri, May 20, 2022 at 08:05, Aitozi <gjying1...@gmail.com> wrote:

Hi Weijie:

Thanks for the nice FLIP. I have a couple of questions about it:

1) In hybrid shuffle mode, the shuffle mode is decided by the available resources. If there is an unbounded data source, but only resources to schedule the first stage, will it put a big burden on the disk/shuffle service, which I think will occupy all the resources?

2) Which kind of job will benefit from the hybrid shuffle mode? In other words, in which case can we use the hybrid shuffle mode:
- For batch jobs that want to use more resources to reduce the end-to-end time?
- Or for streaming jobs that may temporarily lack resources?
- Or for OLAP jobs that, as you mentioned, try to make the best use of available resources to finish the query?
Just want to know the typical use cases for the hybrid shuffle mode :)

Best,
Aitozi.

On Thu, May 19, 2022 at 18:33, weijie guo <guoweijieres...@gmail.com> wrote:

Yangze, thank you for the feedback! Here are my thoughts on your questions:

>>> How do we decide the size of the buffer pool in MemoryDataManager and the read buffers in FileDataManager?

The BufferPool in the MemoryDataManager is the LocalBufferPool used by the ResultPartition, and its size follows the current implementation of sort-merge shuffle. In other words, the minimum size of the BufferPool is a configurable fixed value, and the maximum is Math.max(min, 4 * numSubpartitions). The default value can be determined by running the TPC-DS tests.

Read buffers in the FileDataManager are requested from the BatchShuffleReadBufferPool shared by the TaskManager; its size is controlled by taskmanager.memory.framework.off-heap.batch-shuffle.size, with a default of 32m, consistent with the current sort-merge shuffle logic.
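To make the sizing rules above concrete, here is a small sketch. Only the formula Math.max(min, 4 * numSubpartitions) and the 32m default come from the thread; the class and method names are hypothetical:

    /** Illustrative sizing math for the two buffer sources described above. */
    final class HybridShuffleBufferSizing {

        /** Producer-side LocalBufferPool maximum: Math.max(min, 4 * numSubpartitions). */
        static int maxProducerBuffers(int configuredMin, int numSubpartitions) {
            return Math.max(configuredMin, 4 * numSubpartitions);
        }

        /**
         * Consumer-side read buffers come from the TaskManager-wide
         * BatchShuffleReadBufferPool, sized by
         * taskmanager.memory.framework.off-heap.batch-shuffle.size (default 32m).
         */
        static long numReadBuffers(long batchShuffleReadMemoryBytes, int bufferSizeBytes) {
            return batchShuffleReadMemoryBytes / bufferSizeBytes;
        }

        public static void main(String[] args) {
            // min = 512 buffers, 200 subpartitions -> pool maximum of 800 buffers
            System.out.println(maxProducerBuffers(512, 200));
            // 32 MB of read memory with 32 KB buffers -> 1024 read buffers
            System.out.println(numReadBuffers(32L << 20, 32 << 10));
        }
    }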
>>> Is there an upper limit for the sum of them? If there is, how do the MemoryDataManager and FileDataManager sync the memory usage?

The buffers of the MemoryDataManager are limited by the size of the LocalBufferPool, whose upper limit is the size of the Network Memory. The buffers of the FileDataManager are requested directly from UnpooledOffHeapMemory and are limited by the size of the framework off-heap memory. Since the two are bounded by separate budgets, I think there is no need for an additional synchronization mechanism.

>>> How do you disable slot sharing? If a user configures both a slot sharing group and hybrid shuffle, what will happen to that job?

I think we can print a warning log when hybrid shuffle is enabled and an SSG is configured during the JobGraph compilation stage, and fall back to the region slot sharing group by default. Of course, the documentation will emphasize that we do not currently support SSG; if one is configured, the job falls back to the default.

Best regards,

Weijie
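A sketch of the compilation-stage check Weijie proposes, with hypothetical class and method names; only the warn-and-fall-back behavior comes from the thread:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    /** Hypothetical JobGraph-compilation check for the proposed SSG fallback. */
    final class HybridShuffleSsgCheck {

        private static final Logger LOG = LoggerFactory.getLogger(HybridShuffleSsgCheck.class);

        /** Warn and ignore user-defined slot sharing groups when hybrid shuffle is on. */
        static void validate(boolean hybridShuffleEnabled, boolean customSsgConfigured) {
            if (hybridShuffleEnabled && customSsgConfigured) {
                LOG.warn(
                        "Slot sharing groups are not supported with hybrid shuffle; "
                                + "falling back to the default region slot sharing group.");
                // ... drop the user-defined groups and apply the default ...
            }
        }
    }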
On Thu, May 19, 2022 at 16:25, Yangze Guo <karma...@gmail.com> wrote:

Thanks for driving this, Xintong and Weijie.

I believe this feature will make Flink a better batch/OLAP engine. +1 for the overall design.

Some questions:
1. How do we decide the size of the buffer pool in the MemoryDataManager and the read buffers in the FileDataManager?
2. Is there an upper limit for the sum of them? If there is, how do the MemoryDataManager and FileDataManager sync the memory usage?
3. How do you disable slot sharing? If a user configures both a slot sharing group and hybrid shuffle, what will happen to that job?

Best,
Yangze Guo

On Thu, May 19, 2022 at 2:41 PM, Xintong Song <tonysong...@gmail.com> wrote:

Thanks for preparing this FLIP, Weijie.

I think this is a good improvement on batch resource elasticity. Looking forward to the community feedback.

Best,
Xintong

On Thu, May 19, 2022 at 2:31 PM, weijie guo <guoweijieres...@gmail.com> wrote:

Hi all,

I'd like to start a discussion about FLIP-235 [1], which introduces a new shuffle mode that can overcome some of the problems of Pipelined Shuffle and Blocking Shuffle in batch scenarios.

Currently in Flink, task scheduling is more or less constrained by the shuffle implementation. This brings the following disadvantages:

1. Pipelined Shuffle:
For pipelined shuffle, upstream and downstream tasks are required to be deployed at the same time, to avoid upstream tasks being blocked forever. This is fine when there are enough resources for both to run simultaneously, but causes the following problems otherwise:
   1. Tasks connected by pipelined shuffle (i.e., a pipelined region) cannot be executed until resources are obtained for all of them, resulting in longer job finishing times and poorer resource efficiency, because part of the resources are held idle while waiting for the rest.
   2. More severely, if multiple jobs each hold part of the cluster resources while waiting for more, a deadlock can occur. The chance is not trivial, especially in scenarios such as OLAP where concurrent job submissions are frequent.
2. Blocking Shuffle:
For blocking shuffle, execution of downstream tasks must wait for all upstream tasks to finish, even though more resources might be available. The sequential execution of upstream and downstream tasks significantly increases the job finishing time, and the disk IO workload of spilling and loading the full intermediate data also hurts performance.

We believe the root cause of the above problems is that the shuffle implementations put unnecessary constraints on task scheduling.

To solve this, Xintong Song and I propose to introduce hybrid shuffle to minimize these scheduling constraints. With hybrid shuffle, Flink should:

1. Make the best use of available resources.
Ideally, we want Flink to always make progress if possible. That is to say, it should always execute a pending task if there are resources available for that task.
2. Minimize disk IO load.
In-flight data should be consumed directly from memory as much as possible. Only data that is not consumed in time should be spilled to disk.

You can find more details in FLIP-235. Looking forward to your feedback.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode

Best regards,

Weijie
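For readers wondering how the proposed mode might be switched on: Flink already selects the batch shuffle implementation via the execution.batch-shuffle-mode option (currently ALL_EXCHANGES_BLOCKING or ALL_EXCHANGES_PIPELINED), and a hybrid mode would plausibly hang off the same switch. The value below is a placeholder, pending the FLIP's actual implementation:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    /** Hypothetical example of selecting the proposed hybrid shuffle mode. */
    public final class HybridShuffleExample {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // "ALL_EXCHANGES_HYBRID" is a placeholder value, not (yet) a real option.
            conf.setString("execution.batch-shuffle-mode", "ALL_EXCHANGES_HYBRID");
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment(conf);
            // ... build and execute a bounded (batch) job as usual ...
        }
    }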