Hi Yingjie,

thanks for proposing the sort-merge based blocking shuffle. I like the
proposal, and it does not seem to change Flink's internals; instead, it
extends existing interfaces, which makes it a non-invasive addition.

Do you have any numbers comparing the performance of the sort-merge based
shuffle against the hash-based shuffle? To what parallelism can you scale
up when using the sort-merge based shuffle?

Cheers,
Till

On Thu, Oct 15, 2020 at 5:03 AM Yingjie Cao <kevin.ying...@gmail.com> wrote:

> Hi devs,
>
> Currently, Flink adopts a hash-style blocking shuffle implementation which
> writes the data sent to different reducer tasks into separate files
> concurrently. Compared with the sort-merge based approach, which writes the
> data of all reducers together into a single file and merges small files into
> bigger ones, the hash-based approach has several weak points when it comes to
> running large-scale batch jobs (a rough sketch of the two write patterns is
> included after the list):
>
>    1. *Stability*: For high-parallelism (tens of thousands) batch jobs, the
>    current hash-based blocking shuffle implementation writes too many files
>    concurrently, which puts high pressure on the file system, for example,
>    maintaining too many file metadata entries and exhausting inodes or file
>    descriptors. All of these are potential stability issues. The sort-merge
>    based blocking shuffle does not have this problem because, for one result
>    partition, only one file is written at a time.
>    2. *Performance*: Large numbers of small shuffle files and random IO can
>    hurt shuffle performance a lot, especially on HDD (on SSD, sequential
>    read is also important because of read-ahead and caching). For batch jobs
>    processing massive data, a small amount of data per subpartition is common
>    because of the high parallelism. Besides, data skew is another cause of
>    small subpartition files. By merging the data of all subpartitions into
>    one file, more sequential reads can be achieved.
>    3. *Resource*: With the current hash-based implementation, each
>    subpartition needs at least one buffer. For large-scale batch shuffles,
>    the memory consumption can be huge. For example, with a parallelism of
>    10000 we need at least 320M of network memory per result partition
>    (10000 subpartitions times the default 32K network buffer size). Because
>    of this huge network memory consumption, it is hard to configure the
>    network memory for large-scale batch jobs, and sometimes the parallelism
>    cannot be increased just because of insufficient network memory, which
>    leads to a bad user experience.
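>
> To make the difference concrete, here is a rough, illustrative sketch of the
> two write patterns in plain Java. It is not Flink's actual API; the
> HashStyleWriter / SortMergeWriter classes and their methods are made up for
> this example.
>
>     import java.io.IOException;
>     import java.nio.ByteBuffer;
>     import java.nio.channels.FileChannel;
>     import java.nio.file.Path;
>     import java.util.ArrayList;
>     import java.util.List;
>     import static java.nio.file.StandardOpenOption.CREATE;
>     import static java.nio.file.StandardOpenOption.WRITE;
>
>     /** Hash style: one file (and at least one buffer) per subpartition, all open concurrently. */
>     final class HashStyleWriter implements AutoCloseable {
>         private final FileChannel[] files;
>
>         HashStyleWriter(Path dir, int numSubpartitions) throws IOException {
>             files = new FileChannel[numSubpartitions];
>             for (int i = 0; i < numSubpartitions; i++) {
>                 files[i] = FileChannel.open(
>                         dir.resolve("subpartition-" + i + ".data"), CREATE, WRITE);
>             }
>         }
>
>         void write(int subpartition, ByteBuffer record) throws IOException {
>             files[subpartition].write(record); // many small files, random IO when read back
>         }
>
>         @Override
>         public void close() throws IOException {
>             for (FileChannel ch : files) {
>                 ch.close();
>             }
>         }
>     }
>
>     /** Sort-merge style: records are staged per subpartition, then spilled subpartition by
>      *  subpartition into a single file; an index remembers each subpartition's region. */
>     final class SortMergeWriter implements AutoCloseable {
>         private final FileChannel dataFile;
>         private final List<List<ByteBuffer>> staged; // per-subpartition staging area
>         private final long[] regionOffset;
>         private final long[] regionLength;
>
>         SortMergeWriter(Path dir, int numSubpartitions) throws IOException {
>             dataFile = FileChannel.open(dir.resolve("result-partition.data"), CREATE, WRITE);
>             staged = new ArrayList<>();
>             for (int i = 0; i < numSubpartitions; i++) {
>                 staged.add(new ArrayList<>());
>             }
>             regionOffset = new long[numSubpartitions];
>             regionLength = new long[numSubpartitions];
>         }
>
>         void write(int subpartition, ByteBuffer record) {
>             staged.get(subpartition).add(record); // no per-subpartition file or dedicated buffer
>         }
>
>         /** Spill everything in one sequential pass over the single data file. */
>         void finish() throws IOException {
>             for (int i = 0; i < staged.size(); i++) {
>                 regionOffset[i] = dataFile.position();
>                 for (ByteBuffer record : staged.get(i)) {
>                     dataFile.write(record);
>                 }
>                 regionLength[i] = dataFile.position() - regionOffset[i];
>             }
>         }
>
>         @Override
>         public void close() throws IOException {
>             dataFile.close();
>         }
>     }
>
> With the hash style, 10000 subpartitions means 10000 concurrently open files
> per result partition; with the sort-merge style, it is one data file plus a
> small index, and consecutive subpartitions are read sequentially.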
>
> To improve Flink’s capability of running large-scale batch jobs, we would
> like to introduce a sort-merge based blocking shuffle to Flink [1]. Any
> feedback is appreciated.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
>
> Best,
> Yingjie
>
