Hmm, that is really weird. Can you point me to a branch in your repository and the test case that gives the error?
Then I'll have a look at it and try to figure out what's going wrong.

Cheers, Fabian

2015-03-30 10:43 GMT+02:00 Andra Lungu <lungu.an...@gmail.com>:

> Hello,
>
> I went on and did some further debugging on this issue. Even though the
> exception said that the problem comes from here:
>
> 4837 [Join(Join at weighEdges(NodeSplitting.java:117)) (1/4)] ERROR
> org.apache.flink.runtime.operators.RegularPactTask - Error in task code:
> Join(Join at weighEdges(NodeSplitting.java:117)) (1/4)
> java.lang.Exception: The data preparation for task 'Join(Join at
> weighEdges(NodeSplitting.java:117))' , caused an error: Too few memory
> segments provided. Hash Join needs at least 33 memory segments.
>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>     at java.lang.Thread.run(Thread.java:745)
>
> which is basically a chain of two joins, a pattern I have repeated several
> times (including in the getTriplets() method), and it passed every time.
> I thought that this could not be right!
>
> So I took each intermediate data set formed, printed it, and added a
> System.exit(0) afterwards. The exception comes from this method:
> aggregatePartialValuesSplitVertices. Even though it computes the correct
> result, it then throws the memory segment exception (just for the cluster
> test; everything else works!).
>
> The code in the function is:
>
> private static DataSet<Vertex<String, Long>> aggregatePartialValuesSplitVertices(
>         DataSet<Vertex<String, Long>> resultedVertices) {
>
>     return resultedVertices.flatMap(new FlatMapFunction<Vertex<String, Long>,
>             Vertex<String, Long>>() {
>
>         @Override
>         public void flatMap(Vertex<String, Long> vertex,
>                             Collector<Vertex<String, Long>> collector) throws Exception {
>             int pos = vertex.getId().indexOf("_");
>
>             // if there is a split vertex
>             if (pos > -1) {
>                 collector.collect(new Vertex<String, Long>(
>                         vertex.getId().substring(0, pos), vertex.getValue()));
>             } else {
>                 collector.collect(vertex);
>             }
>         }
>     }).groupBy(0).reduceGroup(new GroupReduceFunction<Vertex<String, Long>,
>             Vertex<String, Long>>() {
>
>         @Override
>         public void reduce(Iterable<Vertex<String, Long>> iterable,
>                            Collector<Vertex<String, Long>> collector) throws Exception {
>             long sum = 0;
>             Vertex<String, Long> vertex = new Vertex<String, Long>();
>
>             Iterator<Vertex<String, Long>> iterator = iterable.iterator();
>             while (iterator.hasNext()) {
>                 vertex = iterator.next();
>                 sum += vertex.getValue();
>             }
>
>             collector.collect(new Vertex<String, Long>(vertex.getId(), sum));
>         }
>     });
> }
>
> To me, nothing seems out of the ordinary here. This is regular user code,
> and the behaviour in the end is definitely not the one expected. Any idea
> why this might be happening?
>
> Thanks!
> Andra
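To make the intent of that method concrete: split vertices carry an "_"-tagged ID and are merged back into the original vertex with their partial values summed, while unsplit vertices pass through. A minimal, hypothetical usage sketch follows; the sample IDs and values are invented, and since the method is private the sketch assumes it runs inside the same class:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.graph.Vertex;

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical sample input, not taken from the thread.
        DataSet<Vertex<String, Long>> resultedVertices = env.fromElements(
                new Vertex<String, Long>("a_0", 3L),
                new Vertex<String, Long>("a_1", 4L),
                new Vertex<String, Long>("b", 5L));

        // Expected output: (a,7) and (b,5). The two partial values of "a"
        // are summed, while "b" passes through the flatMap unchanged.
        aggregatePartialValuesSplitVertices(resultedVertices).print();
    }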
> On Fri, Mar 27, 2015 at 12:08 AM, Andra Lungu <lungu.an...@gmail.com> wrote:
>
>> Oops! Sorry! Did not know the mailing list does not support attachments :)
>> https://gist.github.com/andralungu/fba36d77f79189daa183
>>
>> On Fri, Mar 27, 2015 at 12:02 AM, Andra Lungu <lungu.an...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>>
>>> I uploaded a file with my execution plan.
>>>
>>> On Thu, Mar 26, 2015 at 11:50 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Andra,
>>>>
>>>> the error is independent of the size of the data set. A HashTable needs
>>>> at least 33 memory pages to operate.
>>>> Since you have 820MB of managed memory and the size of a memory page is
>>>> 32KB, there should be more than 25k pages available.
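Fabian's figure is easy to verify with back-of-the-envelope arithmetic:

    // Sanity check of the numbers above: 820 MB of managed memory split
    // into 32 KB pages gives far more than the 33 pages one hash join needs.
    long managedMemoryBytes = 820L * 1024 * 1024; // "Using 820 MB for Flink managed memory"
    long pageSizeBytes = 32L * 1024;              // one memory segment
    System.out.println(managedMemoryBytes / pageSizeBytes); // 26240 pages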
>>>>
>>>> Can you post the execution plan of the program you execute
>>>> (ExecutionEnvironment.getExecutionPlan())?
>>>>
>>>> Best, Fabian
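For anyone reproducing this step, here is a minimal sketch of how the JSON plan can be obtained; the program itself is elided, and the note about replacing env.execute() reflects 0.9-era behaviour, where getExecutionPlan() consumes the defined sinks:

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // ... build the NodeSplitting program on env as usual ...

        // Prints the plan as JSON; call this in place of env.execute().
        System.out.println(env.getExecutionPlan());
    }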
>>>>
>>>> 2015-03-26 23:31 GMT+01:00 Andra Lungu <lungu.an...@gmail.com>:
>>>>
>>>>> For 20 edges and 5 nodes, that should be more than enough.
>>>>>
>>>>> On Thu, Mar 26, 2015 at 11:24 PM, Andra Lungu <lungu.an...@gmail.com> wrote:
>>>>>
>>>>>> Sure,
>>>>>>
>>>>>> 3470 [main] INFO org.apache.flink.runtime.taskmanager.TaskManager -
>>>>>> Using 820 MB for Flink managed memory.
>>>>>>
>>>>>> On Thu, Mar 26, 2015 at 4:48 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> during startup, Flink will log something like:
>>>>>>> 16:48:09,669 INFO org.apache.flink.runtime.taskmanager.TaskManager
>>>>>>> - Using 1193 MB for Flink managed memory.
>>>>>>>
>>>>>>> Can you tell us how much memory Flink is managing in your case?
>>>>>>>
>>>>>>> On Thu, Mar 26, 2015 at 4:46 PM, Andra Lungu <lungu.an...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello everyone,
>>>>>>>>
>>>>>>>> I guess I need to revive this old discussion:
>>>>>>>> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Memory-segment-error-when-migrating-functional-code-from-Flink-0-9-to-0-8-td3687.html
>>>>>>>>
>>>>>>>> At that point, the fix was to kindly ask Alex to make his project
>>>>>>>> work with 0.9.
>>>>>>>> Now, I am not that lucky!
>>>>>>>>
>>>>>>>> This is the code:
>>>>>>>> https://github.com/andralungu/gelly-partitioning/tree/alphaSplit
>>>>>>>>
>>>>>>>> The main program (NodeSplitting) is working nicely; I get the
>>>>>>>> correct result. But if you run the test, you will see that the
>>>>>>>> collection mode works and the cluster mode fails miserably with
>>>>>>>> this exception:
>>>>>>>>
>>>>>>>> Caused by: java.lang.Exception: The data preparation for task
>>>>>>>> 'Join(Join at weighEdges(NodeSplitting.java:112))
>>>>>>>> (04e172e761148a65783a4363406e08c0)' , caused an error: Too few
>>>>>>>> memory segments provided. Hash Join needs at least 33 memory segments.
>>>>>>>>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
>>>>>>>>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>>>>>>>>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:209)
>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>> Caused by: java.lang.IllegalArgumentException: Too few memory
>>>>>>>> segments provided. Hash Join needs at least 33 memory segments.
>>>>>>>>
>>>>>>>> I am running locally, from IntelliJ, on a tiny graph.
>>>>>>>> $ cat /proc/meminfo
>>>>>>>> MemTotal:       11405696 kB
>>>>>>>> MemFree:         5586012 kB
>>>>>>>> Buffers:          178100 kB
>>>>>>>>
>>>>>>>> I am sure I did not run out of memory...
>>>>>>>>
>>>>>>>> Any thoughts on this?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>> Andra
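One hypothesis worth checking, though not confirmed anywhere in this thread: a test harness often starts its local mini cluster with a small, fixed amount of managed memory, which is then divided among all operators and slots of the plan, so a single hash join can end up with fewer than 33 pages even though a standalone run has 820 MB. A sketch of raising the managed memory for such a test cluster; the configuration key is the Flink 0.9-era setting, and how the Configuration reaches the mini cluster depends on the test base class, which is not shown in the thread:

    import org.apache.flink.configuration.Configuration;

    // Hypothetical test-setup sketch, not code from the thread.
    Configuration config = new Configuration();
    config.setInteger("taskmanager.memory.size", 512);      // managed memory, in MB
    config.setInteger("taskmanager.numberOfTaskSlots", 4);  // one slot per parallel subtask
    // ... pass 'config' to the local cluster started by the test ...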