> Will this also allow spilling everything to disk while also forwarding
> data to the next task?

Yes, as long as the downstream task has started, the data is always
forwarded, even while everything is being spilled.

> This would allow us to improve fine-grained recovery by no longer being
> constrained to pipelined regions.

I think it helps prevent restarts of the upstream tasks of a failed task,
but not of the downstream tasks. There is no guarantee that a restarted task
will produce exactly the same data (in terms of order) as the previous
execution, so downstream tasks cannot resume consuming the data.

Best,

Xintong

On Wed, May 25, 2022 at 3:05 PM Chesnay Schepler <ches...@apache.org> wrote:

> Will this also allow spilling everything to disk while also forwarding
> data to the next task?
>
> This would allow us to improve fine-grained recovery by no longer being
> constrained to pipelined regions.
>
> On 25/05/2022 05:55, weijie guo wrote:
> > Hi All,
> >
> > Thank you for your attention and feedback.
> > Do you have any other comments? If there are no other questions, I'll
> > vote on FLIP-235 tomorrow.
> >
> > Best regards,
> >
> > Weijie
> >
> > Aitozi <gjying1...@gmail.com> wrote on Fri, May 20, 2022 at 13:22:
> >
> >> Hi Xintong,
> >> Thanks for your detailed explanation. I misunderstood the spill
> >> behavior at first glance; I get your point now. I think it will be a
> >> good addition to the current execution mode.
> >> Looking forward to it :)
> >>
> >> Best,
> >> Aitozi
> >>
> >> Xintong Song <tonysong...@gmail.com> wrote on Fri, May 20, 2022 at 12:26:
> >>
> >>> Hi Aitozi,
> >>>
> >>>> In which case we can use the hybrid shuffle mode
> >>>
> >>> Just to directly answer this question, in addition to Weijie's
> >>> explanations: for batch workloads, if you want the workload to take
> >>> advantage of as many resources as are available, ranging from a single
> >>> slot to as many slots as there are tasks in total, you may consider
> >>> hybrid shuffle mode.
> >>> Admittedly, this may not always be wanted; e.g., users may not want
> >>> to execute a job if there are too few resources available, or may not
> >>> want a job to take too many of the cluster's resources. That's why we
> >>> propose hybrid shuffle as an additional option for batch users, rather
> >>> than a replacement for Pipelined or Blocking mode.
> >>>
> >>>> So you mean the hybrid shuffle mode will limit its usage to bounded
> >>>> sources, right?
> >>>
> >>> Yes.
> >>>
> >>>> One more question: with bounded data and part of the stages running
> >>>> in Pipelined shuffle mode, what will the behavior be on task
> >>>> failure? Is checkpointing enabled for these running stages, or will
> >>>> they re-run after the failure?
> >>>
> >>> There are no checkpoints. The failover behavior depends on the
> >>> spilling strategy.
> >>> - In the first version, we only consider a selective spilling
> >>>   strategy, which spills as little data as possible to disk. This
> >>>   means that in case of failover, upstream tasks need to be restarted
> >>>   to reproduce the complete intermediate results.
> >>> - An alternative strategy we may introduce in the future, if needed,
> >>>   is to spill the complete intermediate results. That avoids
> >>>   restarting upstream tasks in case of failover, because the produced
> >>>   intermediate results can be re-consumed, at the cost of more disk
> >>>   IO load.
> >>> With both strategies, the trade-off between failover cost and IO load
> >>> is for the user to decide. This is also discussed in the
> >>> MemoryDataManager section of the FLIP.
> >>>
> >>> Best,
> >>>
> >>> Xintong
> >>>
> >>> On Fri, May 20, 2022 at 12:10 PM Aitozi <gjying1...@gmail.com> wrote:
> >>>
> >>>> Thanks Weijie for your answer. So you mean the hybrid shuffle mode
> >>>> will limit its usage to bounded sources, right?
> >>>> One more question: with bounded data and part of the stages running
> >>>> in Pipelined shuffle mode, what will the behavior be on task
> >>>> failure? Is checkpointing enabled for these running stages, or will
> >>>> they re-run after the failure?
> >>>>
> >>>> Best,
> >>>> Aitozi
> >>>>
> >>>> weijie guo <guoweijieres...@gmail.com> wrote on Fri, May 20, 2022 at 10:45:
> >>>>
> >>>>> Hi, Aitozi:
> >>>>>
> >>>>> Thank you for the feedback!
> >>>>> Here are some of my thoughts on your questions.
> >>>>>
> >>>>> >>> 1. If there is an unbounded data source, but only enough
> >>>>> >>> resources to schedule the first stage, will it put a big burden
> >>>>> >>> on the disk/shuffle service, which I think will occupy all the
> >>>>> >>> resources?
> >>>>> First of all, Hybrid Shuffle Mode is oriented to the batch job
> >>>>> scenario, so the problem of unbounded data sources does not arise.
> >>>>> Secondly, if you consider the streaming scenario, I think Pipelined
> >>>>> Shuffle should still be the best choice at present. For an
> >>>>> unbounded data stream, it is not meaningful to run only some of the
> >>>>> stages.
> >>>>>
> >>>>> >>> 2. Which kinds of jobs will benefit from the hybrid shuffle
> >>>>> >>> mode? In other words, in which cases can we use the hybrid
> >>>>> >>> shuffle mode:
> >>>>> Both general batch jobs and OLAP jobs benefit. For batch jobs,
> >>>>> hybrid shuffle mode can effectively utilize cluster resources and
> >>>>> avoid some unnecessary disk IO overhead. For OLAP scenarios, which
> >>>>> are characterized by a large number of concurrently submitted short
> >>>>> batch jobs, hybrid shuffle can solve the scheduling deadlock
> >>>>> problem of pipelined shuffle and achieve similar performance.
> >>>>>
> >>>>> Best regards,
> >>>>>
> >>>>> Weijie
> >>>>>
> >>>>> Aitozi <gjying1...@gmail.com> wrote on Fri, May 20, 2022 at 08:05:
> >>>>>
> >>>>>> Hi Weijie:
> >>>>>>
> >>>>>> Thanks for the nice FLIP. I have a couple of questions about it:
> >>>>>>
> >>>>>> 1) In the hybrid shuffle mode, the shuffle mode is decided by the
> >>>>>> available resources. If there is an unbounded data source, but
> >>>>>> only enough resources to schedule the first stage, will it put a
> >>>>>> big burden on the disk/shuffle service, which I think will occupy
> >>>>>> all the resources?
> >>>>>>
> >>>>>> 2) Which kinds of jobs will benefit from the hybrid shuffle mode?
> >>>>>> In other words, in which cases can we use the hybrid shuffle mode:
> >>>>>> - For batch jobs that want to use more resources to reduce the e2e
> >>>>>>   time?
> >>>>>> - Or for streaming jobs that may temporarily lack resources?
> >>>>>> - Or for OLAP jobs that try to make the best use of available
> >>>>>>   resources, as you mentioned, to finish the query?
> >>>>>> Just want to know the typical use case for the Hybrid shuffle
> >>>>>> mode :)
> >>>>>>
> >>>>>> Best,
> >>>>>> Aitozi.
> >>>>>>
> >>>>>> weijie guo <guoweijieres...@gmail.com> wrote on Thu, May 19, 2022 at 18:33:
> >>>>>>
> >>>>>>> Yangze, thank you for the feedback!
> >>>>>>> Here are my thoughts on your questions:
> >>>>>>>
> >>>>>>> >>> How do we decide the size of the buffer pool in
> >>>>>>> >>> MemoryDataManager and the read buffers in FileDataManager?
> >>>>>>> The BufferPool in MemoryDataManager is the LocalBufferPool used
> >>>>>>> by ResultPartition, and its size is the same as in the current
> >>>>>>> implementation of sort-merge shuffle. In other words, the minimum
> >>>>>>> size of the BufferPool is a configurable fixed value, and the
> >>>>>>> maximum size is Math.max(min, 4 * numSubpartitions).
> >>>>>>> The default value can be determined by running the TPC-DS tests.
> >>>>>>> Read buffers in FileDataManager are requested from the
> >>>>>>> BatchShuffleReadBufferPool shared by the TaskManager. Its size is
> >>>>>>> controlled by
> >>>>>>> *taskmanager.memory.framework.off-heap.batch-shuffle.size*, whose
> >>>>>>> default value is 32M, which is consistent with the current
> >>>>>>> sort-merge shuffle logic.
> >>>>>>>
> >>>>>>> >>> Is there an upper limit for the sum of them? If there is, how
> >>>>>>> >>> do MemoryDataManager and FileDataManager sync the memory
> >>>>>>> >>> usage?
> >>>>>>> The buffers of the MemoryDataManager are limited by the size of
> >>>>>>> the LocalBufferPool, and the upper limit is the size of the
> >>>>>>> Network Memory. The buffers of the FileDataManager are directly
> >>>>>>> requested from UnpooledOffHeapMemory, and are also limited by the
> >>>>>>> size of the framework off-heap memory. I think there should be no
> >>>>>>> need for additional synchronization mechanisms.
> >>>>>>>
> >>>>>>> >>> How do you disable slot sharing? If a user configures both
> >>>>>>> >>> the slot sharing group and hybrid shuffle, what will happen
> >>>>>>> >>> to that job?
> >>>>>>> I think we can print a warning log during the JobGraph
> >>>>>>> compilation stage when Hybrid Shuffle is enabled and an SSG is
> >>>>>>> configured, and fall back to the region slot sharing group by
> >>>>>>> default. Of course, the documentation will emphasize that we do
> >>>>>>> not currently support SSG and that, if one is configured, it will
> >>>>>>> fall back to the default.
> >>>>>>>
> >>>>>>> Best regards,
> >>>>>>>
> >>>>>>> Weijie
> >>>>>>>
> >>>>>>> Yangze Guo <karma...@gmail.com> wrote on Thu, May 19, 2022 at 16:25:
> >>>>>>>
> >>>>>>>> Thanks for driving this, Xintong and Weijie.
> >>>>>>>>
> >>>>>>>> I believe this feature will make Flink a better batch/OLAP
> >>>>>>>> engine.
> >>>>>>>> +1 for the overall design.
> >>>>>>>>
> >>>>>>>> Some questions:
> >>>>>>>> 1. How do we decide the size of the buffer pool in
> >>>>>>>>    MemoryDataManager and the read buffers in FileDataManager?
> >>>>>>>> 2. Is there an upper limit for the sum of them? If there is, how
> >>>>>>>>    do MemoryDataManager and FileDataManager sync the memory
> >>>>>>>>    usage?
> >>>>>>>> 3. How do you disable slot sharing? If a user configures both
> >>>>>>>>    the slot sharing group and hybrid shuffle, what will happen
> >>>>>>>>    to that job?
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Yangze Guo
> >>>>>>>>
> >>>>>>>> On Thu, May 19, 2022 at 2:41 PM Xintong Song <tonysong...@gmail.com> wrote:
> >>>>>>>>> Thanks for preparing this FLIP, Weijie.
> >>>>>>>>>
> >>>>>>>>> I think this is a good improvement on batch resource
> >>>>>>>>> elasticity. Looking forward to the community feedback.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>>
> >>>>>>>>> Xintong
> >>>>>>>>>
> >>>>>>>>> On Thu, May 19, 2022 at 2:31 PM weijie guo <guoweijieres...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi all,
> >>>>>>>>>>
> >>>>>>>>>> I'd like to start a discussion about FLIP-235[1], which
> >>>>>>>>>> introduces a new shuffle mode that can overcome some of the
> >>>>>>>>>> problems of Pipelined Shuffle and Blocking Shuffle in batch
> >>>>>>>>>> scenarios.
> >>>>>>>>>>
> >>>>>>>>>> Currently in Flink, task scheduling is more or less
> >>>>>>>>>> constrained by the shuffle implementations.
> >>>>>>>>>> This brings the following disadvantages:
> >>>>>>>>>>
> >>>>>>>>>> 1. Pipelined Shuffle:
> >>>>>>>>>>    For pipelined shuffle, the upstream and downstream tasks
> >>>>>>>>>>    are required to be deployed at the same time, to avoid
> >>>>>>>>>>    upstream tasks being blocked forever.
> >>>>>>>>>>    This is fine when there are enough resources for both
> >>>>>>>>>>    upstream and downstream tasks to run simultaneously, but
> >>>>>>>>>>    will cause the following problems otherwise:
> >>>>>>>>>>    1. Tasks connected by pipelined shuffle (i.e., a pipelined
> >>>>>>>>>>       region) cannot be executed until resources have been
> >>>>>>>>>>       obtained for all of them, resulting in longer job
> >>>>>>>>>>       finishing time and poorer resource efficiency, because
> >>>>>>>>>>       part of the resources is held idle while waiting for the
> >>>>>>>>>>       rest.
> >>>>>>>>>>    2. More severely, if multiple jobs each hold part of the
> >>>>>>>>>>       cluster resources and are waiting for more, a deadlock
> >>>>>>>>>>       would occur. The chance is not trivial, especially for
> >>>>>>>>>>       scenarios such as OLAP where concurrent job submissions
> >>>>>>>>>>       are frequent.
> >>>>>>>>>> 2. Blocking Shuffle:
> >>>>>>>>>>    For blocking shuffle, execution of downstream tasks must
> >>>>>>>>>>    wait for all upstream tasks to finish, even though more
> >>>>>>>>>>    resources might be available. The sequential execution of
> >>>>>>>>>>    upstream and downstream tasks significantly increases the
> >>>>>>>>>>    job finishing time, and the disk IO workload for spilling
> >>>>>>>>>>    and loading the full intermediate data also affects
> >>>>>>>>>>    performance.
> >>>>>>>>>>
> >>>>>>>>>> We believe the root cause of the above problems is that
> >>>>>>>>>> shuffle implementations put unnecessary constraints on task
> >>>>>>>>>> scheduling.
> >>>>>>>>>>
> >>>>>>>>>> To solve this problem, Xintong Song and I propose to introduce
> >>>>>>>>>> hybrid shuffle to minimize the scheduling constraints. With
> >>>>>>>>>> Hybrid Shuffle, Flink should:
> >>>>>>>>>> 1. Make best use of available resources.
> >>>>>>>>>>    Ideally, we want Flink to always make progress if possible.
> >>>>>>>>>>    That is to say, it should always execute a pending task if
> >>>>>>>>>>    there are resources available for that task.
> >>>>>>>>>> 2. Minimize disk IO load.
> >>>>>>>>>>    In-flight data should be consumed directly from memory as
> >>>>>>>>>>    much as possible. Only data that is not consumed timely
> >>>>>>>>>>    should be spilled to disk.
> >>>>>>>>>>
> >>>>>>>>>> You can find more details in FLIP-235. Looking forward to your
> >>>>>>>>>> feedback.
> >>>>>>>>>>
> >>>>>>>>>> [1]
> >>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
> >>>>>>>>>>
> >>>>>>>>>> Best regards,
> >>>>>>>>>>
> >>>>>>>>>> Weijie
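
For readers following the buffer pool discussion in this thread: the sizing rule Weijie describes (a configurable fixed minimum, and a maximum of Math.max(min, 4 * numSubpartitions)) can be illustrated with a small arithmetic sketch. This is not Flink code. The function name and the example values are hypothetical, and Python is used only to show the arithmetic.

```python
from typing import Tuple


def local_buffer_pool_bounds(min_buffers: int, num_subpartitions: int) -> Tuple[int, int]:
    """Return (minimum, maximum) buffer counts for the rule quoted in the thread.

    Assumption: both inputs are counts of buffers/subpartitions, not bytes.
    The maximum follows max(min, 4 * numSubpartitions) as stated in the thread.
    """
    if min_buffers <= 0 or num_subpartitions <= 0:
        raise ValueError("min_buffers and num_subpartitions must be positive")
    max_buffers = max(min_buffers, 4 * num_subpartitions)
    return min_buffers, max_buffers


# Hypothetical values: with few subpartitions the minimum dominates,
# with many subpartitions the 4x term dominates.
few = local_buffer_pool_bounds(16, 2)
many = local_buffer_pool_bounds(16, 10)
print(few, many)
```

With these hypothetical inputs, the maximum equals the minimum until 4 * numSubpartitions exceeds it, so the pool only grows beyond the configured minimum for partitions with many subpartitions.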