Ok, I think we are on the same page. I'm aware of ExecutionConfig#setExecutionMode, which sets the data exchange mode at the job scope.
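For readers following the thread: the job-scope behavior being discussed can be sketched as a tiny model. This is an illustrative simplification, not Flink's actual API; the enum values and the `exchange_type` function are hypothetical stand-ins for the semantics described (a batch-style job-scope setting turns every shuffle edge into a blocking exchange, with no per-edge override):

```python
from enum import Enum

class ExecutionMode(Enum):
    PIPELINED = "pipelined"
    BATCH = "batch"  # shuffle edges become blocking exchanges

class ExchangeType(Enum):
    PIPELINED = "pipelined"
    BLOCKING = "blocking"

def exchange_type(mode: ExecutionMode, is_shuffle: bool) -> ExchangeType:
    """Job-scope rule (simplified): in BATCH mode every shuffle edge is
    blocking; non-shuffle (forward/local) edges stay pipelined. There is
    no way to override the decision for a single edge."""
    if mode is ExecutionMode.BATCH and is_shuffle:
        return ExchangeType.BLOCKING
    return ExchangeType.PIPELINED
```

The point of the model is the one Xintong makes below: the knob applies to every shuffle edge in the job at once, not to a specific edge.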
Best,

Xintong

On Wed, May 25, 2022 at 9:50 PM Chesnay Schepler <ches...@apache.org> wrote:

> You can influence it to some extent via ExecutionConfig#setExecutionMode.
> You can, for example, force all shuffles to use blocking exchanges.
>
> I'm not proposing an API that would allow this to be set per edge.
>
> On 25/05/2022 15:23, Xintong Song wrote:
> > In general, I agree with you about aiming jobs with no/few blocking
> > exchanges for fine-grained recovery. The only problem is, correct me if
> > I'm wrong, that users currently cannot control the data exchange mode
> > of a specific edge. I'm not aware of such APIs.
> >
> > As a first step, I'd prefer excluding this from the scope of this FLIP.
> >
> > Best,
> >
> > Xintong
> >
> > On Wed, May 25, 2022 at 8:54 PM Chesnay Schepler <ches...@apache.org> wrote:
>> Yes; but that's also a limitation of the current fine-grained recovery.
>>
>> My suggestion was primarily aimed at jobs that have no/few blocking
>> exchanges, where users would currently have to explicitly configure
>> additional blocking exchanges to really get something out of
>> fine-grained recovery (at the expense of e2e job duration).
>>
>> On 25/05/2022 14:47, Xintong Song wrote:
>> >> Will this also allow spilling everything to disk while also
>> >> forwarding data to the next task?
>> >
>> > Yes, as long as the downstream task is started, this always forwards
>> > the data, even while spilling everything.
>> >
>> >> This would allow us to improve fine-grained recovery by no longer
>> >> being constrained to pipelined regions.
>> >
>> > I think it helps prevent restarts of the upstreams of a failed task,
>> > but not the downstreams. Because there's no guarantee a restarted task
>> > will produce exactly the same data (in terms of order) as the previous
>> > execution, downstreams cannot resume consuming the data.
>> >
>> > Best,
>> >
>> > Xintong
>> >
>> > On Wed, May 25, 2022 at 3:05 PM Chesnay Schepler <ches...@apache.org> wrote:
>> >> Will this also allow spilling everything to disk while also
>> >> forwarding data to the next task?
>> >>
>> >> This would allow us to improve fine-grained recovery by no longer
>> >> being constrained to pipelined regions.
>> >>
>> >> On 25/05/2022 05:55, weijie guo wrote:
>> >>> Hi All,
>> >>>
>> >>> Thank you for your attention and feedback. Do you have any other
>> >>> comments? If there are no other questions, I'll vote on FLIP-235
>> >>> tomorrow.
>> >>>
>> >>> Best regards,
>> >>>
>> >>> Weijie
>> >>>
>> >>> Aitozi <gjying1...@gmail.com> wrote on Fri, May 20, 2022 at 13:22:
>> >>>> Hi Xintong,
>> >>>>
>> >>>> Thanks for your detailed explanation. I misunderstood the spill
>> >>>> behavior at first glance; I get your point now. I think it will be
>> >>>> a good addition to the current execution modes.
>> >>>> Looking forward to it :)
>> >>>>
>> >>>> Best,
>> >>>> Aitozi
>> >>>>
>> >>>> Xintong Song <tonysong...@gmail.com> wrote on Fri, May 20, 2022 at 12:26:
>> >>>>> Hi Aitozi,
>> >>>>>
>> >>>>>> In which case we can use the hybrid shuffle mode
>> >>>>>
>> >>>>> Just to directly answer this question, in addition to Weijie's
>> >>>>> explanations: for batch workloads, if you want the workload to
>> >>>>> take advantage of as many resources as available, ranging from a
>> >>>>> single slot to as many slots as there are total tasks, you may
>> >>>>> consider hybrid shuffle mode. Admittedly, this may not always be
>> >>>>> wanted, e.g., users may not want to execute a job if there are too
>> >>>>> few resources available, or may not want a job taking too many of
>> >>>>> the cluster resources. That's why we propose hybrid shuffle as an
>> >>>>> additional option for batch users, rather than a replacement for
>> >>>>> Pipelined or Blocking mode.
>> >>>>>
>> >>>>>> So you mean the hybrid shuffle mode will limit its usage to
>> >>>>>> bounded sources, right?
>> >>>>>
>> >>>>> Yes.
>> >>>>>
>> >>>>>> One more question: with bounded data and part of the stages
>> >>>>>> running in pipelined shuffle mode, what will be the behavior on
>> >>>>>> task failure? Is checkpointing enabled for these running stages,
>> >>>>>> or will they re-run after the failure?
>> >>>>>
>> >>>>> There are no checkpoints. The failover behavior depends on the
>> >>>>> spilling strategy.
>> >>>>> - In the first version, we only consider a selective spilling
>> >>>>> strategy, which spills as little data as possible to disk. In case
>> >>>>> of failover, upstream tasks need to be restarted to reproduce the
>> >>>>> complete intermediate results.
>> >>>>> - An alternative strategy we may introduce in the future, if
>> >>>>> needed, is to spill the complete intermediate results. That avoids
>> >>>>> restarting upstream tasks in case of failover, because the
>> >>>>> produced intermediate results can be re-consumed, at the cost of
>> >>>>> more disk IO load.
>> >>>>>
>> >>>>> With both strategies, the trade-off between failover cost and IO
>> >>>>> load is for the user to decide. This is also discussed in the
>> >>>>> MemoryDataManager section of the FLIP.
>> >>>>>
>> >>>>> Best,
>> >>>>>
>> >>>>> Xintong
>> >>>>>
>> >>>>> On Fri, May 20, 2022 at 12:10 PM Aitozi <gjying1...@gmail.com> wrote:
>> >>>>>> Thanks Weijie for your answer. So you mean the hybrid shuffle
>> >>>>>> mode will limit its usage to bounded sources, right?
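Xintong's two spilling strategies and their failover consequences can be summarized in a tiny model. This is an illustrative sketch of the trade-off described in the thread, not Flink code; the function and strategy names are hypothetical:

```python
def tasks_to_restart(failed_task, upstream_tasks, strategy):
    """Which tasks must re-run when a downstream task fails, under the
    two spilling strategies discussed (simplified model).
    - 'selective': only data not consumed in time was spilled, so the
      complete intermediate result no longer exists and all upstream
      producers must re-run to reproduce it.
    - 'full': the complete intermediate result is on disk and can be
      re-consumed, so only the failed task itself restarts (at the cost
      of more disk IO during normal execution)."""
    if strategy == "selective":
        return {failed_task, *upstream_tasks}
    if strategy == "full":
        return {failed_task}
    raise ValueError(f"unknown strategy: {strategy}")
```

For example, with upstream producers `{"map-1", "map-2"}` and a failed consumer `"agg"`, the selective strategy restarts all three tasks while the full-spilling strategy restarts only `"agg"`.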
>> >>>>>> One more question: with bounded data and part of the stages
>> >>>>>> running in pipelined shuffle mode, what will be the behavior on
>> >>>>>> task failure? Is checkpointing enabled for these running stages,
>> >>>>>> or will they re-run after the failure?
>> >>>>>>
>> >>>>>> Best,
>> >>>>>> Aitozi
>> >>>>>>
>> >>>>>> weijie guo <guoweijieres...@gmail.com> wrote on Fri, May 20, 2022 at 10:45:
>> >>>>>>> Hi, Aitozi:
>> >>>>>>>
>> >>>>>>> Thank you for the feedback! Here are some of my thoughts on your
>> >>>>>>> questions.
>> >>>>>>>
>> >>>>>>>> 1. If there is an unbounded data source, but we only have
>> >>>>>>>> resources to schedule the first stage, will it bring a big
>> >>>>>>>> burden to the disk/shuffle service, which I think will occupy
>> >>>>>>>> all the resources?
>> >>>>>>>
>> >>>>>>> First of all, hybrid shuffle mode is oriented to batch job
>> >>>>>>> scenarios, so there is no problem of unbounded data sources.
>> >>>>>>> Secondly, if you consider the streaming scenario, I think
>> >>>>>>> pipelined shuffle should still be the best choice at present.
>> >>>>>>> For an unbounded data stream, it is not meaningful to only run
>> >>>>>>> some of the stages.
>> >>>>>>>
>> >>>>>>>> 2. Which kind of job will benefit from the hybrid shuffle mode?
>> >>>>>>>> In other words, in which case can we use the hybrid shuffle mode?
>> >>>>>>>
>> >>>>>>> Both general batch jobs and OLAP jobs benefit. For batch jobs,
>> >>>>>>> hybrid shuffle mode can effectively utilize cluster resources
>> >>>>>>> and avoid some unnecessary disk IO overhead. For OLAP scenarios,
>> >>>>>>> which are characterized by a large number of concurrently
>> >>>>>>> submitted short batch jobs, hybrid shuffle can solve the
>> >>>>>>> scheduling deadlock problem of pipelined shuffle and achieve
>> >>>>>>> similar performance.
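The scheduling deadlock Weijie refers to can be made concrete with a toy model. This is a hypothetical illustration, not Flink's scheduler: each job's pipelined region needs all of its slots before any task can make progress, and jobs greedily grab slots as they become free:

```python
def pipelined_deadlock(cluster_slots, region_sizes):
    """Toy model of the pipelined-shuffle deadlock: each job holds the
    slots it has acquired and waits for the rest of its region's
    requirement. Simulates round-robin greedy acquisition and returns
    True if every job ends up holding some slots yet short of its full
    requirement, i.e. all of them wait forever."""
    held = [0] * len(region_sizes)
    free = cluster_slots
    progress = True
    while free > 0 and progress:
        progress = False
        for i, need in enumerate(region_sizes):
            if free > 0 and held[i] < need:
                held[i] += 1
                free -= 1
                progress = True
    return all(h < need for h, need in zip(held, region_sizes))
```

With 6 slots and two jobs whose pipelined regions each need 4 slots, both jobs end up holding 3 slots and waiting, so neither runs; with 8 slots both can be satisfied. Hybrid shuffle sidesteps this because a job can make progress with any number of slots.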
>> >>>>>>>
>> >>>>>>> Best regards,
>> >>>>>>>
>> >>>>>>> Weijie
>> >>>>>>>
>> >>>>>>> Aitozi <gjying1...@gmail.com> wrote on Fri, May 20, 2022 at 08:05:
>> >>>>>>>> Hi Weijie:
>> >>>>>>>>
>> >>>>>>>> Thanks for the nice FLIP. I have a couple of questions about it:
>> >>>>>>>>
>> >>>>>>>> 1) In the hybrid shuffle mode, the shuffle mode is decided by
>> >>>>>>>> the resources. If there is an unbounded data source, but we
>> >>>>>>>> only have resources to schedule the first stage, will it bring
>> >>>>>>>> a big burden to the disk/shuffle service, which I think will
>> >>>>>>>> occupy all the resources?
>> >>>>>>>>
>> >>>>>>>> 2) Which kind of job will benefit from the hybrid shuffle mode?
>> >>>>>>>> In other words, in which case can we use the hybrid shuffle mode:
>> >>>>>>>> - For batch jobs that want to use more resources to reduce the
>> >>>>>>>> e2e time?
>> >>>>>>>> - Or for streaming jobs which may lack resources temporarily?
>> >>>>>>>> - Or for OLAP jobs which will try to make the best use of
>> >>>>>>>> available resources, as you mentioned, to finish the query?
>> >>>>>>>>
>> >>>>>>>> Just want to know the typical use case for the hybrid shuffle
>> >>>>>>>> mode :)
>> >>>>>>>>
>> >>>>>>>> Best,
>> >>>>>>>> Aitozi.
>> >>>>>>>>
>> >>>>>>>> weijie guo <guoweijieres...@gmail.com> wrote on Thu, May 19, 2022 at 18:33:
>> >>>>>>>>> Yangze, thank you for the feedback!
>> >>>>>>>>> Here are my thoughts on your questions:
>> >>>>>>>>>
>> >>>>>>>>>> How do we decide the size of the buffer pool in
>> >>>>>>>>>> MemoryDataManager and the read buffers in FileDataManager?
>> >>>>>>>>>
>> >>>>>>>>> The BufferPool in MemoryDataManager is the LocalBufferPool
>> >>>>>>>>> used by the ResultPartition, and its size is the same as in
>> >>>>>>>>> the current implementation of sort-merge shuffle.
>> >>>>>>>>> In other words, the minimum value of the BufferPool is a
>> >>>>>>>>> configurable fixed value, and the maximum value is
>> >>>>>>>>> Math.max(min, 4 * numSubpartitions). The default value can be
>> >>>>>>>>> determined by running the TPC-DS tests.
>> >>>>>>>>> Read buffers in the FileDataManager are requested from the
>> >>>>>>>>> BatchShuffleReadBufferPool shared by the TaskManager. Its size
>> >>>>>>>>> is controlled by
>> >>>>>>>>> *taskmanager.memory.framework.off-heap.batch-shuffle.size*;
>> >>>>>>>>> the default value is 32M, which is consistent with the current
>> >>>>>>>>> sort-merge shuffle logic.
>> >>>>>>>>>
>> >>>>>>>>>> Is there an upper limit for the sum of them? If there is, how
>> >>>>>>>>>> do MemoryDataManager and FileDataManager sync the memory
>> >>>>>>>>>> usage?
>> >>>>>>>>>
>> >>>>>>>>> The buffers of the MemoryDataManager are limited by the size
>> >>>>>>>>> of the LocalBufferPool, and the upper limit is the size of the
>> >>>>>>>>> network memory. The buffers of the FileDataManager are
>> >>>>>>>>> directly requested from UnpooledOffHeapMemory, and are also
>> >>>>>>>>> limited by the size of the framework off-heap memory. I think
>> >>>>>>>>> there should be no need for additional synchronization
>> >>>>>>>>> mechanisms.
>> >>>>>>>>>
>> >>>>>>>>>> How do you disable slot sharing? If a user configures both
>> >>>>>>>>>> the slot sharing group and hybrid shuffle, what will happen
>> >>>>>>>>>> to that job?
>> >>>>>>>>>
>> >>>>>>>>> I think we can print a warning log when hybrid shuffle is
>> >>>>>>>>> enabled and an SSG is configured during the JobGraph
>> >>>>>>>>> compilation stage, and fall back to the region slot sharing
>> >>>>>>>>> group by default. Of course, it will be emphasized in the
>> >>>>>>>>> documentation that we do not currently support SSG; if
>> >>>>>>>>> configured, it will fall back to the default.
>> >>>>>>>>>
>> >>>>>>>>> Best regards,
>> >>>>>>>>>
>> >>>>>>>>> Weijie
>> >>>>>>>>>
>> >>>>>>>>> Yangze Guo <karma...@gmail.com> wrote on Thu, May 19, 2022 at 16:25:
>> >>>>>>>>>> Thanks for driving this, Xintong and Weijie.
>> >>>>>>>>>>
>> >>>>>>>>>> I believe this feature will make Flink a better batch/OLAP
>> >>>>>>>>>> engine. +1 for the overall design.
>> >>>>>>>>>>
>> >>>>>>>>>> Some questions:
>> >>>>>>>>>> 1. How do we decide the size of the buffer pool in
>> >>>>>>>>>> MemoryDataManager and the read buffers in FileDataManager?
>> >>>>>>>>>> 2. Is there an upper limit for the sum of them? If there is,
>> >>>>>>>>>> how do MemoryDataManager and FileDataManager sync the memory
>> >>>>>>>>>> usage?
>> >>>>>>>>>> 3. How do you disable slot sharing? If a user configures both
>> >>>>>>>>>> the slot sharing group and hybrid shuffle, what will happen
>> >>>>>>>>>> to that job?
>> >>>>>>>>>>
>> >>>>>>>>>> Best,
>> >>>>>>>>>> Yangze Guo
>> >>>>>>>>>>
>> >>>>>>>>>> On Thu, May 19, 2022 at 2:41 PM Xintong Song <tonysong...@gmail.com> wrote:
>> >>>>>>>>>>> Thanks for preparing this FLIP, Weijie.
>> >>>>>>>>>>>
>> >>>>>>>>>>> I think this is a good improvement on batch resource
>> >>>>>>>>>>> elasticity. Looking forward to the community feedback.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Best,
>> >>>>>>>>>>>
>> >>>>>>>>>>> Xintong
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Thu, May 19, 2022 at 2:31 PM weijie guo <guoweijieres...@gmail.com> wrote:
>> >>>>>>>>>>>> Hi all,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> I'd like to start a discussion about FLIP-235[1], which
>> >>>>>>>>>>>> introduces a new shuffle mode that can overcome some of the
>> >>>>>>>>>>>> problems of pipelined shuffle and blocking shuffle in batch
>> >>>>>>>>>>>> scenarios. Currently in Flink, task scheduling is more or
>> >>>>>>>>>>>> less constrained by the shuffle implementations. This
>> >>>>>>>>>>>> brings the following disadvantages:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> 1. Pipelined Shuffle:
>> >>>>>>>>>>>> For pipelined shuffle, the upstream and downstream tasks
>> >>>>>>>>>>>> are required to be deployed at the same time, to avoid
>> >>>>>>>>>>>> upstream tasks being blocked forever. This is fine when
>> >>>>>>>>>>>> there are enough resources for both upstream and downstream
>> >>>>>>>>>>>> tasks to run simultaneously, but causes the following
>> >>>>>>>>>>>> problems otherwise:
>> >>>>>>>>>>>> 1. Tasks connected by pipelined shuffle (i.e., a pipelined
>> >>>>>>>>>>>> region) cannot be executed until resources are obtained for
>> >>>>>>>>>>>> all of them, resulting in longer job finishing time and
>> >>>>>>>>>>>> poorer resource efficiency due to holding part of the
>> >>>>>>>>>>>> resources idle while waiting for the rest.
>> >>>>>>>>>>>> 2. More severely, if multiple jobs each hold part of the
>> >>>>>>>>>>>> cluster resources and are waiting for more, a deadlock
>> >>>>>>>>>>>> would occur.
>> >>>>>>>>>>>> The chance is not trivial, especially for scenarios such
>> >>>>>>>>>>>> as OLAP where concurrent job submissions are frequent.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> 2. Blocking Shuffle:
>> >>>>>>>>>>>> For blocking shuffle, execution of downstream tasks must
>> >>>>>>>>>>>> wait for all upstream tasks to finish, even though there
>> >>>>>>>>>>>> might be more resources available. The sequential execution
>> >>>>>>>>>>>> of upstream and downstream tasks significantly increases
>> >>>>>>>>>>>> the job finishing time, and the disk IO workload for
>> >>>>>>>>>>>> spilling and loading full intermediate data also affects
>> >>>>>>>>>>>> the performance.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> We believe the root cause of the above problems is that
>> >>>>>>>>>>>> shuffle implementations put unnecessary constraints on task
>> >>>>>>>>>>>> scheduling.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> To solve this problem, Xintong Song and I propose to
>> >>>>>>>>>>>> introduce hybrid shuffle to minimize the scheduling
>> >>>>>>>>>>>> constraints. With hybrid shuffle, Flink should:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> 1. Make the best use of available resources.
>> >>>>>>>>>>>> Ideally, we want Flink to always make progress if possible.
>> >>>>>>>>>>>> That is to say, it should always execute a pending task if
>> >>>>>>>>>>>> there are resources available for that task.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> 2. Minimize disk IO load.
>> >>>>>>>>>>>> In-flight data should be consumed directly from memory as
>> >>>>>>>>>>>> much as possible. Only data that is not consumed in a
>> >>>>>>>>>>>> timely manner should be spilled to disk.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> You can find more details in FLIP-235. Looking forward to
>> >>>>>>>>>>>> your feedback.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> [1]
>> >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-235%3A+Hybrid+Shuffle+Mode
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Best regards,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Weijie
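The two design goals stated in the announcement (never block the producer, and spill only data that is not consumed in a timely manner) can be sketched as a toy subpartition model. This is an illustrative simplification under assumed behavior, not the FLIP's actual design; the class name and the memory-budget parameter are hypothetical:

```python
from collections import deque

class HybridSubpartition:
    """Toy model of the hybrid policy: the producer never blocks; a
    connected consumer is served straight from memory, and only buffers
    that overflow the in-memory budget are spilled to 'disk' (a list
    here). Record order is preserved by consuming spilled data first."""

    def __init__(self, memory_budget: int):
        self.memory = deque()
        self.disk = []
        self.budget = memory_budget

    def produce(self, record):
        """Producer side: always accepts the record; spills the oldest
        in-memory buffer when the budget is exceeded."""
        self.memory.append(record)
        if len(self.memory) > self.budget:  # not consumed in time
            self.disk.append(self.memory.popleft())

    def consume(self):
        """Consumer side: drain spilled data first (it is older), then
        read directly from memory."""
        if self.disk:
            return self.disk.pop(0)
        return self.memory.popleft() if self.memory else None
```

With a budget of 2 buffers and a slow consumer, producing records 0..4 spills 0, 1, and 2 to disk while 3 and 4 stay in memory; the consumer still sees 0, 1, 2, 3, 4 in order. A fast consumer would drain everything from memory and touch disk not at all, which is the "minimize disk IO" goal.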