In general, I agree with you about aiming fine-grained recovery at jobs with no/few blocking exchanges. The only problem is, correct me if I'm wrong, that users currently cannot control the data exchange mode of a specific edge. I'm not aware of such APIs.
As a first step, I'd prefer excluding this from the scope of this FLIP.

Best,
Xintong

On Wed, May 25, 2022 at 8:54 PM Chesnay Schepler <ches...@apache.org> wrote:
> Yes; but that's also a limitation of the current fine-grained recovery.
>
> My suggestion was primarily aimed at jobs that have no/few blocking
> exchanges, where users would currently have to explicitly configure
> additional blocking exchanges to really get something out of
> fine-grained recovery (at the expense of e2e job duration).
>
> On 25/05/2022 14:47, Xintong Song wrote:
> >> Will this also allow spilling everything to disk while also
> >> forwarding data to the next task?
> >
> > Yes, as long as the downstream task is started, this always forwards
> > the data, even while spilling everything.
> >
> >> This would allow us to improve fine-grained recovery by no longer
> >> being constrained to pipelined regions.
> >
> > I think it helps prevent restarts of the upstreams of a failed task,
> > but not the downstreams, because there's no guarantee that a
> > restarted task will produce exactly the same data (in terms of
> > order) as the previous execution, so downstreams cannot resume
> > consuming the data.
> >
> > Best,
> >
> > Xintong
> >
> > On Wed, May 25, 2022 at 3:05 PM Chesnay Schepler
> > <ches...@apache.org> wrote:
> >
> >> Will this also allow spilling everything to disk while also
> >> forwarding data to the next task?
> >>
> >> This would allow us to improve fine-grained recovery by no longer
> >> being constrained to pipelined regions.
> >>
> >> On 25/05/2022 05:55, weijie guo wrote:
> >>> Hi All,
> >>> Thank you for your attention and feedback.
> >>> Do you have any other comments? If there are no other questions,
> >>> I'll vote on FLIP-235 tomorrow.
> >>>
> >>> Best regards,
> >>>
> >>> Weijie
> >>>
> >>>
> >>> Aitozi <gjying1...@gmail.com> wrote on Fri, May 20, 2022 at 13:22:
> >>>
> >>>> Hi Xintong,
> >>>> Thanks for your detailed explanation. I misunderstood the spill
> >>>> behavior at first glance; I get your point now. I think it will
> >>>> be a good addition to the current execution modes.
> >>>> Looking forward to it :)
> >>>>
> >>>> Best,
> >>>> Aitozi
> >>>>
> >>>> Xintong Song <tonysong...@gmail.com> wrote on Fri, May 20, 2022 at 12:26:
> >>>>
> >>>>> Hi Aitozi,
> >>>>>
> >>>>>> In which case we can use the hybrid shuffle mode
> >>>>>
> >>>>> Just to directly answer this question, in addition to Weijie's
> >>>>> explanations: for batch workloads, if you want the workload to
> >>>>> take advantage of as many resources as available, ranging from
> >>>>> a single slot to as many slots as there are tasks, you may
> >>>>> consider hybrid shuffle mode. Admittedly, this may not always
> >>>>> be wanted, e.g., users may not want to execute a job if there
> >>>>> are too few resources available, or may not want a job taking
> >>>>> too many of the cluster resources. That's why we propose hybrid
> >>>>> shuffle as an additional option for batch users, rather than a
> >>>>> replacement for Pipelined or Blocking mode.
> >>>>>
> >>>>>> So you mean the hybrid shuffle mode will limit its usage to
> >>>>>> the bounded source, right?
> >>>>>
> >>>>> Yes.
> >>>>>
> >>>>>> One more question: with bounded data and part of the stages
> >>>>>> running in the Pipelined shuffle mode, what will be the
> >>>>>> behavior on task failure? Is checkpointing enabled for these
> >>>>>> running stages, or will they re-run after the failure?
> >>>>>
> >>>>> There are no checkpoints. The failover behavior depends on the
> >>>>> spilling strategy.
> >>>>> - In the first version, we only consider a selective spilling
> >>>>> strategy, which spills as little data as possible to disk; in
> >>>>> case of failover, upstream tasks need to be restarted to
> >>>>> reproduce the complete intermediate results.
> >>>>> - An alternative strategy, which we may introduce in the future
> >>>>> if needed, is to spill the complete intermediate results. That
> >>>>> avoids restarting upstream tasks in case of failover, because
> >>>>> the produced intermediate results can be re-consumed, at the
> >>>>> cost of more disk IO load.
> >>>>> With both strategies, the trade-off between failover cost and
> >>>>> IO load is for the user to decide. This is also discussed in
> >>>>> the MemoryDataManager section of the FLIP.
> >>>>>
> >>>>> Best,
> >>>>>
> >>>>> Xintong
> >>>>>
> >>>>> On Fri, May 20, 2022 at 12:10 PM Aitozi <gjying1...@gmail.com> wrote:
> >>>>>
> >>>>>> Thanks Weijie for your answer. So you mean the hybrid shuffle
> >>>>>> mode will limit its usage to the bounded source, right?
> >>>>>> One more question: with bounded data and part of the stages
> >>>>>> running in the Pipelined shuffle mode, what will be the
> >>>>>> behavior on task failure? Is checkpointing enabled for these
> >>>>>> running stages, or will they re-run after the failure?
> >>>>>>
> >>>>>> Best,
> >>>>>> Aitozi
> >>>>>>
> >>>>>> weijie guo <guoweijieres...@gmail.com> wrote on Fri, May 20, 2022 at 10:45:
> >>>>>>
> >>>>>>> Hi, Aitozi:
> >>>>>>>
> >>>>>>> Thank you for the feedback!
> >>>>>>> Here are some of my thoughts on your questions.
> >>>>>>>
> >>>>>>>> 1. If there is an unbounded data source, but we only have
> >>>>>>>> resources to schedule the first stage, will it bring a big
> >>>>>>>> burden to the disk/shuffle service, which I think will
> >>>>>>>> occupy all the resources?
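The failover trade-off between the two spilling strategies discussed in this thread can be summed up in a minimal sketch. This is an illustrative model only; the class, enum, and method names are hypothetical and not Flink's actual API:

```java
// Illustrative model of the failover trade-off between the two spilling
// strategies: selective spilling keeps disk IO low but forces upstream
// restarts on failover, while full spilling allows re-consuming the
// on-disk intermediate results at the cost of more disk IO.
public class SpillStrategyTradeOff {
    enum SpillStrategy {
        SELECTIVE, // spill as little data as possible: lower disk IO load
        FULL       // spill the complete intermediate results: higher IO load
    }

    /**
     * Under SELECTIVE spilling, part of the intermediate result only ever
     * existed in memory, so after a failure the upstream tasks must be
     * restarted to reproduce it. Under FULL spilling, the complete result
     * is on disk and can simply be re-consumed.
     */
    public static boolean mustRestartUpstreamOnFailover(SpillStrategy s) {
        return s == SpillStrategy.SELECTIVE;
    }
}
```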
> >>>>>>> First of all, Hybrid Shuffle Mode is oriented to the batch
> >>>>>>> job scenario, so there is no problem of unbounded data
> >>>>>>> sources. Secondly, if you consider the stream scenario, I
> >>>>>>> think Pipelined Shuffle should still be the best choice at
> >>>>>>> present. For an unbounded data stream, it is not meaningful
> >>>>>>> to only run some stages.
> >>>>>>>
> >>>>>>>> 2. Which kind of job will benefit from the hybrid shuffle
> >>>>>>>> mode? In other words, in which case can we use the hybrid
> >>>>>>>> shuffle mode?
> >>>>>>> Both general batch jobs and OLAP jobs benefit. For batch
> >>>>>>> jobs, hybrid shuffle mode can effectively utilize cluster
> >>>>>>> resources and avoid some unnecessary disk IO overhead. For
> >>>>>>> OLAP scenarios, which are characterized by a large number of
> >>>>>>> concurrently submitted short batch jobs, hybrid shuffle can
> >>>>>>> solve the scheduling deadlock problem of pipelined shuffle
> >>>>>>> and achieve similar performance.
> >>>>>>>
> >>>>>>> Best regards,
> >>>>>>>
> >>>>>>> Weijie
> >>>>>>>
> >>>>>>> Aitozi <gjying1...@gmail.com> wrote on Fri, May 20, 2022 at 08:05:
> >>>>>>>
> >>>>>>>> Hi Weijie:
> >>>>>>>>
> >>>>>>>> Thanks for the nice FLIP. I have a couple of questions about
> >>>>>>>> this:
> >>>>>>>>
> >>>>>>>> 1) In the hybrid shuffle mode, the shuffle mode is decided
> >>>>>>>> by the resources. If there is an unbounded data source, but
> >>>>>>>> we only have resources to schedule the first stage, will it
> >>>>>>>> bring a big burden to the disk/shuffle service, which I
> >>>>>>>> think will occupy all the resources?
> >>>>>>>>
> >>>>>>>> 2) Which kind of job will benefit from the hybrid shuffle
> >>>>>>>> mode? In other words, in which case can we use the hybrid
> >>>>>>>> shuffle mode:
> >>>>>>>> - For a batch job that wants to use more resources to reduce
> >>>>>>>> the e2e time?
> >>>>>>>> - Or for a streaming job which may lack resources
> >>>>>>>> temporarily?
> >>>>>>>> - Or for an OLAP job which will try to make the best use of
> >>>>>>>> available resources, as you mentioned, to finish the query?
> >>>>>>>> Just want to know the typical use case for the Hybrid
> >>>>>>>> shuffle mode :)
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Aitozi.
> >>>>>>>>
> >>>>>>>> weijie guo <guoweijieres...@gmail.com> wrote on Thu, May 19, 2022 at 18:33:
> >>>>>>>>
> >>>>>>>>> Yangze, thank you for the feedback!
> >>>>>>>>> Here are my thoughts on your questions:
> >>>>>>>>>
> >>>>>>>>>> How do we decide the size of the buffer pool in
> >>>>>>>>>> MemoryDataManager and the read buffers in FileDataManager?
> >>>>>>>>> The BufferPool in MemoryDataManager is the LocalBufferPool
> >>>>>>>>> used by ResultPartition, and its size is the same as in the
> >>>>>>>>> current implementation of sort-merge shuffle. In other
> >>>>>>>>> words, the minimum value of the BufferPool is a
> >>>>>>>>> configurable fixed value, and the maximum value is
> >>>>>>>>> Math.max(min, 4 * numSubpartitions). The default value can
> >>>>>>>>> be determined by running the TPC-DS tests.
> >>>>>>>>> Read buffers in FileDataManager are requested from the
> >>>>>>>>> BatchShuffleReadBufferPool shared by the TaskManager; its
> >>>>>>>>> size is controlled by
> >>>>>>>>> *taskmanager.memory.framework.off-heap.batch-shuffle.size*,
> >>>>>>>>> whose default value is 32M, which is consistent with the
> >>>>>>>>> current sort-merge shuffle logic.
> >>>>>>>>>
> >>>>>>>>>> Is there an upper limit for the sum of them? If there is,
> >>>>>>>>>> how do MemoryDataManager and FileDataManager sync the
> >>>>>>>>>> memory usage?
> >>>>>>>>> The buffers of the MemoryDataManager are limited by the
> >>>>>>>>> size of the LocalBufferPool, and the upper limit is the
> >>>>>>>>> size of the Network Memory.
> >>>>>>>>> The buffers of the FileDataManager are directly requested
> >>>>>>>>> from UnpooledOffHeapMemory, and are also limited by the
> >>>>>>>>> size of the framework off-heap memory. I think there should
> >>>>>>>>> be no need for additional synchronization mechanisms.
> >>>>>>>>>
> >>>>>>>>>> How do you disable the slot sharing? If a user configures
> >>>>>>>>>> both the slot sharing group and hybrid shuffle, what will
> >>>>>>>>>> happen to that job?
> >>>>>>>>> I think we can print a warning log when Hybrid Shuffle is
> >>>>>>>>> enabled and an SSG is configured during the JobGraph
> >>>>>>>>> compilation stage, and fall back to the region slot sharing
> >>>>>>>>> group by default. Of course, it will be emphasized in the
> >>>>>>>>> documentation that we do not currently support SSG; if
> >>>>>>>>> configured, it will fall back to the default.
> >>>>>>>>>
> >>>>>>>>> Best regards,
> >>>>>>>>>
> >>>>>>>>> Weijie
> >>>>>>>>>
> >>>>>>>>> Yangze Guo <karma...@gmail.com> wrote on Thu, May 19, 2022 at 16:25:
> >>>>>>>>>
> >>>>>>>>>> Thanks for driving this, Xintong and Weijie.
> >>>>>>>>>>
> >>>>>>>>>> I believe this feature will make Flink a better batch/OLAP
> >>>>>>>>>> engine. +1 for the overall design.
> >>>>>>>>>>
> >>>>>>>>>> Some questions:
> >>>>>>>>>> 1. How do we decide the size of the buffer pool in
> >>>>>>>>>> MemoryDataManager and the read buffers in FileDataManager?
> >>>>>>>>>> 2. Is there an upper limit for the sum of them? If there
> >>>>>>>>>> is, how do MemoryDataManager and FileDataManager sync the
> >>>>>>>>>> memory usage?
> >>>>>>>>>> 3. How do you disable the slot sharing? If a user
> >>>>>>>>>> configures both the slot sharing group and hybrid shuffle,
> >>>>>>>>>> what will happen to that job?
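The buffer pool sizing rule Weijie quotes in this thread (configurable fixed minimum, maximum of Math.max(min, 4 * numSubpartitions)) can be sketched as a small helper. This is an illustrative snippet, not Flink's actual code; the class and method names are hypothetical:

```java
public class HybridShuffleBufferSizing {
    /**
     * Maximum size of the MemoryDataManager's LocalBufferPool, following
     * the rule quoted in the thread: the minimum is a configurable fixed
     * value, and the maximum is Math.max(min, 4 * numSubpartitions).
     */
    public static int maxPoolSize(int configuredMin, int numSubpartitions) {
        return Math.max(configuredMin, 4 * numSubpartitions);
    }

    public static void main(String[] args) {
        // With few subpartitions, the configured minimum wins.
        System.out.println(maxPoolSize(512, 10));  // 512
        // With many subpartitions, 4 * numSubpartitions wins.
        System.out.println(maxPoolSize(512, 200)); // 800
    }
}
```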
> >>>>>>>>>> Best,
> >>>>>>>>>> Yangze Guo
> >>>>>>>>>>
> >>>>>>>>>> On Thu, May 19, 2022 at 2:41 PM Xintong Song
> >>>>>>>>>> <tonysong...@gmail.com> wrote:
> >>>>>>>>>>> Thanks for preparing this FLIP, Weijie.
> >>>>>>>>>>>
> >>>>>>>>>>> I think this is a good improvement on batch resource
> >>>>>>>>>>> elasticity. Looking forward to the community feedback.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>>
> >>>>>>>>>>> Xintong
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, May 19, 2022 at 2:31 PM weijie guo
> >>>>>>>>>>> <guoweijieres...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I'd like to start a discussion about FLIP-235[1], which
> >>>>>>>>>>>> introduces a new shuffle mode that can overcome some of
> >>>>>>>>>>>> the problems of Pipelined Shuffle and Blocking Shuffle
> >>>>>>>>>>>> in batch scenarios.
> >>>>>>>>>>>> Currently in Flink, task scheduling is more or less
> >>>>>>>>>>>> constrained by the shuffle implementations. This brings
> >>>>>>>>>>>> the following disadvantages:
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1. Pipelined Shuffle:
> >>>>>>>>>>>> For pipelined shuffle, the upstream and downstream tasks
> >>>>>>>>>>>> are required to be deployed at the same time, to avoid
> >>>>>>>>>>>> upstream tasks being blocked forever. This is fine when
> >>>>>>>>>>>> there are enough resources for both upstream and
> >>>>>>>>>>>> downstream tasks to run simultaneously, but will cause
> >>>>>>>>>>>> the following problems otherwise:
> >>>>>>>>>>>> 1.
> >>>>>>>>>>>> Pipelined shuffle connected tasks (i.e., a pipelined
> >>>>>>>>>>>> region) cannot be executed until resources are obtained
> >>>>>>>>>>>> for all of them, resulting in longer job finishing time
> >>>>>>>>>>>> and poorer resource efficiency due to holding part of
> >>>>>>>>>>>> the resources idle while waiting for the rest.
> >>>>>>>>>>>> 2.
> >>>>>>>>>>>> More severely, if multiple jobs each hold part of the
> >>>>>>>>>>>> cluster resources and are waiting for more, a deadlock
> >>>>>>>>>>>> can occur. The chance is not trivial, especially for
> >>>>>>>>>>>> scenarios such as OLAP where concurrent job submissions
> >>>>>>>>>>>> are frequent.
> >>>>>>>>>>>> 2. Blocking Shuffle:
> >>>>>>>>>>>> For blocking shuffle, execution of downstream tasks must
> >>>>>>>>>>>> wait for all upstream tasks to finish, even though there
> >>>>>>>>>>>> might be more resources available. The sequential
> >>>>>>>>>>>> execution of upstream and downstream tasks significantly
> >>>>>>>>>>>> increases the job finishing time, and the disk IO
> >>>>>>>>>>>> workload for spilling and loading full intermediate data
> >>>>>>>>>>>> also affects the performance.
> >>>>>>>>>>>> We believe the root cause of the above problems is that
> >>>>>>>>>>>> shuffle implementations put unnecessary constraints on
> >>>>>>>>>>>> task scheduling.
> >>>>>>>>>>>> To solve this problem, Xintong Song and I propose to
> >>>>>>>>>>>> introduce hybrid shuffle to minimize the scheduling
> >>>>>>>>>>>> constraints. With Hybrid Shuffle, Flink should:
> >>>>>>>>>>>> 1. Make best use of available resources.
> >>>>>>>>>>>> Ideally, we want Flink to always make progress if
> >>>>>>>>>>>> possible. That is to say, it should always execute a
> >>>>>>>>>>>> pending task if there are resources available for that
> >>>>>>>>>>>> task.
> >>>>>>>>>>>> 2. Minimize disk IO load.
> >>>>>>>>>>>> In-flight data should be consumed directly from memory
> >>>>>>>>>>>> as much as possible. Only data that is not consumed in
> >>>>>>>>>>>> time should be spilled to disk.
> >>>>>>>>>>>> You can find more details in FLIP-235. Looking forward
> >>>>>>>>>>>> to your feedback.
> >>>>>>>>>>>>
> >>>>>>>>>>>> [1]
> >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best regards,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Weijie
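Reading the thread bottom-up, the two design goals in the original FLIP-235 announcement can be summed up in a toy model. This sketch is purely illustrative; the class and method names are hypothetical and not part of the FLIP or of Flink:

```java
// Toy model of the two Hybrid Shuffle goals from the announcement:
// (1) make progress with whatever resources are available, and
// (2) touch disk only for data the consumer does not pick up in time.
public class HybridShuffleGoals {
    /**
     * Goal 1: unlike pipelined shuffle, a task does not have to wait for
     * slots for its whole pipelined region; any free slot lets it run.
     */
    public static boolean canScheduleTask(int freeSlots) {
        return freeSlots > 0;
    }

    /**
     * Goal 2: while the downstream task is running and memory is not
     * exhausted, data is forwarded directly from memory; otherwise it is
     * spilled to disk, from where it can be read later.
     */
    public static String routeBuffer(boolean consumerRunning, boolean memoryFull) {
        return (consumerRunning && !memoryFull) ? "memory" : "disk";
    }
}
```

In this model, blocking shuffle corresponds to always returning "disk", and pipelined shuffle to requiring slots for the whole region before anything runs; hybrid shuffle sits between the two.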