Hmm, that is really weird.
Can you point me to a branch in your repository and the test case that
gives the error?

Then I have a look at it and try to figure out what's going wrong.

Cheers, Fabian

2015-03-30 10:43 GMT+02:00 Andra Lungu <lungu.an...@gmail.com>:

> Hello,
>
> I went on and did some further debugging on this issue. Even though the
> exception said that the problem comes from here:
> 4837 [Join(Join at* weighEdges(NodeSplitting.java:117)*) (1/4)] ERROR
> org.apache.flink.runtime.operators.RegularPactTask  - Error in task code:
> Join(Join at weighEdges(NodeSplitting.java:117)) (1/4)
> java.lang.Exception: The data preparation for task 'Join(Join at
> weighEdges(NodeSplitting.java:117))' , caused an error: Too few memory
> segments provided. Hash Join needs at least 33 memory segments.
>     at
>
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
>     at
>
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>     at
>
> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>     at java.lang.Thread.run(Thread.java:745)
>
> which is basically a chain of two joins, schema that I have repeated
> several times, including in the getTriplets() method and it passed every
> time. I thought that this could not be right!
>
> So I picked each intermediate data set formed, printed it and added a
> System.exit(0) afterwards. The exception comes from this method:
> aggregatePartialValuesSplitVertices. Even though this computes the correct
> result, it then throws the memory segment exception(!!!!!! Just for the
> Cluster test - everything else works).
>
> The code in the function is:
>
> private static DataSet<Vertex<String, Long>>
> aggregatePartialValuesSplitVertices(DataSet<Vertex<String, Long>>
> resultedVertices) {
>
>    return resultedVertices.flatMap(new FlatMapFunction<Vertex<String,
> Long>, Vertex<String, Long>>() {
>
>       @Override
>       public void flatMap(Vertex<String, Long> vertex,
> Collector<Vertex<String, Long>> collector) throws Exception {
>          int pos = vertex.getId().indexOf("_");
>
>          // if there is a splitted vertex
>          if(pos > -1) {
>             collector.collect(new Vertex<String,
> Long>(vertex.getId().substring(0, pos), vertex.getValue()));
>          } else {
>             collector.collect(vertex);
>          }
>       }
>    }).groupBy(0).reduceGroup(new GroupReduceFunction<Vertex<String,
> Long>, Vertex<String, Long>>() {
>
>       @Override
>       public void reduce(Iterable<Vertex<String, Long>> iterable,
>                      Collector<Vertex<String, Long>> collector) throws
> Exception {
>          long sum = 0;
>          Vertex<String, Long> vertex = new Vertex<String, Long>();
>
>          Iterator<Vertex<String, Long>> iterator = iterable.iterator();
>          while (iterator.hasNext()) {
>             vertex = iterator.next();
>             sum += vertex.getValue();
>          }
>
>          collector.collect(new Vertex<String, Long>(vertex.getId(), sum));
>       }
>    });
>
> To me, nothing seems out of the ordinary here. This is regular user code.
> And the behaviour in the end is definitely not the one expected. Any idea
> why this might be happening?
>
> Thanks!
> Andra
>
> On Fri, Mar 27, 2015 at 12:08 AM, Andra Lungu <lungu.an...@gmail.com>
> wrote:
>
> > Opps! Sorry! Did not know the mailing list does not support attachments
> :)
> > https://gist.github.com/andralungu/fba36d77f79189daa183
> >
> > On Fri, Mar 27, 2015 at 12:02 AM, Andra Lungu <lungu.an...@gmail.com>
> > wrote:
> >
> >> Hi Fabian,
> >>
> >> I uploaded a file with my execution plan.
> >>
> >> On Thu, Mar 26, 2015 at 11:50 PM, Fabian Hueske <fhue...@gmail.com>
> >> wrote:
> >>
> >>> Hi Andra,
> >>>
> >>> the error is independent of the size of the data set. A HashTable needs
> >>> at
> >>> least 33 memory pages to operate.
> >>> Since you have 820MB of managed memory and the size of a memory page is
> >>> 32KB, there should be more than 25k pages available.
> >>>
> >>> Can you post the execution plan of the program you execute (
> >>> ExecutionEnvironment.getExecutionPlan() )?
> >>>
> >>> Best, Fabian
> >>>
> >>> 2015-03-26 23:31 GMT+01:00 Andra Lungu <lungu.an...@gmail.com>:
> >>>
> >>> > For 20 edges and 5 nodes, that should be more thank enough.
> >>> >
> >>> > On Thu, Mar 26, 2015 at 11:24 PM, Andra Lungu <lungu.an...@gmail.com
> >
> >>> > wrote:
> >>> >
> >>> > > Sure,
> >>> > >
> >>> > > 3470 [main] INFO
> org.apache.flink.runtime.taskmanager.TaskManager  -
> >>> > > Using 820 MB for Flink managed memory.
> >>> > >
> >>> > > On Thu, Mar 26, 2015 at 4:48 PM, Robert Metzger <
> rmetz...@apache.org
> >>> >
> >>> > > wrote:
> >>> > >
> >>> > >> Hi,
> >>> > >>
> >>> > >> during startup, Flink will log something like:
> >>> > >> 16:48:09,669 INFO
> org.apache.flink.runtime.taskmanager.TaskManager
> >>> > >>      - Using 1193 MB for Flink managed memory.
> >>> > >>
> >>> > >> Can you tell us how much memory Flink is managing in your case?
> >>> > >>
> >>> > >>
> >>> > >>
> >>> > >> On Thu, Mar 26, 2015 at 4:46 PM, Andra Lungu <
> lungu.an...@gmail.com
> >>> >
> >>> > >> wrote:
> >>> > >>
> >>> > >> > Hello everyone,
> >>> > >> >
> >>> > >> > I guess I need to revive this old discussion:
> >>> > >> >
> >>> > >> >
> >>> > >>
> >>> >
> >>>
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Memory-segment-error-when-migrating-functional-code-from-Flink-0-9-to-0-8-td3687.html
> >>> > >> >
> >>> > >> > At that point, the fix was to kindly ask Alex to make his
> project
> >>> work
> >>> > >> with
> >>> > >> > 0.9.
> >>> > >> >
> >>> > >> > Now, I am not that lucky!
> >>> > >> >
> >>> > >> > This is the code:
> >>> > >> >
> https://github.com/andralungu/gelly-partitioning/tree/alphaSplit
> >>> > >> >
> >>> > >> > The main program(NodeSplitting) is working nicely, I get the
> >>> correct
> >>> > >> > result. But if you run the test,  you will see that collection
> >>> works
> >>> > and
> >>> > >> > cluster fails miserably with this exception:
> >>> > >> >
> >>> > >> > Caused by: java.lang.Exception: The data preparation for task
> >>> > >> 'Join(Join at
> >>> > >> > weighEdges(NodeSplitting.java:112))
> >>> > (04e172e761148a65783a4363406e08c0)'
> >>> > >> ,
> >>> > >> > caused an error: Too few memory segments provided. Hash Join
> >>> needs at
> >>> > >> least
> >>> > >> > 33 memory segments.
> >>> > >> >     at
> >>> > >> >
> >>> > >> >
> >>> > >>
> >>> >
> >>>
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> >>> > >> >     at
> >>> > >> >
> >>> > >> >
> >>> > >>
> >>> >
> >>>
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> >>> > >> >     at
> >>> > >> >
> >>> > >> >
> >>> > >>
> >>> >
> >>>
> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:209)
> >>> > >> >     at java.lang.Thread.run(Thread.java:745)
> >>> > >> > Caused by: java.lang.IllegalArgumentException: Too few memory
> >>> segments
> >>> > >> > provided. Hash Join needs at least 33 memory segments.
> >>> > >> >
> >>> > >> > I am running locally, from IntelliJ, on a tiny graph.
> >>> > >> > $ cat /proc/meminfo
> >>> > >> > MemTotal:       11405696 kB
> >>> > >> > MemFree:         5586012 kB
> >>> > >> > Buffers:          178100 kB
> >>> > >> >
> >>> > >> > I am sure I did not run out of memory...
> >>> > >> >
> >>> > >> > Any thoughts on this?
> >>> > >> >
> >>> > >> > Thanks!
> >>> > >> > Andra
> >>> > >> >
> >>> > >>
> >>> > >
> >>> > >
> >>> >
> >>>
> >>
> >>
> >
>

Reply via email to