I don’t think our functions are memory heavy; they are typically cogroups that
merge the records on the left with the records on the right.
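
Roughly the shape of what we mean (a simplified sketch, not our actual code;
the record type and the prefer-the-right-side merge rule are just illustrative):

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.util.Collector;

// Emits the right-side records for a key if any exist, otherwise passes the
// left-side records through. It only iterates the two sides; the function
// itself buffers nothing on the heap.
public class MergeLeftWithRight
        implements CoGroupFunction<GenericRecord, GenericRecord, GenericRecord> {
    @Override
    public void coGroup(Iterable<GenericRecord> left,
                        Iterable<GenericRecord> right,
                        Collector<GenericRecord> out) {
        boolean hadRight = false;
        for (GenericRecord r : right) {
            out.collect(r);
            hadRight = true;
        }
        if (!hadRight) {
            for (GenericRecord l : left) {
                out.collect(l);
            }
        }
    }
}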

We’re currently requiring 720GB of heap to do our processing, which frankly 
appears ridiculous to us. Could too much parallelism be causing the problem? 
Looking at:

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Optimal-Configuration-for-Cluster-td5024.html

If we are processing 17 “datasets” in a single job and each has an individual 
parallelism of 40, is the total (potential) parallelism 17 * 40? Given your 
network buffers calculation of parallelism squared, would that alone drive the 
requirement up, or only if we explicitly configure it that way:

taskmanager.network.numberOfBuffers: p ^ 2 * t * 4

where p is the maximum parallelism of the job and t is the number of task 
managers.
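
To make the question concrete, here is our back-of-the-envelope math (our
numbers, assuming the default 32KB network buffer segment size):

p = 40 (one dataset), t = 20:   40^2 * 20 * 4 = 128,000 buffers * 32KB ≈ 4GB
                                cluster-wide, i.e. roughly 200MB per TM
p = 17 * 40 = 680, t = 20:      680^2 * 20 * 4 ≈ 37,000,000 buffers * 32KB ≈ 1.1TB,
                                which clearly can’t be the intent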
You can process more than one parallel task per TM if you configure more than 
one processing slot per machine (taskmanager.numberOfTaskSlots). The TM will 
divide its memory among all its slots. So it would be possible to start one TM 
for each machine with 100GB+ memory and 48 slots each.
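
If we read that correctly, the one-TM-per-machine setup would look something
like this in flink-conf.yaml (our interpretation of the suggestion; the values
are just the ones from that example):

taskmanager.heap.mb: 102400          # ~100GB heap per TM, divided among its slots
taskmanager.numberOfTaskSlots: 48    # one TM per machine, 48 slots each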

Our pipeline for each dataset looks like this:

1. Read Avro file -> FlatMap -> validate each record with a FlatMap
2. Read Parquet -> FlatMap -> filter live rows -> CoGroup with the validated 
   Avro records from (1)
3. Read Parquet -> FlatMap -> filter dead rows
4. Union the CoGroup output from (2) with the dead rows from (3) and write the 
   result to a Parquet file (sketched in code below).
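
In DataSet API terms the shape is roughly the following. This is a minimal,
self-contained sketch, not our real job: the fromElements() sources and print()
sink stand in for the Avro/Parquet readers and the Parquet writer,
Tuple2<String, String> stands in for our record type, and the validate/merge
logic is placeholder code.

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class PipelineSketch {

    // Placeholder "validation" FlatMap: drop records with an empty payload.
    public static class Validate
            implements FlatMapFunction<Tuple2<String, String>, Tuple2<String, String>> {
        @Override
        public void flatMap(Tuple2<String, String> rec, Collector<Tuple2<String, String>> out) {
            if (rec.f1 != null && !rec.f1.isEmpty()) {
                out.collect(rec);
            }
        }
    }

    // Placeholder merge: prefer the incoming (Avro) record, otherwise keep the live row.
    public static class Merge implements
            CoGroupFunction<Tuple2<String, String>, Tuple2<String, String>, Tuple2<String, String>> {
        @Override
        public void coGroup(Iterable<Tuple2<String, String>> incoming,
                            Iterable<Tuple2<String, String>> live,
                            Collector<Tuple2<String, String>> out) {
            boolean emitted = false;
            for (Tuple2<String, String> r : incoming) { out.collect(r); emitted = true; }
            if (!emitted) {
                for (Tuple2<String, String> r : live) { out.collect(r); }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-ins for the Avro and Parquet file reads in the real job.
        DataSet<Tuple2<String, String>> avro = env.fromElements(Tuple2.of("k1", "new-value"));
        DataSet<Tuple2<String, String>> parquet = env.fromElements(
                Tuple2.of("k1", "live-value"), Tuple2.of("k2", ""));

        // (1) Avro branch: validate with a FlatMap.
        DataSet<Tuple2<String, String>> validated = avro.flatMap(new Validate());

        // (2)/(3) Parquet branches: split into live and dead rows (empty payload = "dead" here).
        DataSet<Tuple2<String, String>> liveRows = parquet.filter(r -> !r.f1.isEmpty());
        DataSet<Tuple2<String, String>> deadRows = parquet.filter(r -> r.f1.isEmpty());

        // CoGroup the validated Avro records with the live rows on the key field.
        DataSet<Tuple2<String, String>> merged = validated.coGroup(liveRows)
                .where(0).equalTo(0)
                .with(new Merge());

        // (4) Union with the dead rows; print() stands in for the Parquet sink.
        merged.union(deadRows).print();
    }
}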

I don’t understand why this logic couldn’t run with a single task manager and 
just take longer. We’re having a lot of trouble trying to change the tuning to 
reduce the memory burn. We run the above pipeline with parallelism 40 for all 
17 datasets in a single job.

We’re running this config now, which is not really justifiable for what we’re 
doing:

20 nodes x 2 slots = 40 parallelism, 36GB heap per node x 20 nodes = 720GB of heap…

Thanks

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Wednesday, April 19, 2017 10:52 AM
To: Newport, Billy [Tech]
Cc: user@flink.apache.org
Subject: Re: Flink memory usage

Hi Billy,
Flink's internal operators are implemented to not allocate heap space 
proportional to the size of the input data.
Whenever Flink needs to hold data in memory (e.g., for sorting or building a 
hash table) the data is serialized into managed memory. If all memory is in 
use, Flink starts spilling to disk. This blog post discusses how Flink uses its 
managed memory [1] (still up to date, even though it's almost 2 years old).
The runtime code should actually be quite stable. Most of the code has been there 
for several years (even before Flink was donated to the ASF) and we haven't 
seen many bugs reported for the DataSet runtime. Of course this does not mean 
that the code doesn't contain bugs.

However, Flink does not manage the memory used by user code. For example, a 
GroupReduceFunction that collects a lot of data, e.g., in a List on the heap, 
can still kill a program.
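For illustration, a function like the following (a hypothetical example, not
code from your job) buffers the entire group on the JVM heap before emitting
anything:

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.util.Collector;

public class CollectWholeGroup implements GroupReduceFunction<String, String> {
    @Override
    public void reduce(Iterable<String> values, Collector<String> out) {
        // This buffer lives on the JVM heap, outside Flink's managed memory,
        // and grows with the size of the group, so a large group can cause an OOM.
        List<String> buffer = new ArrayList<>();
        for (String v : values) {
            buffer.add(v);
        }
        for (String v : buffer) {
            out.collect(v);
        }
    }
}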
I would check whether you have user functions that require lots of heap memory.
Reducing the size of the managed memory to leave more heap space available 
might also help.
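The managed memory size is controlled in flink-conf.yaml, for example
(illustrative values, not a recommendation):

taskmanager.memory.fraction: 0.5     # share of the free heap used as managed memory (default 0.7)
# or give managed memory a fixed size (in MB) instead of a fraction:
# taskmanager.memory.size: 8192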
If that doesn't solve the problem, it would be good if you could share some 
details about your job (which operators, which local strategies, how many 
operators) that might help to identify the misbehaving operator.

Thanks, Fabian

[1] 
https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html


2017-04-19 16:09 GMT+02:00 Newport, Billy <billy.newp...@gs.com>:
How does Flink use memory? We’re seeing cases when running a job on larger 
datasets where it throws OOM exceptions during the job. We’re using the DataSet 
API. Shouldn’t Flink be streaming from disk to disk? We work around it by using 
fewer slots, but it seems unintuitive that we need to change these settings 
given Flink != Spark. Why isn’t Flink’s memory usage constant? Why couldn’t we 
run a job with a single task and a single slot for any size of input and have it 
succeed, just taking much longer to run?

Thanks
Billy


