Hi Yingjie,

thanks for proposing the sort-merge based blocking shuffle. I like the proposal, and it does not seem to change the internals of Flink. Instead, it is an extension of existing interfaces, which makes it a non-invasive addition.
Do you have any numbers comparing the performance of the sort-merge based shuffle against the hash-based shuffle? To what parallelism can you scale up when using the sort-merge based shuffle?

Cheers,
Till

On Thu, Oct 15, 2020 at 5:03 AM Yingjie Cao <kevin.ying...@gmail.com> wrote:

> Hi devs,
>
> Currently, Flink adopts a hash-style blocking shuffle implementation which
> writes the data sent to different reducer tasks into separate files
> concurrently. Compared to the sort-merge based approach, which writes that
> data together into a single file and merges those small files into bigger
> ones, the hash-based approach has several weak points when it comes to
> running large-scale batch jobs:
>
> 1. *Stability*: For high-parallelism (tens of thousands) batch jobs, the
> current hash-based blocking shuffle implementation writes too many files
> concurrently, which puts high pressure on the file system, for example the
> maintenance of too many file metas and the exhaustion of inodes or file
> descriptors. All of these are potential stability issues. The sort-merge
> based blocking shuffle does not have this problem because, for one result
> partition, only one file is written at a time.
> 2. *Performance*: Large numbers of small shuffle files and random IO can
> hurt shuffle performance a lot, especially on HDDs (for SSDs, sequential
> read is also important because of read-ahead and caching). For batch jobs
> processing massive data, a small amount of data per subpartition is common
> because of high parallelism. Besides, data skew is another cause of small
> subpartition files. By merging the data of all subpartitions together into
> one file, more sequential reads can be achieved.
> 3. *Resource*: With the current hash-based implementation, each
> subpartition needs at least one buffer. For large-scale batch shuffles, the
> memory consumption can be huge. For example, with the default 32 KB network
> buffer size, we need at least 320 MB of network memory per result partition
> if the parallelism is set to 10000 (10000 subpartitions x 32 KB per buffer).
> Because of this huge network memory consumption, it is hard to configure
> the network memory for large-scale batch jobs, and sometimes the
> parallelism cannot be increased simply because of insufficient network
> memory, which leads to a bad user experience.
>
> To improve Flink's capability of running large-scale batch jobs, we would
> like to introduce a sort-merge based blocking shuffle to Flink [1]. Any
> feedback is appreciated.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
>
> Best,
> Yingjie
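
For a rough sense of the buffer arithmetic in point 3 of the quoted message, here is a minimal back-of-the-envelope sketch. It assumes the default 32 KB network buffer size and at least one buffer per subpartition; the class and variable names are only for illustration and are not Flink API:

    // Estimates the minimum network memory one result partition needs under the
    // hash-based blocking shuffle, where every subpartition holds its own buffer.
    public class ShuffleBufferEstimate {
        public static void main(String[] args) {
            long bufferKiB = 32;          // assumed default network buffer (memory segment) size
            long parallelism = 10_000;    // subpartitions per result partition, one buffer each
            long totalKiB = parallelism * bufferKiB;  // 320,000 KiB
            System.out.println("Minimum network memory per result partition: ~"
                    + totalKiB / 1024 + " MiB");      // ~312 MiB, i.e. the "320M" figure above
        }
    }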