Hi Billy,

if it's possible that you can share some parts of your code privately with
me, then I can try to figure out what's going wrong.

Cheers,
Till

On Thu, Apr 20, 2017 at 6:00 PM, Newport, Billy <billy.newp...@gs.com>
wrote:

> Ok
>
> The consensus seems to be that it’s us, not Flink :) So we’ll look harder
> at what we’re doing in case there is anything silly. We are using 16K
> network buffers BTW, which is around 0.5GB with the defaults.
>
>
>
>
>
> *From:* Till Rohrmann [mailto:trohrm...@apache.org]
> *Sent:* Thursday, April 20, 2017 11:52 AM
> *To:* Stefano Bortoli
> *Cc:* Newport, Billy [Tech]; Fabian Hueske; user@flink.apache.org
>
> *Subject:* Re: Flink memory usage
>
>
>
> Hi Billy,
>
>
>
> if you didn't split the different data sets up into different slot sharing
> groups, then your maximum parallelism is 40. Thus, it should be enough to
> assign 40^2 * 20 * 4 = 128000 network buffers. If that is not enough
> because you have more than 4 shuffling steps running in parallel, then you
> have to increase the last term.
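>
> As a back-of-the-envelope check (a rough sketch assuming the default 32 KB
> buffer size; the same arithmetic gives exactly 0.5 GB for 16K buffers):
>
> public class NetworkBufferMath {
>     public static void main(String[] args) {
>         long bufferBytes = 32 * 1024L;        // assumed default network buffer size
>         long buffers = 40L * 40 * 20 * 4;     // p^2 * t * 4 = 128,000
>         double gb = buffers * bufferBytes / (1024.0 * 1024 * 1024);
>         System.out.printf("%,d buffers ~ %.1f GB%n", buffers, gb);  // ~3.9 GB
>     }
> }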
>
>
>
> OOM exceptions should actually only occur due to user code objects. Given
> that you have reserved a massive amount of memory for the network buffers,
> the remaining heap for the user code is probably very small. Try whether
> you can decrease the number of network buffers. Moreover, check whether
> your user code holds on to object references somewhere, which could cause
> the OOM.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Thu, Apr 20, 2017 at 5:42 PM, Stefano Bortoli <
> stefano.bort...@huawei.com> wrote:
>
> I think that if you have a lot of memory available, the GC gets kind of
> lazy. In our case, the issue was just the latency caused by the GC, because
> we were loading more data than could fit in memory. Hence, optimizing the
> code gave us a lot of improvement. FlatMaps are also dangerous, as objects
> can multiply beyond expectations, making the co-group extremely costly. :-)
> A well-placed distinct() saves a lot of time and memory.
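>
> For instance, a minimal sketch (file names and functions here are
> placeholders, not the actual job) of a well-placed distinct() before a
> co-group:
>
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
> DataSet<Tuple2<String, String>> left = env.readTextFile("left.txt")
>     .flatMap(new ExplodeFlatMap())   // hypothetical flatMap that may emit the same key many times
>     .distinct(0);                    // collapse duplicates before the expensive co-group
>
> DataSet<Tuple2<String, String>> right = env.readTextFile("right.txt")
>     .flatMap(new ParseFlatMap());    // hypothetical
>
> left.coGroup(right)
>     .where(0).equalTo(0)
>     .with(new MergeCoGroup())        // hypothetical CoGroupFunction
>     .writeAsText("out.txt");
>
> env.execute("distinct-before-cogroup");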
>
>
>
> My point is that, having worked with scarce resources, I learned that
> almost all the time the issue was my code, not the framework.
>
>
>
> Good luck.
>
>
>
> Stefano
>
>
>
> *From:* Newport, Billy [mailto:billy.newp...@gs.com]
> *Sent:* Thursday, April 20, 2017 4:46 PM
> *To:* Stefano Bortoli <stefano.bort...@huawei.com>; 'Fabian Hueske' <
> fhue...@gmail.com>
>
>
> *Cc:* 'user@flink.apache.org' <user@flink.apache.org>
> *Subject:* RE: Flink memory usage
>
>
>
> Your reuse idea kind of implies that it’s a GC generation rate issue, i.e.,
> it’s not collecting fast enough and so it runs out of memory, versus heap
> that’s actually anchored, right?
>
>
>
>
>
> *From:* Stefano Bortoli [mailto:stefano.bort...@huawei.com
> <stefano.bort...@huawei.com>]
> *Sent:* Thursday, April 20, 2017 10:33 AM
> *To:* Newport, Billy [Tech]; 'Fabian Hueske'
> *Cc:* 'user@flink.apache.org'
> *Subject:* RE: Flink memory usage
>
>
>
> Hi Billy,
>
>
>
> The only suggestion I can give is to check very carefully in your code for
> useless variable allocations, and foster reuse as much as possible. Don’t
> create a new collection on every map execution; rather, clear and reuse the
> collected output of the flatMap, and so on. In the past we ran long
> processes over lots of data with little memory without problems, including
> many more complex co-groups, joins and so on, without any issue.
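>
> Something along these lines, as a generic sketch (the comma-splitting is
> just a stand-in for whatever key extraction the real job does):
>
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.util.Collector;
>
> import java.util.ArrayList;
> import java.util.List;
>
> public class ReusingFlatMap extends RichFlatMapFunction<String, Tuple2<String, String>> {
>
>     private transient Tuple2<String, String> reuse;  // reused output object
>     private transient List<String> keys;             // reused scratch buffer
>
>     @Override
>     public void open(Configuration parameters) {
>         reuse = new Tuple2<>();
>         keys = new ArrayList<>();
>     }
>
>     @Override
>     public void flatMap(String value, Collector<Tuple2<String, String>> out) {
>         keys.clear();                            // clear and reuse, no new list per record
>         for (String k : value.split(",")) {      // stand-in for the real key extraction
>             keys.add(k);
>         }
>         for (String key : keys) {
>             reuse.f0 = key;
>             reuse.f1 = value;
>             out.collect(reuse);                  // reusing the output tuple; fine with the
>         }                                        // default settings (object reuse disabled)
>     }
> }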
>
>
>
> My2c. Hope it helps.
>
>
>
> Stefano
>
>
>
> *From:* Newport, Billy [mailto:billy.newp...@gs.com <billy.newp...@gs.com>]
>
> *Sent:* Thursday, April 20, 2017 1:31 PM
> *To:* 'Fabian Hueske' <fhue...@gmail.com>
> *Cc:* 'user@flink.apache.org' <user@flink.apache.org>
> *Subject:* RE: Flink memory usage
>
>
>
> I don’t think our functions are memory heavy; they typically are cogroups
> that merge the records on the left with the records on the right.
>
>
>
> We’re currently requiring 720GB of heap to do our processing, which frankly
> appears ridiculous to us. Could too much parallelism be causing the
> problem? Looking at:
>
>
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Optimal-Configuration-for-Cluster-td5024.html
>
>
>
> If we are processing 17 “datasets” in a single job and each has an
> individual parallelism of 40, is that a total (potential) parallelism of
> 17*40? Given your network buffers calculation of parallelism squared,
> would that do it, or only if we explicitly configure it that way:
>
>
>
> taskmanager.network.numberOfBuffers: p ^ 2 * t * 4
>
>
> where p is the maximum parallelism of the job and t is the number of task
> managers.
>
> You can process more than one parallel task per TM if you configure more
> than one processing slot per machine (taskmanager.numberOfTaskSlots).
> The TM will divide its memory among all its slots. So it would be possible
> to start one TM for each machine with 100GB+ memory and 48 slots each.
>
>
>
> Our pipeline for each dataset looks like this:
>
>
>
> Read avro file -> FlatMap -> Validate each record with a flatmap ->
>
> Read Parquet -> FlatMap -> Filter Live Rows -> CoGroup with the validated
> avro file above -> }
>
> Read Parquet -> FlatMap -> Filter Dead Rows --------------------------------->
> } Union cogroup with dead rows and write result to parquet file.
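>
> Roughly the same shape expressed against the DataSet API (a placeholder
> sketch; the sources, sinks and function names are made up, only the
> structure matters):
>
> DataSet<Record> validated = avroSource            // read Avro file
>     .flatMap(new ParseAvro())
>     .flatMap(new ValidateRecord());
>
> DataSet<Record> live = parquetSource              // read Parquet
>     .flatMap(new ToRecord())
>     .filter(new IsLiveRow());
>
> DataSet<Record> dead = parquetSource
>     .flatMap(new ToRecord())
>     .filter(new IsDeadRow());
>
> DataSet<Record> merged = validated
>     .coGroup(live).where("key").equalTo("key")    // assumes a POJO with a "key" field
>     .with(new MergeLeftRight());
>
> merged.union(dead).output(parquetSink);           // write result to Parquet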
>
>
>
> I don’t understand why this logic couldn’t run with a single task manager
> and just take longer. We’re having a lot of trouble trying to change the
> tuning to reduce the memory burn. We run the above pipeline with
> parallelism 40 for all 17 datasets in a single job.
>
>
>
> We’re running this config now, which is not really justifiable for what
> we’re doing.
>
>
>
> 20 nodes x 2 slots, parallelism 40, 36GB mem per node = 720GB of heap…
>
>
>
> Thanks
>
>
>
> *From:* Fabian Hueske [mailto:fhue...@gmail.com <fhue...@gmail.com>]
> *Sent:* Wednesday, April 19, 2017 10:52 AM
> *To:* Newport, Billy [Tech]
> *Cc:* user@flink.apache.org
> *Subject:* Re: Flink memory usage
>
>
>
> Hi Billy,
>
> Flink's internal operators are implemented to not allocate heap space
> proportional to the size of the input data.
>
> Whenever Flink needs to hold data in memory (e.g., for sorting or building
> a hash table) the data is serialized into managed memory. If all memory is
> in use, Flink starts spilling to disk. This blog post discusses how Flink
> uses its managed memory [1] (still up to date, even though it's almost 2
> years old).
>
> The runtime code should actually be quite stable. Most of the code has been
> there for several years (even before Flink was donated to the ASF) and we
> haven't seen many bugs reported for the DataSet runtime. Of course this
> does not mean that the code doesn't contain bugs.
>
>
>
> However, Flink does not take care of memory used by the user code. For
> example, a GroupReduceFunction that collects a lot of data, e.g., in a List
> on the heap, can still kill a program.
>
> I would check if you have user functions that require lots of heap memory.
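>
> A sketch of the kind of pattern to look for (the Tuple2 fields are
> illustrative; the point is the List that grows with the group size):
>
> // Anti-pattern: materializes the whole group on the heap before reducing it.
> public class CollectAllReduce
>         implements GroupReduceFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
>     @Override
>     public void reduce(Iterable<Tuple2<String, Long>> values,
>                        Collector<Tuple2<String, Long>> out) {
>         List<Tuple2<String, Long>> all = new ArrayList<>();
>         for (Tuple2<String, Long> v : values) {
>             all.add(v);                           // the whole group is held in memory at once
>         }
>         long sum = 0;
>         for (Tuple2<String, Long> v : all) {
>             sum += v.f1;
>         }
>         out.collect(new Tuple2<>(all.get(0).f0, sum));
>     }
> }
> // The same result can be computed by iterating the group once and keeping
> // only the running sum (or with a ReduceFunction), so nothing piles up on
> // the heap.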
>
> Also reducing the size of the managed memory to have more heap space
> available might help.
>
> If that doesn't solve the problem, it would be good if you could share
> some details about your job (which operators, which local strategies, how
> many operators) that might help to identify the misbehaving operator.
>
>
>
> Thanks, Fabian
>
>
> [1] https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
>
>
>
> 2017-04-19 16:09 GMT+02:00 Newport, Billy <billy.newp...@gs.com>:
>
> How does Flink use memory? We’re seeing cases when running a job on larger
> datasets where it throws OOM exceptions during the job. We’re using the
> DataSet API. Shouldn’t Flink be streaming from disk to disk? We work around
> this by using fewer slots, but it seems unintuitive that I need to change
> these settings given Flink != Spark. Why isn’t Flink’s memory usage
> constant? Why couldn’t I run a job with a single task and a single slot for
> any size of job successfully, other than it taking much longer to run?
>
>
>
> Thanks
>
> Billy
>
>
>
>
>
>
>
>
>
