Thanks Yingjie for the great effort! This is really helpful to Flink Batch users!
Best,
Jingsong

On Mon, Jun 7, 2021 at 10:11 AM Yingjie Cao <kevin.ying...@gmail.com> wrote:

> Hi devs & users,
>
> The FLIP-148[1] has been released with Flink 1.13, and the final
> implementation has some differences compared with the initial proposal in
> the FLIP document. To avoid potential misunderstandings, I have updated the
> FLIP document[1] accordingly, and I have also drafted another document[2]
> which contains more implementation details. FYI.
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
> [2] https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing
>
> Best,
> Yingjie
>
> Yingjie Cao <kevin.ying...@gmail.com> wrote on Thu, Oct 15, 2020 at 11:02 AM:
>
>> Hi devs,
>>
>> Currently, Flink adopts a hash-style blocking shuffle implementation
>> which writes the data sent to different reducer tasks into separate files
>> concurrently. Compared with the sort-merge based approach, which writes
>> the data of all subpartitions together into a single file and merges small
>> files into bigger ones, the hash-based approach has several weak points
>> when it comes to running large-scale batch jobs:
>>
>> 1. *Stability*: For high-parallelism (tens of thousands) batch jobs, the
>> current hash-based blocking shuffle implementation writes too many files
>> concurrently, which puts high pressure on the file system, for example,
>> maintenance of too many file metas and exhaustion of inodes or file
>> descriptors. All of these are potential stability issues. The sort-merge
>> based blocking shuffle does not have this problem because, for one result
>> partition, only one file is written at a time.
>> 2. *Performance*: Large numbers of small shuffle files and random IO can
>> hurt shuffle performance a lot, especially on HDDs (for SSDs, sequential
>> reads also matter because of read-ahead and caching). For batch jobs
>> processing massive data, a small amount of data per subpartition is common
>> because of high parallelism. Besides, data skew is another cause of small
>> subpartition files. By merging the data of all subpartitions together into
>> one file, more sequential reads can be achieved.
>> 3. *Resource*: In the current hash-based implementation, each
>> subpartition needs at least one buffer. For large-scale batch shuffles,
>> the memory consumption can be huge. For example, we need at least 320M of
>> network memory per result partition if parallelism is set to 10000.
>> Because of this huge network memory consumption, it is hard to configure
>> the network memory for large-scale batch jobs, and sometimes the
>> parallelism cannot be increased simply because of insufficient network
>> memory, which leads to a bad user experience.
>>
>> To improve Flink's capability of running large-scale batch jobs, we would
>> like to introduce a sort-merge based blocking shuffle to Flink[1]. Any
>> feedback is appreciated.
>>
>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
>>
>> Best,
>> Yingjie
>>

--
Best, Jingsong Lee
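
[Editor's note: the "Resource" point in the quoted proposal rests on a simple calculation: with the hash-based blocking shuffle, each subpartition holds at least one network buffer, so the per-result-partition memory grows linearly with consumer parallelism. Below is a minimal back-of-the-envelope sketch of that arithmetic. It is not Flink code; the class and method names are invented for illustration, and it assumes Flink's default 32 KB network buffer (memory segment) size.]

```java
/**
 * Illustrative estimate (not Flink code): the minimum network memory one
 * result partition needs under the hash-based blocking shuffle, where every
 * subpartition keeps at least one buffer. Assumes the default 32 KB buffer.
 */
public class HashShuffleMemoryEstimate {

    // Assumed default network buffer (memory segment) size: 32 KB.
    private static final long BUFFER_SIZE_BYTES = 32 * 1024;

    static long minNetworkMemoryBytes(int consumerParallelism) {
        // Hash-based blocking shuffle: at least one buffer per subpartition,
        // and there is one subpartition per downstream (consumer) task.
        return consumerParallelism * BUFFER_SIZE_BYTES;
    }

    public static void main(String[] args) {
        int parallelism = 10_000;
        long bytes = minNetworkMemoryBytes(parallelism);
        // 10,000 subpartitions x 32 KB each ~= 320 MB, which is roughly the
        // "at least 320M per result partition" figure in the message above.
        System.out.printf(
                "parallelism=%d -> >= %.1f MB network memory per result partition%n",
                parallelism, bytes / 1000.0 / 1000.0);
    }
}
```

[The sort-merge based shuffle avoids this linear growth because all subpartitions of a result partition share one fixed-size buffer pool and are written into a single file.]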