Thanks for the update, Yingjie. Would it make sense to write a short blog
post about this feature, including some performance improvement numbers? I
think this could be interesting to our users.

Cheers,
Till

On Mon, Jun 7, 2021 at 4:49 AM Jingsong Li <jingsongl...@gmail.com> wrote:

> Thanks Yingjie for the great effort!
>
> This is really helpful to Flink Batch users!
>
> Best,
> Jingsong
>
> On Mon, Jun 7, 2021 at 10:11 AM Yingjie Cao <kevin.ying...@gmail.com>
> wrote:
>
> > Hi devs & users,
> >
> > FLIP-148[1] has been released with Flink 1.13, and the final
> > implementation differs in some respects from the initial proposal in the
> > FLIP document. To avoid potential misunderstandings, I have updated the
> > FLIP document[1] accordingly and also drafted another document[2] with
> > more implementation details. FYI.
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
> > [2]
> > https://docs.google.com/document/d/1j12TkSqgf6dg3J48udA2MFrDOQccW24tzjn5pJlTQaQ/edit?usp=sharing
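> >
> > For anyone who wants to try the released feature, here is a minimal
> > config sketch (the option name below is taken from the Flink 1.13
> > sort-shuffle configuration as I understand it; please treat the
> > documents linked above as authoritative for the exact keys and
> > defaults):
> >
> >   # flink-conf.yaml: use sort-merge blocking shuffle for all result
> >   # partitions, not only those above the default parallelism threshold
> >   taskmanager.network.sort-shuffle.min-parallelism: 1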
> >
> > Best,
> > Yingjie
> >
> > Yingjie Cao <kevin.ying...@gmail.com> wrote on Thursday, October 15,
> > 2020 at 11:02 AM:
> >
> >> Hi devs,
> >>
> >> Currently, Flink adopts a hash-style blocking shuffle implementation
> >> which writes data sent to different reducer tasks into separate files
> >> concurrently. Compared with a sort-merge based approach, which writes
> >> the data of all reducers together into a single file and merges small
> >> files into bigger ones, the hash-based approach has several weak points
> >> when it comes to running large scale batch jobs:
> >>
> >>    1. *Stability*: For high parallelism (tens of thousands) batch jobs,
> >>    the current hash-based blocking shuffle implementation writes too
> >>    many files concurrently, which puts high pressure on the file system,
> >>    for example maintaining too many file metas and exhausting inodes or
> >>    file descriptors. All of these are potential stability issues.
> >>    Sort-merge based blocking shuffle does not have this problem because
> >>    for one result partition, only one file is written at a time.
> >>    2. *Performance*: Large numbers of small shuffle files and random IO
> >>    can hurt shuffle performance a lot, especially on HDD (for SSD,
> >>    sequential read is also important because of read-ahead and caching).
> >>    For batch jobs processing massive data, a small amount of data per
> >>    subpartition is common because of high parallelism. Besides, data
> >>    skew is another cause of small subpartition files. By merging the
> >>    data of all subpartitions together into one file, more sequential
> >>    reads can be achieved.
> >>    3. *Resource*: For the current hash-based implementation, each
> >>    subpartition needs at least one buffer. For large scale batch
> >>    shuffles, the memory consumption can be huge. For example, we need at
> >>    least 320M of network memory per result partition if parallelism is
> >>    set to 10000 (see the back-of-envelope sketch after this list).
> >>    Because of this huge network memory consumption, it is hard to
> >>    configure the network memory for large scale batch jobs, and
> >>    sometimes parallelism cannot be increased just because of
> >>    insufficient network memory, which leads to a bad user experience.
> >>
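> >> As a back-of-envelope check on the 320M figure above, here is a rough
> >> sketch in Java. It assumes Flink's default 32 KB network buffer size
> >> and exactly one buffer per subpartition; both are simplifications of
> >> the actual buffer management, and the buffer size is configurable:
> >>
> >>   // Rough estimate only: default 32 KB buffers, one per subpartition.
> >>   public class HashShuffleMemoryEstimate {
> >>       public static void main(String[] args) {
> >>           final long parallelism = 10_000;    // downstream subpartitions
> >>           final long bufferBytes = 32 * 1024; // 32 KB network buffer
> >>           final long totalBytes = parallelism * bufferBytes;
> >>           // Prints ~312 MiB, i.e. roughly the 320M mentioned above.
> >>           System.out.println(
> >>               (totalBytes / (1024 * 1024)) + " MiB per result partition");
> >>       }
> >>   }
> >>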
> >> To improve Flink’s capability of running large scale batch jobs, we
> >> would like to introduce sort-merge based blocking shuffle to Flink[1].
> >> Any feedback is appreciated.
> >>
> >> [1]
> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink
> >>
> >> Best,
> >> Yingjie
> >>
> >
>
> --
> Best, Jingsong Lee
>
