Hi Konstantin,

Flink's sort algorithm works as follows:

- Each memory-consuming task (such as sort or combine) has a memory budget
that depends on the number of memory-consuming operators in the plan, the
TM's managed memory, and the number of slots of the TM. Each TM slot gets
the same fraction of the overall TM memory. If there are two memory-consuming
operators (combine and sort), each of their tasks gets 50% of the slot
memory. So if you have a TM with 40GB and 4 slots, each slot has 10GB and
each task gets 5GB.
- The sort splits its memory budget into three buffers.
- The first buffer is filled with incoming data. Once it is full, it is
sorted while the second buffer is filled. When the second buffer is full,
the third buffer is filled, and the second buffer is sorted as soon as
sorting of the first buffer has finished.
- Once the first buffer is sorted, Flink waits until a certain amount of
data has been received (by default 70% of the sort's memory budget). When
that happens, it starts spilling the first buffer to disk. Once it has been
spilled, the first buffer can be filled again.
- When all data has been read, the last buffer is only sorted but not spilled.
- The sorted stream is produced by merging the sorted and spilled records.
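To make the budget arithmetic from the first bullet concrete, here is a
plain-Java sketch (not Flink code; the even-split scheme is simplified from
the description above):

```java
public class MemoryBudget {
    // Per-task sort/combine budget: TM managed memory is split evenly
    // across slots, and a slot's share evenly across memory-consuming operators.
    static long taskBudgetBytes(long tmManagedBytes, int slots, int memoryConsumers) {
        long slotBytes = tmManagedBytes / slots;
        return slotBytes / memoryConsumers;
    }

    public static void main(String[] args) {
        long tm = 40L * 1024 * 1024 * 1024;        // 40GB managed memory
        long perTask = taskBudgetBytes(tm, 4, 2);  // 4 slots, combine + sort
        System.out.println(perTask / (1024L * 1024 * 1024) + " GB per task"); // 5 GB per task
    }
}
```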

There are a few reasons that might cause spilling.
1) The spilling threshold is too tight. For example, to sort 10GB in memory
(in a single task), you need more than roughly 14.3GB of sorter memory (10GB
/ 0.7). The idea here is to start spilling early enough that the first
buffer is empty again before the third buffer fills up and the input has to
be blocked.
I'm not sure if it is easily possible to tweak the threshold.
2) The data might be skewed.
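As a sanity check on the threshold arithmetic in 1), a minimal sketch
(plain Java, assuming the 0.7 default mentioned above):

```java
public class SpillThreshold {
    // Minimum sorter memory needed to sort `dataGb` entirely in memory,
    // assuming spilling starts once `spillFraction` of the budget is filled.
    static double minSorterMemoryGb(double dataGb, double spillFraction) {
        return dataGb / spillFraction;
    }

    public static void main(String[] args) {
        // 10GB of data with the default 0.7 threshold needs ~14.29GB of sorter memory.
        System.out.printf("%.2f GB%n", minSorterMemoryGb(10.0, 0.7));
    }
}
```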

Something that you could try is to use a hash-combiner, which can help to
improve the combine rate if you have a rather low number of distinct
keys.
Hash combiners have to be explicitly chosen and are only available for
ReduceFunctions.
So you would have to implement the sum as a ReduceFunction and hint the
hash combiner like this:

input.groupBy(0, 1).reduce(new SumFunc()).setCombineHint(CombineHint.HASH)
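To illustrate why this helps: a hash combiner pre-aggregates records in a
hash table keyed by the grouping fields, so with few distinct keys most
records collapse before anything is sorted or shipped over the network. A
plain-Java sketch of the idea (not Flink's implementation; names are made up
for illustration):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HashCombineSketch {
    // Pre-aggregate (key -> sum) in a hash table, as a hash-based combiner would.
    static Map<String, Long> combine(List<SimpleEntry<String, Long>> records) {
        Map<String, Long> acc = new HashMap<>();
        for (SimpleEntry<String, Long> r : records) {
            acc.merge(r.getKey(), r.getValue(), Long::sum); // same op as the ReduceFunction
        }
        return acc;
    }

    public static void main(String[] args) {
        List<SimpleEntry<String, Long>> records = new ArrayList<>();
        for (long i = 0; i < 1_000_000; i++) {
            records.add(new SimpleEntry<>("key" + (i % 3), 1L)); // only 3 distinct keys
        }
        // 1,000,000 input records collapse to 3 partial sums before the shuffle.
        System.out.println(combine(records).size() + " groups");
    }
}
```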

Hope this helps,
Fabian

2018-01-22 16:13 GMT+01:00 Konstantin Gregor <konstantin.gre...@tngtech.com>
:

> Hello everyone,
>
> I have a question about the spilling behavior of a Flink batch job.
>
> The relevant part is a standard map-reduce, aggregating 4 billion
> Tuple3<Integer, Integer, Integer> together via a groupBy(0,1).sum(2).
> And not much else really happens in the job.
>
> The problem is that I don't understand why this job spills to disk. In
> this example the spilling is not really an issue, but we run the same
> job with much larger datasets, where we simply run out of disk space. So
> we're trying to understand better what it spills and what we can do
> about it.
>
> In this example, I am running on AWS EMR (Flink 1.3.1) with a machine
> with 240GB memory. I tweaked the following parameters:
>
> yarn.heap-cutoff-ratio: 0.1
> taskmanager.memory.fraction: 0.9
> taskmanager.network.numberOfBuffers: 32768
>
> This leads to 170GB Flink Managed Memory which in my opinion should
> suffice for the amount of data (the amount of data going from the
> combine to the reduce is roughly 80GB). However, it is spilling over
> 70GB on disk.
>
> Do you have a hint for me why this could be the case and can explain
> what exactly is written into the state on such a group-reduce?
>
> Thank you so much for your input,
> best regards
>
> Konstantin
>
>
> --
> Konstantin Gregor * konstantin.gre...@tngtech.com
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Dr. Robert Dahlke, Gerhard Müller
> Amtsgericht München, HRB 135082
>
