Thanks for sharing the preliminary numbers with us, Yingjie. The numbers
look quite impressive :-)

Cheers,
Till

On Thu, Oct 15, 2020 at 5:25 PM Yingjie Cao <kevin.ying...@gmail.com> wrote:

> Hi Till,
>
> Thanks for your reply and comments.
>
> You are right, the proposed sort-merge based shuffle is an extension of the
> existing blocking shuffle and does not change any default behavior of
> Flink.
>
> As for performance, in our previous experience, a sort-merge based
> implementation can reduce the shuffle time by 30% to 90% compared to the
> hash-based implementation. My PoC implementation, without any further
> optimization, can already reduce the shuffle time by over 10% on SSD and
> over 70% on HDD for a simple 1000 * 1000 parallelism benchmark job.
>
> After switching to the sort-merge based blocking shuffle, some of our
> users' jobs can scale up to over 20000 parallelism, though this required
> some JM and RM side optimizations. I have never tried to find where the
> upper bound is, but I guess several tens of thousands should be able to
> meet the needs of most users.
>
> Best,
> Yingjie
>
Till Rohrmann <trohrm...@apache.org> wrote on Thu, Oct 15, 2020 at 3:57 PM:
>
> > Hi Yingjie,
> >
> > thanks for proposing the sort-merge based blocking shuffle. I like the
> > proposal, and it does not seem to change the internals of Flink. Instead,
> > it is an extension of existing interfaces, which makes it a
> > non-invasive addition.
> >
> > Do you have any numbers comparing the performance of the sort-merge based
> > shuffle against the hash-based shuffle? To what parallelism can you scale
> > up when using the sort-merge based shuffle?
> >
> > Cheers,
> > Till
> >
> > On Thu, Oct 15, 2020 at 5:03 AM Yingjie Cao <kevin.ying...@gmail.com>
> > wrote:
> >
> > > Hi devs,
> > >
> > > Currently, Flink adopts a hash-style blocking shuffle implementation
> > > which concurrently writes the data sent to different reducer tasks into
> > > separate files. Compared to the sort-merge based approach, which writes
> > > that data together into a single file and merges small files into
> > > bigger ones, the hash-based approach has several weak points when it
> > > comes to running large-scale batch jobs (a small illustrative sketch of
> > > the two write patterns follows the list below):
> > >
> > >    1. *Stability*: For high-parallelism (tens of thousands) batch jobs,
> > >    the current hash-based blocking shuffle implementation writes too
> > >    many files concurrently, which puts high pressure on the file system,
> > >    for example, maintaining the metadata of too many files and
> > >    exhausting inodes or file descriptors. All of these are potential
> > >    stability issues. The sort-merge based blocking shuffle does not have
> > >    this problem because, for one result partition, only one file is
> > >    written at a time.
> > >    2. *Performance*: Large numbers of small shuffle files and random IO
> > >    can hurt shuffle performance a lot, especially on HDD (on SSD,
> > >    sequential read is also important because of read-ahead and caching).
> > >    For batch jobs processing massive data, a small amount of data per
> > >    subpartition is common because of the high parallelism. Besides, data
> > >    skew is another cause of small subpartition files. By merging the
> > >    data of all subpartitions together into one file, more sequential
> > >    reads can be achieved.
> > >    3. *Resource*: For the current hash-based implementation, each
> > >    subpartition needs at least one buffer. For large-scale batch
> > >    shuffles, the memory consumption can be huge. For example, we need at
> > >    least 320M of network memory per result partition if the parallelism
> > >    is set to 10000 (see the back-of-the-envelope calculation after this
> > >    list). Because of the huge network memory consumption, it is hard to
> > >    configure the network memory for large-scale batch jobs, and
> > >    sometimes the parallelism cannot be increased just because of
> > >    insufficient network memory, which leads to a bad user experience.
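> > >
> > >    A rough back-of-the-envelope calculation behind the 320M figure
> > >    above (assuming one buffer per subpartition and Flink's default 32KB
> > >    network buffer, i.e. memory segment, size):
> > >
> > >        10000 subpartitions * 32KB per buffer = 320000KB ≈ 320MB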
> > >
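> > > To make the write patterns concrete, below is a minimal, hypothetical
> > > Java sketch (not Flink's actual classes or APIs; file names and the
> > > index printout are illustrative only). It assumes the data is already
> > > grouped per subpartition, so only the file layout difference is shown;
> > > in a real sort-merge shuffle, interleaved records would first be sorted
> > > by subpartition in an in-memory buffer before spilling.
> > >
> > > import java.io.FileOutputStream;
> > > import java.io.IOException;
> > >
> > > public class ShuffleWriteSketch {
> > >
> > >     // Hash-style: one file (and at least one buffer) per downstream
> > >     // subpartition, all written concurrently in a real shuffle.
> > >     static void hashStyleWrite(byte[][] dataPerSubpartition)
> > >             throws IOException {
> > >         for (int sub = 0; sub < dataPerSubpartition.length; sub++) {
> > >             try (FileOutputStream out =
> > >                     new FileOutputStream("subpartition-" + sub + ".data")) {
> > >                 out.write(dataPerSubpartition[sub]);
> > >             }
> > >         }
> > >     }
> > >
> > >     // Sort-merge style: append the data of all subpartitions
> > >     // sequentially to a single file and record each subpartition's
> > >     // region (offset, length) so that readers can locate their data.
> > >     static void sortMergeStyleWrite(byte[][] dataPerSubpartition)
> > >             throws IOException {
> > >         try (FileOutputStream out =
> > >                 new FileOutputStream("result-partition.data")) {
> > >             long offset = 0;
> > >             for (int sub = 0; sub < dataPerSubpartition.length; sub++) {
> > >                 out.write(dataPerSubpartition[sub]); // sequential append
> > >                 System.out.printf("subpartition %d -> offset %d, length %d%n",
> > >                         sub, offset, dataPerSubpartition[sub].length);
> > >                 offset += dataPerSubpartition[sub].length;
> > >             }
> > >         }
> > >     }
> > > }
> > >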
> > > To improve Flink’s capability of running large-scale batch jobs, we
> > > would like to introduce the sort-merge based blocking shuffle to
> > > Flink [1]. Any feedback is appreciated.
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
> > >
> > > Best,
> > > Yingjie
> > >
> >
>
