Well, each combiner, reducer, join, coGroup, and solution set needs a share of memory (maps and filters don't). Your case was right at the edge: the hash joins require 33 buffers and got 32, so one memory-consuming operator less might fix it. I did not look at the other job in detail, but it did not seem much more complex than this one. As said before, LOCs or the total number of operators are not what matters here; it's the number of memory consumers.
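To make the counting argument concrete, here is a rough back-of-the-envelope sketch. It assumes a slot's memory is split evenly across its memory consumers and that the full 20MB per slot is usable as managed memory; both are simplifications made for illustration and not a description of Flink's actual allocator. The class and method names are hypothetical. The 32KB page size, the 20MB per slot and the 13 consumers (2 Combine + 4 GroupReduce + 4 CoGroup + 2 Joins + 1 SolutionSet) are the figures quoted further down in this thread.

```java
// Hypothetical helper, not part of Flink: estimates memory pages per consumer
// under the ASSUMPTION of an even split of a slot's memory.
final class MemoryBudgetSketch {

    /** Number of whole pages that fit into the given amount of memory. */
    public static long pagesFor(long memoryBytes, long pageSizeBytes) {
        if (pageSizeBytes <= 0) {
            throw new IllegalArgumentException("page size must be positive");
        }
        return memoryBytes / pageSizeBytes;
    }

    /** Pages each consumer would get if the slot's pages were split evenly (assumption). */
    public static long pagesPerConsumer(long slotMemoryBytes, long pageSizeBytes, int consumers) {
        if (consumers <= 0) {
            throw new IllegalArgumentException("need at least one memory consumer");
        }
        return pagesFor(slotMemoryBytes, pageSizeBytes) / consumers;
    }

    public static void main(String[] args) {
        long pageSize = 32L * 1024;          // 32KB memory page, as stated in the thread
        long slotMemory = 20L * 1024 * 1024; // 20MB per slot (80MB TM, 4 slots)
        int consumers = 2 + 4 + 4 + 2 + 1;   // memory consumers counted in the test job

        System.out.println("pages per slot:     " + pagesFor(slotMemory, pageSize));
        System.out.println("pages per consumer: " + pagesPerConsumer(slotMemory, pageSize, consumers));
        System.out.println("with one fewer:     " + pagesPerConsumer(slotMemory, pageSize, consumers - 1));
    }
}
```

The hash join in the failing test actually received only 32 pages, which is fewer than this even-split estimate suggests, so the real share per consumer is evidently smaller than in this simplified model. The point of the sketch is only the trend: every additional memory consumer shrinks the share of all the others, and removing one gives the rest a little more room.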
I am not sure how hard the 80MB limit is. Maybe it is possible to increase that a bit.

2015-03-30 23:25 GMT+02:00 Andra Lungu <lungu.an...@gmail.com>:

> Hi Fabian,
>
> I'll see what I can do :).
> I am just a bit shocked. If this set of coGroups and joins was too much for
> a test case, how come the following worked?
>
> https://github.com/andralungu/flink/commit/f60b022de056ac259459b68eee6ff0ae9993f0f8
>
> 400 lines of complex computations :) And I have an even bigger one for
> which the test also passed...
>
> On Mon, Mar 30, 2015 at 2:31 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Hi Andra,
> >
> > I found the cause for the exception. Your test case is simply too complex
> > for our testing environment.
> > We restrict the TM memory for testcases to 80MB in order to execute
> > multiple tests in parallel on Travis.
> > I counted the memory consumers in your job and got:
> >
> > - 2 Combine
> > - 4 GroupReduce
> > - 4 CoGroup
> > - 2 Joins
> > - 1 SolutionSet
> >
> > Those are quite a few memory consumers for 20MB per slot (4 slots per TM).
> >
> > Do you see a way to reduce the number of operators in your testcase, maybe
> > by splitting it in half?
> >
> > 2015-03-30 11:01 GMT+02:00 Andra Lungu <lungu.an...@gmail.com>:
> >
> > > Sure,
> > >
> > > It was in the first mail but that was sent a while ago :)
> > >
> > > This is the code:
> > > https://github.com/andralungu/gelly-partitioning/tree/alphaSplit
> > > I also added the log4j file in case it helps!
> > >
> > > The error is totally reproducible. 2 out of 2 people got the same.
> > > Steps to reproduce:
> > > 1). Clone the code; switch to alphaSplit branch
> > > 2). Run CounDegreeITCase.java
> > >
> > > Hope we can get to the bottom of this! If you need something, just ask.
> > >
> > > On Mon, Mar 30, 2015 at 10:54 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> > >
> > > > Hmm, that is really weird.
> > > > Can you point me to a branch in your repository and the test case that
> > > > gives the error?
> > > >
> > > > Then I have a look at it and try to figure out what's going wrong.
> > > >
> > > > Cheers, Fabian
> > > >
> > > > 2015-03-30 10:43 GMT+02:00 Andra Lungu <lungu.an...@gmail.com>:
> > > >
> > > > > Hello,
> > > > >
> > > > > I went on and did some further debugging on this issue. Even though the
> > > > > exception said that the problem comes from here:
> > > > >
> > > > > 4837 [Join(Join at weighEdges(NodeSplitting.java:117)) (1/4)] ERROR
> > > > > org.apache.flink.runtime.operators.RegularPactTask - Error in task code:
> > > > > Join(Join at weighEdges(NodeSplitting.java:117)) (1/4)
> > > > > java.lang.Exception: The data preparation for task 'Join(Join at
> > > > > weighEdges(NodeSplitting.java:117))' , caused an error: Too few memory
> > > > > segments provided. Hash Join needs at least 33 memory segments.
> > > > >     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> > > > >     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > > >     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
> > > > >     at java.lang.Thread.run(Thread.java:745)
> > > > >
> > > > > which is basically a chain of two joins, schema that I have repeated
> > > > > several times, including in the getTriplets() method and it passed every
> > > > > time. I thought that this could not be right!
> > > > >
> > > > > So I picked each intermediate data set formed, printed it and added a
> > > > > System.exit(0) afterwards. The exception comes from this method:
> > > > > aggregatePartialValuesSplitVertices. Even though this computes the correct
> > > > > result, it then throws the memory segment exception(!!!!!!
> > > > > Just for the Cluster test - everything else works).
> > > > >
> > > > > The code in the function is:
> > > > >
> > > > > private static DataSet<Vertex<String, Long>> aggregatePartialValuesSplitVertices(
> > > > >         DataSet<Vertex<String, Long>> resultedVertices) {
> > > > >
> > > > >     return resultedVertices.flatMap(
> > > > >             new FlatMapFunction<Vertex<String, Long>, Vertex<String, Long>>() {
> > > > >
> > > > >         @Override
> > > > >         public void flatMap(Vertex<String, Long> vertex,
> > > > >                 Collector<Vertex<String, Long>> collector) throws Exception {
> > > > >             int pos = vertex.getId().indexOf("_");
> > > > >
> > > > >             // if this is a split vertex, strip the suffix after "_"
> > > > >             if (pos > -1) {
> > > > >                 collector.collect(new Vertex<String, Long>(
> > > > >                         vertex.getId().substring(0, pos), vertex.getValue()));
> > > > >             } else {
> > > > >                 collector.collect(vertex);
> > > > >             }
> > > > >         }
> > > > >     }).groupBy(0).reduceGroup(
> > > > >             new GroupReduceFunction<Vertex<String, Long>, Vertex<String, Long>>() {
> > > > >
> > > > >         @Override
> > > > >         public void reduce(Iterable<Vertex<String, Long>> iterable,
> > > > >                 Collector<Vertex<String, Long>> collector) throws Exception {
> > > > >             long sum = 0;
> > > > >             Vertex<String, Long> vertex = new Vertex<String, Long>();
> > > > >
> > > > >             // sum the partial values of all vertices sharing the same id
> > > > >             Iterator<Vertex<String, Long>> iterator = iterable.iterator();
> > > > >             while (iterator.hasNext()) {
> > > > >                 vertex = iterator.next();
> > > > >                 sum += vertex.getValue();
> > > > >             }
> > > > >
> > > > >             collector.collect(new Vertex<String, Long>(vertex.getId(), sum));
> > > > >         }
> > > > >     });
> > > > > }
> > > > >
> > > > > To me, nothing seems out of the ordinary here. This is regular user code.
> > > > > And the behaviour in the end is definitely not the one expected. Any idea
> > > > > why this might be happening?
> > > > >
> > > > > Thanks!
> > > > > Andra
> > > > >
> > > > > On Fri, Mar 27, 2015 at 12:08 AM, Andra Lungu <lungu.an...@gmail.com> wrote:
> > > > >
> > > > > > Oops! Sorry! Did not know the mailing list does not support attachments :)
> > > > > > https://gist.github.com/andralungu/fba36d77f79189daa183
> > > > > >
> > > > > > On Fri, Mar 27, 2015 at 12:02 AM, Andra Lungu <lungu.an...@gmail.com> wrote:
> > > > > >
> > > > > >> Hi Fabian,
> > > > > >>
> > > > > >> I uploaded a file with my execution plan.
> > > > > >>
> > > > > >> On Thu, Mar 26, 2015 at 11:50 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> > > > > >>
> > > > > >>> Hi Andra,
> > > > > >>>
> > > > > >>> the error is independent of the size of the data set. A HashTable needs at
> > > > > >>> least 33 memory pages to operate.
> > > > > >>> Since you have 820MB of managed memory and the size of a memory page is
> > > > > >>> 32KB, there should be more than 25k pages available.
> > > > > >>>
> > > > > >>> Can you post the execution plan of the program you execute (
> > > > > >>> ExecutionEnvironment.getExecutionPlan() )?
> > > > > >>>
> > > > > >>> Best, Fabian
> > > > > >>>
> > > > > >>> 2015-03-26 23:31 GMT+01:00 Andra Lungu <lungu.an...@gmail.com>:
> > > > > >>>
> > > > > >>> > For 20 edges and 5 nodes, that should be more than enough.
> > > > > >>> >
> > > > > >>> > On Thu, Mar 26, 2015 at 11:24 PM, Andra Lungu <lungu.an...@gmail.com> wrote:
> > > > > >>> >
> > > > > >>> > > Sure,
> > > > > >>> > >
> > > > > >>> > > 3470 [main] INFO org.apache.flink.runtime.taskmanager.TaskManager -
> > > > > >>> > > Using 820 MB for Flink managed memory.
> > > > > >>> > >
> > > > > >>> > > On Thu, Mar 26, 2015 at 4:48 PM, Robert Metzger <rmetz...@apache.org> wrote:
> > > > > >>> > >
> > > > > >>> > >> Hi,
> > > > > >>> > >>
> > > > > >>> > >> during startup, Flink will log something like:
> > > > > >>> > >> 16:48:09,669 INFO org.apache.flink.runtime.taskmanager.TaskManager
> > > > > >>> > >> - Using 1193 MB for Flink managed memory.
> > > > > >>> > >>
> > > > > >>> > >> Can you tell us how much memory Flink is managing in your case?
> > > > > >>> > >>
> > > > > >>> > >> On Thu, Mar 26, 2015 at 4:46 PM, Andra Lungu <lungu.an...@gmail.com> wrote:
> > > > > >>> > >>
> > > > > >>> > >> > Hello everyone,
> > > > > >>> > >> >
> > > > > >>> > >> > I guess I need to revive this old discussion:
> > > > > >>> > >> >
> > > > > >>> > >> > http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Memory-segment-error-when-migrating-functional-code-from-Flink-0-9-to-0-8-td3687.html
> > > > > >>> > >> >
> > > > > >>> > >> > At that point, the fix was to kindly ask Alex to make his project work with
> > > > > >>> > >> > 0.9.
> > > > > >>> > >> >
> > > > > >>> > >> > Now, I am not that lucky!
> > > > > >>> > >> >
> > > > > >>> > >> > This is the code:
> > > > > >>> > >> > https://github.com/andralungu/gelly-partitioning/tree/alphaSplit
> > > > > >>> > >> >
> > > > > >>> > >> > The main program(NodeSplitting) is working nicely, I get the correct
> > > > > >>> > >> > result.
> > > > > >>> > >> > But if you run the test, you will see that collection works and
> > > > > >>> > >> > cluster fails miserably with this exception:
> > > > > >>> > >> >
> > > > > >>> > >> > Caused by: java.lang.Exception: The data preparation for task 'Join(Join at
> > > > > >>> > >> > weighEdges(NodeSplitting.java:112)) (04e172e761148a65783a4363406e08c0)' ,
> > > > > >>> > >> > caused an error: Too few memory segments provided. Hash Join needs at least
> > > > > >>> > >> > 33 memory segments.
> > > > > >>> > >> >     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> > > > > >>> > >> >     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > > > >>> > >> >     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:209)
> > > > > >>> > >> >     at java.lang.Thread.run(Thread.java:745)
> > > > > >>> > >> > Caused by: java.lang.IllegalArgumentException: Too few memory segments
> > > > > >>> > >> > provided. Hash Join needs at least 33 memory segments.
> > > > > >>> > >> >
> > > > > >>> > >> > I am running locally, from IntelliJ, on a tiny graph.
> > > > > >>> > >> > $ cat /proc/meminfo
> > > > > >>> > >> > MemTotal: 11405696 kB
> > > > > >>> > >> > MemFree: 5586012 kB
> > > > > >>> > >> > Buffers: 178100 kB
> > > > > >>> > >> >
> > > > > >>> > >> > I am sure I did not run out of memory...
> > > > > >>> > >> >
> > > > > >>> > >> > Any thoughts on this?
> > > > > >>> > >> >
> > > > > >>> > >> > Thanks!
> > > > > >>> > >> > Andra