Hello,

I went on and did some further debugging on this issue. Even though the
exception said that the problem comes from here:
4837 [Join(Join at weighEdges(NodeSplitting.java:117)) (1/4)] ERROR
org.apache.flink.runtime.operators.RegularPactTask  - Error in task code:
Join(Join at weighEdges(NodeSplitting.java:117)) (1/4)
java.lang.Exception: The data preparation for task 'Join(Join at
weighEdges(NodeSplitting.java:117))' , caused an error: Too few memory
segments provided. Hash Join needs at least 33 memory segments.
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
    at java.lang.Thread.run(Thread.java:745)

this is basically a chain of two joins, a pattern that I have repeated
several times, including in the getTriplets() method, and it passed every
time. I thought that this could not be right!
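
As a quick sanity check on the numbers quoted further down in this thread
(820 MB of managed memory, 32 KB per memory page, 33 pages required), here is
a small sketch of the arithmetic. The class and method names are made up for
illustration and are not Flink API. It only counts the pages in the whole
pool; it says nothing about how many of those pages a single join in the plan
actually receives.

```java
// Sanity check of the page arithmetic quoted in this thread.
// ManagedMemoryPages and pageCount are hypothetical names, not Flink API.
final class ManagedMemoryPages {

    // Number of whole pages of pageSizeKb that fit into managedMemoryMb.
    static long pageCount(long managedMemoryMb, long pageSizeKb) {
        return (managedMemoryMb * 1024L) / pageSizeKb;
    }

    public static void main(String[] args) {
        long pages = pageCount(820, 32); // figures from the TaskManager log and Fabian's reply
        long required = 33;              // minimum stated in the exception message
        System.out.println("pages in the managed pool: " + pages);
        System.out.println("at least " + required + " pages: " + (pages >= required));
    }
}
```

With these figures the pool holds 26240 pages, so the total amount of managed
memory by itself does not explain the error.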

So I took each intermediate data set, printed it, and added a
System.exit(0) afterwards. The exception actually comes from this method:
aggregatePartialValuesSplitVertices. Even though it computes the correct
result, it then throws the memory segment exception (only for the Cluster
test! Everything else works).

The code in the function is:

private static DataSet<Vertex<String, Long>> aggregatePartialValuesSplitVertices(
      DataSet<Vertex<String, Long>> resultedVertices) {

   return resultedVertices.flatMap(new FlatMapFunction<Vertex<String, Long>, Vertex<String, Long>>() {

      @Override
      public void flatMap(Vertex<String, Long> vertex,
                     Collector<Vertex<String, Long>> collector) throws Exception {
         int pos = vertex.getId().indexOf("_");

         // a split vertex has an "_" in its id; emit it under the part before the "_"
         if (pos > -1) {
            collector.collect(new Vertex<String, Long>(vertex.getId().substring(0, pos), vertex.getValue()));
         } else {
            collector.collect(vertex);
         }
      }
   }).groupBy(0).reduceGroup(new GroupReduceFunction<Vertex<String, Long>, Vertex<String, Long>>() {

      @Override
      public void reduce(Iterable<Vertex<String, Long>> iterable,
                     Collector<Vertex<String, Long>> collector) throws Exception {
         long sum = 0;
         Vertex<String, Long> vertex = new Vertex<String, Long>();

         // sum the partial values of all vertices that share the same id
         Iterator<Vertex<String, Long>> iterator = iterable.iterator();
         while (iterator.hasNext()) {
            vertex = iterator.next();
            sum += vertex.getValue();
         }

         collector.collect(new Vertex<String, Long>(vertex.getId(), sum));
      }
   });
}

To me, nothing seems out of the ordinary here. This is regular user code,
and the behaviour in the end is definitely not what I expected. Any idea
why this might be happening?
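
To make the intended behaviour concrete without Flink in the picture, here is
a plain-Java sketch of what the method is supposed to compute: strip
everything from the first "_" in the id, then sum the values per resulting id.
SplitVertexSums and its methods are hypothetical names, and the sample ids and
values are made up; this is not the code that runs in the test.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java restatement of the aggregation, for illustration only.
// SplitVertexSums is a hypothetical name, not part of Flink or Gelly.
final class SplitVertexSums {

    // "3_1" -> "3"; ids without an underscore are returned unchanged.
    static String originalId(String id) {
        int pos = id.indexOf('_');
        return pos > -1 ? id.substring(0, pos) : id;
    }

    // Sums the partial values of all vertices that map to the same original id.
    static Map<String, Long> aggregate(String[] ids, long[] values) {
        Map<String, Long> sums = new LinkedHashMap<>();
        for (int i = 0; i < ids.length; i++) {
            sums.merge(originalId(ids[i]), values[i], Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // Made-up sample data: vertex 3 was split into 3_0 and 3_1.
        String[] ids = {"1", "3_0", "3_1", "5"};
        long[] values = {4L, 2L, 7L, 1L};
        System.out.println(aggregate(ids, values)); // prints {1=4, 3=9, 5=1}
    }
}
```

The flatMap corresponds to originalId and the groupBy(0).reduceGroup to the
merge per key, so the logic itself needs nothing like a hash join.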

Thanks!
Andra

On Fri, Mar 27, 2015 at 12:08 AM, Andra Lungu <lungu.an...@gmail.com> wrote:

> Oops! Sorry! I did not know the mailing list does not support attachments :)
> https://gist.github.com/andralungu/fba36d77f79189daa183
>
> On Fri, Mar 27, 2015 at 12:02 AM, Andra Lungu <lungu.an...@gmail.com>
> wrote:
>
>> Hi Fabian,
>>
>> I uploaded a file with my execution plan.
>>
>> On Thu, Mar 26, 2015 at 11:50 PM, Fabian Hueske <fhue...@gmail.com>
>> wrote:
>>
>>> Hi Andra,
>>>
>>> the error is independent of the size of the data set. A HashTable needs
>>> at least 33 memory pages to operate.
>>> Since you have 820MB of managed memory and the size of a memory page is
>>> 32KB, there should be more than 25k pages available.
>>>
>>> Can you post the execution plan of the program you execute (
>>> ExecutionEnvironment.getExecutionPlan() )?
>>>
>>> Best, Fabian
>>>
>>> 2015-03-26 23:31 GMT+01:00 Andra Lungu <lungu.an...@gmail.com>:
>>>
>>> > For 20 edges and 5 nodes, that should be more than enough.
>>> >
>>> > On Thu, Mar 26, 2015 at 11:24 PM, Andra Lungu <lungu.an...@gmail.com>
>>> > wrote:
>>> >
>>> > > Sure,
>>> > >
>>> > > 3470 [main] INFO  org.apache.flink.runtime.taskmanager.TaskManager  -
>>> > > Using 820 MB for Flink managed memory.
>>> > >
>>> > > On Thu, Mar 26, 2015 at 4:48 PM, Robert Metzger <rmetz...@apache.org>
>>> > > wrote:
>>> > >
>>> > >> Hi,
>>> > >>
>>> > >> during startup, Flink will log something like:
>>> > >> 16:48:09,669 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>>> > >>      - Using 1193 MB for Flink managed memory.
>>> > >>
>>> > >> Can you tell us how much memory Flink is managing in your case?
>>> > >>
>>> > >>
>>> > >>
>>> > >> On Thu, Mar 26, 2015 at 4:46 PM, Andra Lungu <lungu.an...@gmail.com>
>>> > >> wrote:
>>> > >>
>>> > >> > Hello everyone,
>>> > >> >
>>> > >> > I guess I need to revive this old discussion:
>>> > >> >
>>> > >> > http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Memory-segment-error-when-migrating-functional-code-from-Flink-0-9-to-0-8-td3687.html
>>> > >> >
>>> > >> > At that point, the fix was to kindly ask Alex to make his project
>>> > >> > work with 0.9.
>>> > >> >
>>> > >> > Now, I am not that lucky!
>>> > >> >
>>> > >> > This is the code:
>>> > >> > https://github.com/andralungu/gelly-partitioning/tree/alphaSplit
>>> > >> >
>>> > >> > The main program (NodeSplitting) is working nicely, I get the correct
>>> > >> > result. But if you run the test, you will see that collection works
>>> > >> > and cluster fails miserably with this exception:
>>> > >> >
>>> > >> > Caused by: java.lang.Exception: The data preparation for task
>>> > >> > 'Join(Join at weighEdges(NodeSplitting.java:112))
>>> > >> > (04e172e761148a65783a4363406e08c0)' , caused an error: Too few memory
>>> > >> > segments provided. Hash Join needs at least 33 memory segments.
>>> > >> >     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
>>> > >> >     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>>> > >> >     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:209)
>>> > >> >     at java.lang.Thread.run(Thread.java:745)
>>> > >> > Caused by: java.lang.IllegalArgumentException: Too few memory segments
>>> > >> > provided. Hash Join needs at least 33 memory segments.
>>> > >> >
>>> > >> > I am running locally, from IntelliJ, on a tiny graph.
>>> > >> > $ cat /proc/meminfo
>>> > >> > MemTotal:       11405696 kB
>>> > >> > MemFree:         5586012 kB
>>> > >> > Buffers:          178100 kB
>>> > >> >
>>> > >> > I am sure I did not run out of memory...
>>> > >> >
>>> > >> > Any thoughts on this?
>>> > >> >
>>> > >> > Thanks!
>>> > >> > Andra
>>> > >> >
>>> > >>
>>> > >
>>> > >
>>> >
>>>
>>
>>
>
