Hi All, Thanks for all the feedback about this FLIP.

Since there are no other concerns, this FLIP-235 discussion is over. I will open a vote today.

Best regards,
Weijie

On Wed, May 25, 2022 at 22:17, Xintong Song <tonysong...@gmail.com> wrote:
> Ok, I think we are on the same page. I'm aware of ExecutionConfig#setExecutionMode, which sets the data exchange mode at the job scope.
>
> Best,
> Xintong
>
> On Wed, May 25, 2022 at 9:50 PM Chesnay Schepler <ches...@apache.org> wrote:
>> You can influence it to some extent via ExecutionConfig#setExecutionMode. You can, for example, force all shuffles to use blocking exchanges.
>>
>> I'm not proposing an API that would allow this to be set per edge.
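(For illustration, forcing blocking exchanges for all shuffles with that API might look like the following sketch against the DataSet ExecutionEnvironment; the class name is illustrative. ExecutionMode.BATCH executes shuffling exchanges in a blocking fashion, while BATCH_FORCED would also block forward-style exchanges.)

    import org.apache.flink.api.common.ExecutionMode;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class BlockingExchangesExample {
        public static void main(String[] args) {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // Execute shuffling data exchanges in a blocking fashion, job-wide:
            env.getConfig().setExecutionMode(ExecutionMode.BATCH);
        }
    }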
>> On 25/05/2022 15:23, Xintong Song wrote:
>>> In general, I agree with you about aiming fine-grained recovery at jobs with no/few blocking exchanges. The only problem is, correct me if I'm wrong, that users currently cannot control the data exchange mode of a specific edge. I'm not aware of such APIs.
>>>
>>> As a first step, I'd prefer to exclude this from the scope of this FLIP.
>>>
>>> Best,
>>> Xintong
>>>
>>> On Wed, May 25, 2022 at 8:54 PM Chesnay Schepler <ches...@apache.org> wrote:
>>>> Yes; but that's also a limitation of the current fine-grained recovery.
>>>>
>>>> My suggestion was primarily aimed at jobs that have no/few blocking exchanges, where users would currently have to explicitly configure additional blocking exchanges to really get something out of fine-grained recovery (at the expense of e2e job duration).
>>>>
>>>> On 25/05/2022 14:47, Xintong Song wrote:
>>>>>> Will this also allow spilling everything to disk while also forwarding data to the next task?
>>>>>
>>>>> Yes, as long as the downstream task is started, this always forwards the data, even while spilling everything.
>>>>>
>>>>>> This would allow us to improve fine-grained recovery by no longer being constrained to pipelined regions.
>>>>>
>>>>> I think it helps prevent restarts of the upstream tasks of a failed task, but not the downstream tasks, because there is no guarantee that a restarted task will produce exactly the same data (in terms of order) as the previous execution; thus the downstream tasks cannot resume consuming the data.
>>>>>
>>>>> Best,
>>>>> Xintong
>>>>>
>>>>> On Wed, May 25, 2022 at 3:05 PM Chesnay Schepler <ches...@apache.org> wrote:
>>>>>> Will this also allow spilling everything to disk while also forwarding data to the next task?
>>>>>>
>>>>>> This would allow us to improve fine-grained recovery by no longer being constrained to pipelined regions.
>>>>>>
>>>>>> On 25/05/2022 05:55, weijie guo wrote:
>>>>>>> Hi All,
>>>>>>> Thank you for your attention and feedback. Do you have any other comments? If there are no other questions, I'll vote on FLIP-235 tomorrow.
>>>>>>>
>>>>>>> Best regards,
>>>>>>> Weijie
>>>>>>>
>>>>>>> On Fri, May 20, 2022 at 13:22, Aitozi <gjying1...@gmail.com> wrote:
>>>>>>>> Hi Xintong,
>>>>>>>> Thanks for your detailed explanation. I misunderstood the spill behavior at first glance; I get your point now. I think it will be a good addition to the current execution modes.
>>>>>>>> Looking forward to it :)
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Aitozi
>>>>>>>>
>>>>>>>> On Fri, May 20, 2022 at 12:26, Xintong Song <tonysong...@gmail.com> wrote:
>>>>>>>>> Hi Aitozi,
>>>>>>>>>
>>>>>>>>>> In which case we can use the hybrid shuffle mode
>>>>>>>>>
>>>>>>>>> Just to directly answer this question, in addition to Weijie's explanations: for batch workloads, if you want the workload to take advantage of as many resources as are available, ranging from a single slot up to as many slots as there are tasks, you may consider hybrid shuffle mode. Admittedly, this may not always be wanted; e.g., users may not want to execute a job if there are too few resources available, or may not want a job to take up too many of the cluster resources. That's why we propose hybrid shuffle as an additional option for batch users, rather than a replacement for the Pipelined or Blocking modes.
>>>>>>>>>
>>>>>>>>>> So you mean the hybrid shuffle mode will limit its usage to the bounded source, right?
>>>>>>>>>
>>>>>>>>> Yes.
>>>>>>>>>
>>>>>>>>>> One more question: with bounded data and part of the stages running in pipelined shuffle mode, what will the behavior be on task failure? Are checkpoints enabled for these running stages, or will they re-run after the failure?
>>>>>>>>>
>>>>>>>>> There are no checkpoints. The failover behavior depends on the spilling strategy.
>>>>>>>>> - In the first version, we only consider a selective spilling strategy, which spills as little data as possible to disk; in case of failover, upstream tasks need to be restarted to reproduce the complete intermediate results.
>>>>>>>>> - An alternative strategy, which we may introduce in the future if needed, is to spill the complete intermediate results. That avoids restarting upstream tasks in case of failover, because the produced intermediate results can be re-consumed, at the cost of more disk IO load.
>>>>>>>>> With both strategies, the trade-off between failover cost and IO load is for the user to decide. This is also discussed in the MemoryDataManager section of the FLIP.
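(A sketch of how choosing between the two spilling strategies could look if it is exposed as a job-level option; the option key, values, and class name below are hypothetical illustrations, not part of the FLIP:)

    import org.apache.flink.configuration.Configuration;

    public class HybridSpillingChoice {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Hypothetical option key/values, for illustration only.
            // Selective spilling: spill as little as possible; upstream tasks
            // must restart on failover to reproduce the intermediate results.
            conf.setString("execution.batch-shuffle-mode", "ALL_EXCHANGES_HYBRID_SELECTIVE");
            // Full spilling (a possible future strategy): keep the complete
            // intermediate results so failover avoids upstream restarts,
            // at the cost of more disk IO:
            // conf.setString("execution.batch-shuffle-mode", "ALL_EXCHANGES_HYBRID_FULL");
        }
    }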
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Xintong
>>>>>>>>>
>>>>>>>>> On Fri, May 20, 2022 at 12:10 PM Aitozi <gjying1...@gmail.com> wrote:
>>>>>>>>>> Thanks Weijie for your answer. So you mean the hybrid shuffle mode will limit its usage to the bounded source, right?
>>>>>>>>>> One more question: with bounded data and part of the stages running in pipelined shuffle mode, what will the behavior be on task failure? Are checkpoints enabled for these running stages, or will they re-run after the failure?
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Aitozi
>>>>>>>>>>
>>>>>>>>>> On Fri, May 20, 2022 at 10:45, weijie guo <guoweijieres...@gmail.com> wrote:
>>>>>>>>>>> Hi, Aitozi:
>>>>>>>>>>>
>>>>>>>>>>> Thank you for the feedback! Here are some of my thoughts on your questions.
>>>>>>>>>>>
>>>>>>>>>>>> 1. If there is an unbounded data source, but we only have the resources to schedule the first stage, will it bring a big burden to the disk/shuffle service, which I think will occupy all the resources?
>>>>>>>>>>>
>>>>>>>>>>> First of all, hybrid shuffle mode is oriented to batch job scenarios, so there is no problem of unbounded data sources. Secondly, if you consider the streaming scenario, I think pipelined shuffle is still the best choice at present. For an unbounded data stream, it is not meaningful to run only some of the stages.
>>>>>>>>>>>
>>>>>>>>>>>> 2. Which kinds of jobs will benefit from the hybrid shuffle mode? In other words, in which cases can we use the hybrid shuffle mode?
>>>>>>>>>>>
>>>>>>>>>>> Both general batch jobs and OLAP jobs benefit. For batch jobs, hybrid shuffle mode can effectively utilize cluster resources and avoid some unnecessary disk IO overhead. For OLAP scenarios, which are characterized by a large number of concurrently submitted short batch jobs, hybrid shuffle can solve the scheduling deadlock problem of pipelined shuffle while achieving similar performance.
>>>>>>>>>>>
>>>>>>>>>>> Best regards,
>>>>>>>>>>> Weijie
>>>>>>>>>>>
>>>>>>>>>>> On Fri, May 20, 2022 at 08:05, Aitozi <gjying1...@gmail.com> wrote:
>>>>>>>>>>>> Hi Weijie:
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the nice FLIP. I have a couple of questions about it:
>>>>>>>>>>>>
>>>>>>>>>>>> 1) In the hybrid shuffle mode, the shuffle mode is decided by the resources. If there is an unbounded data source, but we only have the resources to schedule the first stage, will it bring a big burden to the disk/shuffle service, which I think will occupy all the resources?
>>>>>>>>>>>>
>>>>>>>>>>>> 2) Which kinds of jobs will benefit from the hybrid shuffle mode? In other words, in which cases can we use the hybrid shuffle mode:
>>>>>>>>>>>> - Batch jobs that want to use more resources to reduce the e2e time?
>>>>>>>>>>>> - Streaming jobs that may temporarily lack resources?
>>>>>>>>>>>> - Or OLAP jobs that, as you mentioned, try to make the best use of available resources to finish the query?
>>>>>>>>>>>> Just want to know the typical use cases for the hybrid shuffle mode :)
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Aitozi.
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, May 19, 2022 at 18:33, weijie guo <guoweijieres...@gmail.com> wrote:
>>>>>>>>>>>>> Yangze, thank you for the feedback! Here are my thoughts on your questions:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> How do we decide the size of the buffer pool in MemoryDataManager and the read buffers in FileDataManager?
>>>>>>>>>>>>>
>>>>>>>>>>>>> The buffer pool in MemoryDataManager is the LocalBufferPool used by the ResultPartition, and its size is the same as in the current implementation of sort-merge shuffle. In other words, the minimum size of the buffer pool is a configurable fixed value, and the maximum is Math.max(min, 4 * numSubpartitions). The default value can be determined by running the TPC-DS tests.
>>>>>>>>>>>>> Read buffers in FileDataManager are requested from the BatchShuffleReadBufferPool shared by the TaskManager; its size is controlled by taskmanager.memory.framework.off-heap.batch-shuffle.size, and the default value is 32M, which is consistent with the current sort-merge shuffle logic.
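(A worked sketch of the sizing rules above; the class and method names are illustrative, while the config key is the existing option named in the answer:)

    import org.apache.flink.configuration.Configuration;

    public class HybridShuffleSizingSketch {
        // Cap on the MemoryDataManager's LocalBufferPool: a configurable
        // minimum, with a maximum that grows with the subpartition fan-out.
        static int maxPoolSize(int configuredMin, int numSubpartitions) {
            return Math.max(configuredMin, 4 * numSubpartitions);
        }

        public static void main(String[] args) {
            // e.g. min = 16 buffers and 100 subpartitions -> a cap of 400 buffers
            System.out.println(maxPoolSize(16, 100));

            // Read buffers for the FileDataManager come from the TaskManager-wide
            // BatchShuffleReadBufferPool, sized by this option (default 32m):
            Configuration conf = new Configuration();
            conf.setString("taskmanager.memory.framework.off-heap.batch-shuffle.size", "64m");
        }
    }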
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Is there an upper limit for the sum of them? If there is, how do MemoryDataManager and FileDataManager sync the memory usage?
>>>>>>>>>>>>>
>>>>>>>>>>>>> The buffers of the MemoryDataManager are limited by the size of the LocalBufferPool, whose upper limit is the size of the network memory. The buffers of the FileDataManager are requested directly from unpooled off-heap memory and are likewise limited by the size of the framework off-heap memory. I think there should be no need for additional synchronization mechanisms.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> How do you disable slot sharing? If a user configures both the slot sharing group and hybrid shuffle, what will happen to that job?
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think we can print a warning log when hybrid shuffle is enabled and an SSG is configured during the JobGraph compilation stage, and fall back to the region slot sharing group by default. Of course, it will be emphasized in the documentation that we do not currently support SSGs; if one is configured, it will fall back to the default.
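(For reference, this is roughly how a user sets an explicit slot sharing group in the DataStream API today; the class name and group name are illustrative. Under hybrid shuffle, per the answer above, such a setting would be warned about and ignored in favor of the default region slot sharing group:)

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SlotSharingGroupExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromSequence(0, 100)
               .map(new MapFunction<Long, Long>() {
                   @Override
                   public Long map(Long x) {
                       return x * 2;
                   }
               })
               // Would be ignored with a warning under hybrid shuffle:
               .slotSharingGroup("my-group")
               .print();
            env.execute("ssg-example");
        }
    }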
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>> Weijie
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, May 19, 2022 at 16:25, Yangze Guo <karma...@gmail.com> wrote:
>>>>>>>>>>>>>> Thanks for driving this, Xintong and Weijie.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I believe this feature will make Flink a better batch/OLAP engine. +1 for the overall design.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Some questions:
>>>>>>>>>>>>>> 1. How do we decide the size of the buffer pool in MemoryDataManager and the read buffers in FileDataManager?
>>>>>>>>>>>>>> 2. Is there an upper limit for the sum of them? If there is, how do MemoryDataManager and FileDataManager sync the memory usage?
>>>>>>>>>>>>>> 3. How do you disable slot sharing? If a user configures both the slot sharing group and hybrid shuffle, what will happen to that job?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Yangze Guo
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, May 19, 2022 at 2:41 PM Xintong Song <tonysong...@gmail.com> wrote:
>>>>>>>>>>>>>>> Thanks for preparing this FLIP, Weijie.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think this is a good improvement on batch resource elasticity. Looking forward to the community feedback.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Xintong
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, May 19, 2022 at 2:31 PM weijie guo <guoweijieres...@gmail.com> wrote:
>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'd like to start a discussion about FLIP-235 [1], which introduces a new shuffle mode that can overcome some of the problems of Pipelined Shuffle and Blocking Shuffle in batch scenarios.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Currently in Flink, task scheduling is more or less constrained by the shuffle implementations. This brings the following disadvantages:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1. Pipelined Shuffle:
>>>>>>>>>>>>>>>> For pipelined shuffle, the upstream and downstream tasks are required to be deployed at the same time, to avoid upstream tasks being blocked forever. This is fine when there are enough resources for both upstream and downstream tasks to run simultaneously, but causes the following problems otherwise:
>>>>>>>>>>>>>>>> 1. Tasks connected by pipelined shuffle (i.e., a pipelined region) cannot be executed until resources are obtained for all of them, resulting in a longer job finishing time and poorer resource efficiency, because part of the resources are held idle while waiting for the rest.
>>>>>>>>>>>>>>>> 2. More severely, if multiple jobs each hold part of the cluster resources and are waiting for more, a deadlock can occur. The chance is not trivial, especially in scenarios such as OLAP where concurrent job submissions are frequent.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2. Blocking Shuffle:
>>>>>>>>>>>>>>>> For blocking shuffle, the execution of downstream tasks must wait for all upstream tasks to finish, even though there might be more resources available. The sequential execution of upstream and downstream tasks significantly increases the job finishing time, and the disk IO workload for spilling and loading the full intermediate data also affects performance.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> We believe the root cause of the above problems is that the shuffle implementations put unnecessary constraints on task scheduling.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> To solve this problem, Xintong Song and I propose to introduce hybrid shuffle to minimize the scheduling constraints. With Hybrid Shuffle, Flink should:
>>>>>>>>>>>>>>>> 1. Make the best use of available resources. Ideally, we want Flink to always make progress if possible. That is to say, it should always execute a pending task if there are resources available for that task.
>>>>>>>>>>>>>>>> 2. Minimize disk IO load. In-flight data should be consumed directly from memory as much as possible. Only data that is not consumed in a timely manner should be spilled to disk.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> You can find more details in FLIP-235. Looking forward to your feedback.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>>>>> Weijie