Hi Billy,

if you can share some parts of your code privately with me, I can try to figure out what's going wrong.
Cheers,
Till

On Thu, Apr 20, 2017 at 6:00 PM, Newport, Billy <billy.newp...@gs.com> wrote:

> Ok
>
> The consensus seems to be that it's us, not Flink :) So we'll look harder
> at what we're doing in case there is anything silly. We are using 16K
> network buffers BTW, which is around 0.5GB with the defaults.
>
> *From:* Till Rohrmann [mailto:trohrm...@apache.org]
> *Sent:* Thursday, April 20, 2017 11:52 AM
> *To:* Stefano Bortoli
> *Cc:* Newport, Billy [Tech]; Fabian Hueske; user@flink.apache.org
> *Subject:* Re: Flink memory usage
>
> Hi Billy,
>
> if you didn't split the different data sets up into different slot sharing
> groups, then your maximum parallelism is 40. Thus, it should be enough to
> assign 40^2 * 20 * 4 = 128000 network buffers. If that is not enough
> because you have more than 4 shuffling steps running in parallel, then you
> have to increase the last term.
>
> OOM exceptions should actually only occur due to user code objects. Given
> that you have reserved a massive amount of memory for the network buffers,
> the remaining heap for the user code is probably very small. Try whether
> you can decrease the number of network buffers. Moreover, check whether
> your user code keeps references to objects somewhere which could cause the
> OOM.
>
> Cheers,
> Till
>
> On Thu, Apr 20, 2017 at 5:42 PM, Stefano Bortoli
> <stefano.bort...@huawei.com> wrote:
>
> I think that if you have a lot of memory available, the GC gets kind of
> lazy. In our case, the issue was just the latency caused by the GC, because
> we were loading more data than could fit in memory. Hence, optimizing the
> code gave us a lot of improvements. FlatMaps are also dangerous, as objects
> can multiply beyond expectation, making a co-group extremely costly. :-) A
> well-placed distinct() saves a lot of time and memory.
>
> My point is that, having worked with scarce resources, I learned that
> almost all the time the issue was my code, not the framework.
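Till's buffer sizing rule of thumb can be sanity-checked with a little arithmetic. A minimal sketch, assuming Flink's default 32 KB network buffer (memory segment) size; the class and method names are illustrative, not from the thread:

```java
public class NetworkBuffers {

    // Flink's default network buffer (memory segment) size: 32 KB.
    static final long SEGMENT_BYTES = 32 * 1024;

    // Till's rule of thumb: p^2 * t * s buffers, where p is the maximum
    // parallelism, t the number of TaskManagers, and s the number of
    // concurrent shuffles (4 in the thread).
    static long buffers(int parallelism, int taskManagers, int shuffles) {
        return (long) parallelism * parallelism * taskManagers * shuffles;
    }

    public static void main(String[] args) {
        long b = buffers(40, 20, 4);                  // 128000, as in Till's mail
        long mb = b * SEGMENT_BYTES / (1024 * 1024);  // 4000 MB
        System.out.println(b + " buffers = " + mb + " MB");

        // Billy's "16K buffers is around 0.5GB with the defaults":
        long billysMb = 16384L * SEGMENT_BYTES / (1024 * 1024); // 512 MB
        System.out.println(billysMb + " MB");
    }
}
```

This also shows why Till warns about the heap: the 128000 buffers from the formula would pin roughly 4 GB per the default segment size, leaving that much less for user code.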
> Good luck.
>
> Stefano
>
> *From:* Newport, Billy [mailto:billy.newp...@gs.com]
> *Sent:* Thursday, April 20, 2017 4:46 PM
> *To:* Stefano Bortoli <stefano.bort...@huawei.com>; 'Fabian Hueske'
> <fhue...@gmail.com>
> *Cc:* 'user@flink.apache.org' <user@flink.apache.org>
> *Subject:* RE: Flink memory usage
>
> Your reuse idea kind of implies that it's a GC generation rate issue, i.e.
> it's not collecting fast enough, so it's running out of memory, versus heap
> that's actually anchored, right?
>
> *From:* Stefano Bortoli [mailto:stefano.bort...@huawei.com]
> *Sent:* Thursday, April 20, 2017 10:33 AM
> *To:* Newport, Billy [Tech]; 'Fabian Hueske'
> *Cc:* 'user@flink.apache.org'
> *Subject:* RE: Flink memory usage
>
> Hi Billy,
>
> The only suggestion I can give is to check your code very carefully for
> useless variable allocations, and foster reuse as much as possible. Don't
> create a new collection on every map execution, but rather clear and reuse
> the collected output of the flatMap, and so on. In the past we ran long
> processes over a lot of data with small memory without problems, and many
> more complex co-groups, joins and so on without any issue.
>
> My 2c. Hope it helps.
>
> Stefano
>
> *From:* Newport, Billy [mailto:billy.newp...@gs.com]
> *Sent:* Thursday, April 20, 2017 1:31 PM
> *To:* 'Fabian Hueske' <fhue...@gmail.com>
> *Cc:* 'user@flink.apache.org' <user@flink.apache.org>
> *Subject:* RE: Flink memory usage
>
> I don't think our functions are memory heavy; they typically are cogroups
> and merge the records on the left with the records on the right.
>
> We're currently requiring 720GB of heap to do our processing, which frankly
> appears ridiculous to us. Could too much parallelism be causing the
> problem? Looking at:
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Optimal-Configuration-for-Cluster-td5024.html
>
> If we are processing 17 "datasets" in a single job and each has an
> individual parallelism of 40, is that a total (potential) parallelism of
> 17*40? And given your network buffers calculation of parallelism squared,
> would that do it, or only if we explicitly configure it that way?
>
> taskmanager.network.numberOfBuffers: p ^ 2 * t * 4
>
> where p is the maximum parallelism of the job and t is the number of task
> managers.
>
> You can process more than one parallel task per TM if you configure more
> than one processing slot per machine (taskmanager.numberOfTaskSlots).
> The TM will divide its memory among all its slots. So it would be possible
> to start one TM for each machine with 100GB+ memory and 48 slots each.
>
> Our pipeline for each dataset looks like this:
>
> Read avro file -> FlatMap -> Validate each record with a flatmap ->
> Read Parquet -> FlatMap -> Filter Live Rows -> CoGroup with the validated
> avro file above -> }
> Read Parquet -> FlatMap -> Filter Dead Rows -----------------------> }
> Union cogroup with dead rows and write result to parquet file.
>
> I don't understand why this logic couldn't run with a single task manager
> and just take longer. We're having a lot of trouble trying to change the
> tuning to reduce the memory burn. We run the above pipeline with
> parallelism 40 for all 17 datasets in a single job.
>
> We're running this config now, which is not really justifiable for what
> we're doing.
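Stefano's reuse advice amounts to allocating per-record scratch objects once and clearing them on each call. A minimal, Flink-free sketch of the pattern under that assumption (in a real job this would live inside a RichFlatMapFunction; the names here are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of Stefano's advice: reuse the output collection across calls
// instead of allocating a new one per record, so the GC has less
// short-lived garbage to keep up with.
public class ReusingFlatMap {

    // Allocated once, cleared and refilled on every call.
    private final List<String> reused = new ArrayList<>();

    public List<String> flatMap(String line) {
        reused.clear(); // reuse instead of "new ArrayList<>()" per record
        for (String token : line.split(" ")) {
            if (!token.isEmpty()) {
                reused.add(token);
            }
        }
        return reused;
    }

    public static void main(String[] args) {
        ReusingFlatMap fn = new ReusingFlatMap();
        System.out.println(fn.flatMap("a b c").size()); // prints 3
        System.out.println(fn.flatMap("d").size());     // prints 1: cleared, not appended
    }
}
```

This directly addresses Billy's "GC generation rate" question: reuse lowers the allocation rate, so a lazy collector has less to fall behind on.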
> 20 nodes, 2 slots, 40 parallelism, 36GB mem = 720GB of heap…
>
> Thanks
>
> *From:* Fabian Hueske [mailto:fhue...@gmail.com]
> *Sent:* Wednesday, April 19, 2017 10:52 AM
> *To:* Newport, Billy [Tech]
> *Cc:* user@flink.apache.org
> *Subject:* Re: Flink memory usage
>
> Hi Billy,
>
> Flink's internal operators are implemented so as not to allocate heap space
> proportional to the size of the input data.
> Whenever Flink needs to hold data in memory (e.g., for sorting or building
> a hash table), the data is serialized into managed memory. If all memory is
> in use, Flink starts spilling to disk. This blog post discusses how Flink
> uses its managed memory [1] (still up to date, even though it's almost 2
> years old).
>
> The runtime code should actually be quite stable. Most of the code has been
> there for several years (even before Flink was donated to the ASF) and we
> haven't seen many bugs reported for the DataSet runtime. Of course this
> does not mean that the code doesn't contain bugs.
>
> However, Flink does not take care of the user code. For example, a
> GroupReduceFunction that collects a lot of data, e.g., in a List on the
> heap, can still kill a program.
> I would check if you have user functions that require lots of heap memory.
> Also, reducing the size of the managed memory to have more heap space
> available might help.
> If that doesn't solve the problem, it would be good if you could share
> some details about your job (which operators, which local strategies, how
> many operators); that might help to identify the misbehaving operator.
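Fabian's suggestion to trade managed memory for heap maps onto a handful of flink-conf.yaml keys in the 1.x DataSet setup. A sketch with illustrative values, not a recommendation for this job:

```
# Heap per TaskManager in MB (36 GB in the thread's setup).
taskmanager.heap.mb: 36864

# Slots per TaskManager; the TM divides its memory among its slots.
taskmanager.numberOfTaskSlots: 2

# Fraction of free heap reserved as managed memory (default 0.7).
# Lowering it leaves more heap for user code, as Fabian suggests.
taskmanager.memory.fraction: 0.5

# Network buffers, per the p^2 * t * 4 rule of thumb from the thread.
taskmanager.network.numberOfBuffers: 128000
```

The tension the thread circles around is visible here: buffers and managed memory both come out of the same per-TM budget, so raising one squeezes the heap left for user functions.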
> Thanks, Fabian
>
> [1] https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
>
> 2017-04-19 16:09 GMT+02:00 Newport, Billy <billy.newp...@gs.com>:
>
> How does Flink use memory? We're seeing cases when running a job on larger
> datasets where it throws OOM exceptions during the job. We're using the
> Dataset API. Shouldn't Flink be streaming from disk to disk? We work around
> this by using fewer slots, but it seems unintuitive that I need to change
> these settings given Flink != Spark. Why isn't Flink's memory usage
> constant? Why couldn't I run a job with a single task and a single slot for
> any size job successfully, other than it taking much longer to run?
>
> Thanks
>
> Billy
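Fabian's GroupReduceFunction caveat boils down to consuming the group's iterator instead of materializing it. A self-contained sketch of the preferred shape (no Flink dependency; the names are illustrative):

```java
import java.util.Iterator;

public class StreamingReduce {

    // Anti-pattern from Fabian's warning: copying the whole group into
    // a List keeps every record of the group on the heap at once and
    // can OOM, no matter how well Flink manages its own memory.
    //
    // Preferred: walk the iterator once and keep only the running
    // aggregate on the heap, which stays O(1) regardless of group size.
    static long sum(Iterator<Long> group) {
        long total = 0L;
        while (group.hasNext()) {
            total += group.next(); // one record on the heap at a time
        }
        return total;
    }

    public static void main(String[] args) {
        Iterator<Long> group = java.util.Arrays.asList(1L, 2L, 3L).iterator();
        System.out.println(sum(group)); // prints 6
    }
}
```

This is the answer to Billy's "shouldn't Flink be streaming from disk to disk?": the framework does, but only as long as user functions don't buffer whole groups themselves.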