Thanks for sharing the preliminary numbers with us, Yingjie. The numbers look quite impressive :-)
Cheers,
Till

On Thu, Oct 15, 2020 at 5:25 PM Yingjie Cao <kevin.ying...@gmail.com> wrote:

> Hi Till,
>
> Thanks for your reply and comments.
>
> You are right, the proposed sort-merge based shuffle is an extension of the
> existing blocking shuffle and does not change any default behavior of
> Flink.
>
> As for the performance, according to our previous experience, a sort-merge
> based implementation can reduce the shuffle time by 30% to even 90%
> compared to the hash-based implementation. My PoC implementation, without
> any further optimization, can already reduce the shuffle time by over 10%
> on SSD and over 70% on HDD for a simple 1000 * 1000 parallelism benchmark
> job.
>
> After switching to the sort-merge based blocking shuffle, some of our
> users' jobs can scale up to over 20000 parallelism, though this needs some
> JM and RM side optimization. I haven't tried to find where the upper bound
> is, but I guess several tens of thousands should be able to meet the needs
> of most users.
>
> Best,
> Yingjie
>
> Till Rohrmann <trohrm...@apache.org> wrote on Thu, Oct 15, 2020 at 3:57 PM:
>
> > Hi Yingjie,
> >
> > thanks for proposing the sort-merge based blocking shuffle. I like the
> > proposal and it does not seem to change the internals of Flink. Instead
> > it is an extension of existing interfaces, which makes it a
> > non-invasive addition.
> >
> > Do you have any numbers comparing the performance of the sort-merge based
> > shuffle against the hash-based shuffle? To what parallelism can you scale
> > up when using the sort-merge based shuffle?
> >
> > Cheers,
> > Till
> >
> > On Thu, Oct 15, 2020 at 5:03 AM Yingjie Cao <kevin.ying...@gmail.com>
> > wrote:
> >
> > > Hi devs,
> > >
> > > Currently, Flink adopts a hash-style blocking shuffle implementation
> > > which concurrently writes data sent to different reducer tasks into
> > > separate files. Compared to the sort-merge based approach, which
> > > writes that data together into a single file and merges small files
> > > into bigger ones, the hash-based approach has several weak points when
> > > it comes to running large scale batch jobs:
> > >
> > > 1. *Stability*: For high parallelism (tens of thousands) batch jobs,
> > > the current hash-based blocking shuffle implementation writes too many
> > > files concurrently, which puts high pressure on the file system, for
> > > example, maintenance of too many file metas and exhaustion of inodes
> > > or file descriptors. All of these are potential stability issues. The
> > > sort-merge based blocking shuffle doesn't have this problem because,
> > > for one result partition, only one file is written at a time.
> > > 2. *Performance*: Large amounts of small shuffle files and random IO
> > > can hurt shuffle performance a lot, especially for HDD (for SSD,
> > > sequential read is also important because of read-ahead and caching).
> > > For batch jobs processing massive data, a small amount of data per
> > > subpartition is common because of high parallelism. Besides, data skew
> > > is another cause of small subpartition files. By merging the data of
> > > all subpartitions together into one file, more sequential reads can be
> > > achieved.
> > > 3. *Resource*: For the current hash-based implementation, each
> > > subpartition needs at least one buffer. For large scale batch
> > > shuffles, the memory consumption can be huge.
> > > For example, we need at least 320M of network memory per result
> > > partition if parallelism is set to 10000. Because of this huge network
> > > memory consumption, it is hard to configure the network memory for
> > > large scale batch jobs, and sometimes the parallelism cannot be
> > > increased simply because of insufficient network memory, which leads
> > > to a bad user experience.
> > >
> > > To improve Flink's capability of running large scale batch jobs, we
> > > would like to introduce the sort-merge based blocking shuffle to
> > > Flink[1]. Any feedback is appreciated.
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
> > >
> > > Best,
> > > Yingjie
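The 320M figure in point 3 can be sanity-checked with a quick back-of-envelope calculation. The sketch below is not from the proposal itself; it assumes Flink's default 32 KiB network buffer (memory segment) size and the "one buffer per subpartition" minimum stated above, and it also illustrates the file-count difference behind point 1:

```python
# Back-of-envelope figures for the hash-based vs. sort-merge blocking shuffle.
# Assumption: a 32 KiB network buffer size (Flink's default segment size) and
# at least one buffer per downstream subpartition for the hash-based shuffle.

BUFFER_KIB = 32  # assumed default network buffer size

def min_network_memory_mib(parallelism: int) -> float:
    """Minimum network memory per result partition for the hash-based
    shuffle: one buffer per downstream subpartition."""
    return parallelism * BUFFER_KIB / 1024

def files_per_result_partition(parallelism: int, sort_merge: bool) -> int:
    """Hash-based shuffle writes one file per subpartition concurrently;
    sort-merge writes a single file per result partition."""
    return 1 if sort_merge else parallelism

print(min_network_memory_mib(10_000))          # 312.5 MiB, roughly the "320M" cited
print(files_per_result_partition(10_000, sort_merge=False))  # 10000 concurrent files
print(files_per_result_partition(10_000, sort_merge=True))   # 1 file
```

With 10000 producers each holding 10000 subpartitions, the hash-based scheme also means on the order of 10^8 shuffle files for the whole job, which is where the inode/file-descriptor pressure in point 1 comes from.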