Thanks for the update, Yingjie. Would it make sense to write a short blog post about this feature, including some performance improvement numbers? I think this could be interesting to our users.
Cheers,
Till

On Mon, Jun 7, 2021 at 4:49 AM Jingsong Li <jingsongl...@gmail.com> wrote:

> Thanks Yingjie for the great effort!
>
> This is really helpful to Flink Batch users!
>
> Best,
> Jingsong
>
> On Mon, Jun 7, 2021 at 10:11 AM Yingjie Cao <kevin.ying...@gmail.com> wrote:
>
> > Hi devs & users,
> >
> > FLIP-148 [1] has been released with Flink 1.13, and the final
> > implementation differs in some points from the initial proposal in the
> > FLIP document. To avoid potential misunderstandings, I have updated the
> > FLIP document [1] accordingly and also drafted another document [2]
> > which contains more implementation details. FYI.
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
> > [2] https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing
> >
> > Best,
> > Yingjie
> >
> > On Thu, Oct 15, 2020 at 11:02 AM Yingjie Cao <kevin.ying...@gmail.com> wrote:
> >
> > > Hi devs,
> > >
> > > Currently, Flink adopts a hash-style blocking shuffle implementation
> > > which concurrently writes the data sent to different reducer tasks
> > > into separate files. Compared to the sort-merge based approach, which
> > > writes the data for all reducers into a single file and merges small
> > > files into bigger ones, the hash-based approach has several weak
> > > points when it comes to running large-scale batch jobs:
> > >
> > > 1. *Stability*: For high-parallelism (tens of thousands of tasks)
> > > batch jobs, the current hash-based blocking shuffle implementation
> > > writes too many files concurrently, which puts heavy pressure on the
> > > file system, for example, maintaining too much file metadata and
> > > exhausting inodes or file descriptors. All of these can become
> > > stability issues. The sort-merge based blocking shuffle does not have
> > > this problem because only one file is written at a time per result
> > > partition.
> > > 2. *Performance*: Large numbers of small shuffle files and the
> > > resulting random IO can hurt shuffle performance a lot, especially on
> > > HDDs (sequential reads also matter on SSDs because of read-ahead and
> > > caching). For batch jobs processing massive data, a small amount of
> > > data per subpartition is common because of the high parallelism, and
> > > data skew is another cause of small subpartition files. By merging the
> > > data of all subpartitions into one file, more sequential reads can be
> > > achieved.
> > > 3. *Resource*: With the current hash-based implementation, each
> > > subpartition needs at least one buffer, so the memory consumption can
> > > be huge for large-scale batch shuffles. For example, with the default
> > > 32 KB network buffer size, a result partition with a parallelism of
> > > 10,000 needs at least 10,000 x 32 KB = 320 MB of network memory.
> > > Because of this huge consumption, it is hard to configure the network
> > > memory for large-scale batch jobs, and sometimes the parallelism
> > > cannot be increased simply because of insufficient network memory,
> > > which leads to a bad user experience.
> > >
> > > To improve Flink's capability of running large-scale batch jobs, we
> > > would like to introduce the sort-merge based blocking shuffle to
> > > Flink [1]. Any feedback is appreciated.
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
> > >
> > > Best,
> > > Yingjie
>
> --
> Best, Jingsong Lee
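
For readers who want to try the sort-based blocking shuffle discussed in this thread, a minimal configuration sketch is shown below. It assumes the option names introduced by FLIP-148 in Flink 1.13 (taskmanager.network.sort-shuffle.min-parallelism and taskmanager.network.sort-shuffle.min-buffers); please verify them against the documentation of the Flink version you run:

    # flink-conf.yaml (sketch; option names assumed from FLIP-148 / Flink 1.13)
    # Use the sort-based blocking shuffle for every blocking result partition
    # whose parallelism is at least this threshold; a value of 1 enables it
    # for all blocking result partitions.
    taskmanager.network.sort-shuffle.min-parallelism: 1
    # Minimum number of network buffers used for data writing per result
    # partition; more buffers generally allow larger in-memory sort batches
    # and fewer, more sequential disk writes.
    taskmanager.network.sort-shuffle.min-buffers: 64

With options along these lines, a high-parallelism blocking shuffle writes one merged file per result partition instead of one file per subpartition, which is the behavior the proposal above argues for.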