Robert or Stephan know the Travis setup quite well. They might know whether we can give it a bit more than 80 MB, but at some point there will be a hard limit. Once we have dynamic memory management, (most of) such problems should be solved.
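Until then, when running such a job locally outside the restricted test harness, the managed memory of the embedded TaskManager can be raised by hand. A minimal sketch, assuming the createLocalEnvironment(Configuration) overload and the taskmanager.memory.size / taskmanager.numberOfTaskSlots config keys behave this way in the Flink version used here:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class LocalEnvWithMoreManagedMemory {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // managed memory for the embedded TaskManager, in MB (key name assumed)
        conf.setInteger("taskmanager.memory.size", 256);
        // fewer slots means each slot gets a larger share of the managed memory
        conf.setInteger("taskmanager.numberOfTaskSlots", 1);

        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);

        // ... build the graph job on 'env' and call env.execute() as usual ...
    }
}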
2015-03-30 23:46 GMT+02:00 Andra Lungu <lungu.an...@gmail.com>:

Oh! In that case, who should I refer to? :D
[It's kind of ugly to split this kind of test. I mean, if a person is counting the degrees, then that's the result that should be tested - at least in my opinion.]

In any case, thanks for the help :)

On Mon, Mar 30, 2015 at 11:37 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Well, each combiner, reducer, join, coGroup, and solution set needs a share of memory (maps & filters don't). In your case it was pretty much at the edge: the hash joins require 33 buffers and got 32. So one memory-consuming operator less might fix it. I did not look in detail at the other job, but it did not seem so much more complex than this one. As said before, LOCs or the total number of operators are not the important thing here. It's the number of memory consumers.

I am not sure how hard the 80MB limit is. Maybe it is possible to increase that a bit.

2015-03-30 23:25 GMT+02:00 Andra Lungu <lungu.an...@gmail.com>:

Hi Fabian,

I'll see what I can do :). I am just a bit shocked. If this set of coGroups and joins was too much for a test case, how come the following worked?

https://github.com/andralungu/flink/commit/f60b022de056ac259459b68eee6ff0ae9993f0f8

400 lines of complex computations :) And I have an even bigger one for which the test also passed...

On Mon, Mar 30, 2015 at 2:31 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Andra,

I found the cause for the exception. Your test case is simply too complex for our testing environment. We restrict the TM memory for test cases to 80MB in order to execute multiple tests in parallel on Travis. I counted the memory consumers in your job and got:

- 2 Combine
- 4 GroupReduce
- 4 CoGroup
- 2 Joins
- 1 SolutionSet

Those are quite a few memory consumers for 20MB per slot (4 slots per TM).

Do you see a way to reduce the number of operators in your test case, maybe by splitting it in half?

2015-03-30 11:01 GMT+02:00 Andra Lungu <lungu.an...@gmail.com>:

Sure,

It was in the first mail, but that was sent a while ago :)

This is the code: https://github.com/andralungu/gelly-partitioning/tree/alphaSplit
I also added the log4j file in case it helps!

The error is totally reproducible. 2 out of 2 people got the same.
Steps to reproduce:
1). Clone the code; switch to the alphaSplit branch
2). Run CounDegreeITCase.java

Hope we can get to the bottom of this! If you need something, just ask.

On Mon, Mar 30, 2015 at 10:54 AM, Fabian Hueske <fhue...@gmail.com> wrote:

Hmm, that is really weird. Can you point me to a branch in your repository and the test case that gives the error?

Then I have a look at it and try to figure out what's going wrong.

Cheers, Fabian
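To put Fabian's numbers in perspective, a rough back-of-the-envelope sketch. The even split across consumers is a simplification; the real runtime assigns relative weights per operator, which is why the hash join here ended up with only 32 of the 33 pages it needs:

public class TestMemoryBackOfEnvelope {

    public static void main(String[] args) {
        long tmManagedBytes = 80L << 20;          // 80 MB managed memory per TaskManager on Travis
        int slotsPerTaskManager = 4;
        int pageSizeBytes = 32 * 1024;            // 32 KB memory segments
        int memoryConsumers = 2 + 4 + 4 + 2 + 1;  // combines + groupReduces + coGroups + joins + solution set

        long pagesPerSlot = tmManagedBytes / slotsPerTaskManager / pageSizeBytes;  // 640 pages (~20 MB)
        long pagesPerConsumerEvenSplit = pagesPerSlot / memoryConsumers;           // ~49 pages, naive even split

        System.out.println("pages per slot:              " + pagesPerSlot);
        System.out.println("pages per consumer (even):   " + pagesPerConsumerEvenSplit);
        System.out.println("pages a hash join needs:     33 (it actually got 32 in this job)");
    }
}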
2015-03-30 10:43 GMT+02:00 Andra Lungu <lungu.an...@gmail.com>:

Hello,

I went on and did some further debugging on this issue. Even though the exception said that the problem comes from here:

4837 [Join(Join at weighEdges(NodeSplitting.java:117)) (1/4)] ERROR org.apache.flink.runtime.operators.RegularPactTask - Error in task code: Join(Join at weighEdges(NodeSplitting.java:117)) (1/4)
java.lang.Exception: The data preparation for task 'Join(Join at weighEdges(NodeSplitting.java:117))' caused an error: Too few memory segments provided. Hash Join needs at least 33 memory segments.
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
    at java.lang.Thread.run(Thread.java:745)

which is basically a chain of two joins, a pattern I have repeated several times, including in the getTriplets() method, and it passed every time. I thought that this could not be right!

So I picked each intermediate data set, printed it, and added a System.exit(0) afterwards. The exception comes from this method: aggregatePartialValuesSplitVertices. Even though it computes the correct result, it then throws the memory segment exception (and only for the cluster test - everything else works).
The code in the function is:

private static DataSet<Vertex<String, Long>> aggregatePartialValuesSplitVertices(
        DataSet<Vertex<String, Long>> resultedVertices) {

    return resultedVertices.flatMap(new FlatMapFunction<Vertex<String, Long>, Vertex<String, Long>>() {

        @Override
        public void flatMap(Vertex<String, Long> vertex,
                            Collector<Vertex<String, Long>> collector) throws Exception {
            int pos = vertex.getId().indexOf("_");

            // if this is a split vertex, merge it back under its original id
            if (pos > -1) {
                collector.collect(new Vertex<String, Long>(
                        vertex.getId().substring(0, pos), vertex.getValue()));
            } else {
                collector.collect(vertex);
            }
        }
    }).groupBy(0).reduceGroup(new GroupReduceFunction<Vertex<String, Long>, Vertex<String, Long>>() {

        @Override
        public void reduce(Iterable<Vertex<String, Long>> iterable,
                           Collector<Vertex<String, Long>> collector) throws Exception {
            long sum = 0;
            Vertex<String, Long> vertex = new Vertex<String, Long>();

            // sum up the partial values of all vertices sharing the same id
            Iterator<Vertex<String, Long>> iterator = iterable.iterator();
            while (iterator.hasNext()) {
                vertex = iterator.next();
                sum += vertex.getValue();
            }

            collector.collect(new Vertex<String, Long>(vertex.getId(), sum));
        }
    });
}

To me, nothing seems out of the ordinary here. This is regular user code, and the behaviour in the end is definitely not the one expected. Any idea why this might be happening?

Thanks!
Andra

On Fri, Mar 27, 2015 at 12:08 AM, Andra Lungu <lungu.an...@gmail.com> wrote:

Oops! Sorry! I did not know the mailing list does not support attachments :)
https://gist.github.com/andralungu/fba36d77f79189daa183

On Fri, Mar 27, 2015 at 12:02 AM, Andra Lungu <lungu.an...@gmail.com> wrote:

Hi Fabian,

I uploaded a file with my execution plan.

On Thu, Mar 26, 2015 at 11:50 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Andra,

the error is independent of the size of the data set. A HashTable needs at least 33 memory pages to operate. Since you have 820MB of managed memory and the size of a memory page is 32KB, there should be more than 25k pages available.

Can you post the execution plan of the program you execute (ExecutionEnvironment.getExecutionPlan())?

Best, Fabian
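For anyone else asked for such a plan, a minimal sketch of how it can be dumped; the pipeline and sink path below are placeholders, and getExecutionPlan() is called in place of execute():

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class PrintExecutionPlan {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // placeholder pipeline; build the real job on 'env' here instead
        DataSet<String> data = env.fromElements("a", "b", "c");
        data.writeAsText("/tmp/plan-demo");  // the plan needs at least one sink

        // returns the plan as a JSON string; call this instead of env.execute()
        System.out.println(env.getExecutionPlan());
    }
}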
2015-03-26 23:31 GMT+01:00 Andra Lungu <lungu.an...@gmail.com>:

For 20 edges and 5 nodes, that should be more than enough.

On Thu, Mar 26, 2015 at 11:24 PM, Andra Lungu <lungu.an...@gmail.com> wrote:

Sure,

3470 [main] INFO org.apache.flink.runtime.taskmanager.TaskManager - Using 820 MB for Flink managed memory.

On Thu, Mar 26, 2015 at 4:48 PM, Robert Metzger <rmetz...@apache.org> wrote:

Hi,

during startup, Flink will log something like:
16:48:09,669 INFO org.apache.flink.runtime.taskmanager.TaskManager - Using 1193 MB for Flink managed memory.

Can you tell us how much memory Flink is managing in your case?

On Thu, Mar 26, 2015 at 4:46 PM, Andra Lungu <lungu.an...@gmail.com> wrote:

Hello everyone,

I guess I need to revive this old discussion:
http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Memory-segment-error-when-migrating-functional-code-from-Flink-0-9-to-0-8-td3687.html

At that point, the fix was to kindly ask Alex to make his project work with 0.9. Now, I am not that lucky!

This is the code:
https://github.com/andralungu/gelly-partitioning/tree/alphaSplit

The main program (NodeSplitting) is working nicely; I get the correct result. But if you run the test, you will see that collection works and cluster fails miserably with this exception:

Caused by: java.lang.Exception: The data preparation for task 'Join(Join at weighEdges(NodeSplitting.java:112)) (04e172e761148a65783a4363406e08c0)', caused an error: Too few memory segments provided. Hash Join needs at least 33 memory segments.
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:209)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Too few memory segments provided. Hash Join needs at least 33 memory segments.

I am running locally, from IntelliJ, on a tiny graph.

$ cat /proc/meminfo
MemTotal:       11405696 kB
MemFree:         5586012 kB
Buffers:          178100 kB

I am sure I did not run out of memory...

Any thoughts on this?

Thanks!
Andra