Hi Vasia, 
 
thanks for your reply. 
 
Currently I am testing it on my normal workstation (16GB Ram) but I also tried 
it on out cluster. 
Both are failing at the same amount of nodes, so I guess it has something to do 
with Gelly
or with the properties.
 
The configured memory is default. I did not change it because I thought that 
flink is not the problem
but I might be wrong. 
 
The Input should not be much... I wrote an API for Virtuoso which is requesting 
a RDF-graph. But
I limited it to 10 Data Sets only.
 
This is my code, it is a bit messy and their might be improvement:
 

public static final class PathMessageFunction
            extends
            ScatterFunction<String, Tuple2<HashMap<String, List<String>>, 
HashMap<Integer, List<String>>>, List<String>, String> {
        @Override
        public void sendMessages(
                Vertex<String, Tuple2<HashMap<String, List<String>>, 
HashMap<Integer, List<String>>>> vertex)
                throws Exception {
            
            // The list "path" collects the ID's of the verticies a message was 
send to.   
            
            List<String> path = new ArrayList<String>();
            if (super.getSuperstepNumber() == 1) {
                path.add(vertex.getId());
            }
            if (super.getSuperstepNumber() > 1) {
                for (String values : 
vertex.f1.f1.get(super.getSuperstepNumber() - 1)) {
                            path.add(values + ";" + vertex.getId());
                }
            }
    
            // The Path-List is send to the next neighbouring Nodes. 
            
            for (Edge<String, String> edge : getEdges()) {
                sendMessageTo(edge.getTarget(), path);
            }
        }
    }
    
    
    
    public static final class PathUpdateFunction
            extends
            GatherFunction<String, Tuple2<HashMap<String, List<String>>, 
HashMap<Integer, List<String>>>, List<String>> {
        @Override
        public void updateVertex(
                Vertex<String, Tuple2<HashMap<String, List<String>>, 
HashMap<Integer, List<String>>>> vertex,
                MessageIterator<List<String>> messenger)
                throws Exception {
        
            List<String> newValues = new ArrayList<String>();
                        
            // The Path-List which was send as a message is also stored within 
the vertex value, therefore the Paths are saved to a new List "newValues".
            // This List should not contain the ID of the vertex itself to 
avoid cycles.
            
            for (List<String> msg : messenger) {
                for (String value : msg) {
                    if (!value.contains(vertex.getId())) {
                        newValues.add(value);
                    }
                }
            }
            
            // Creation of a new HashMap with the new and old values for the 
setNewVertexValue function 
                        
            HashMap<Integer, List<String>> newHashMap = vertex.f1.f1;
            newHashMap.put(super.getSuperstepNumber(), newValues);
            
            
            HashMap<String, List<String>> multiPaths = new HashMap<String, 
List<String>>();
            
            
            // Here it gets a bit complicated... However... I try to analyze 
the given paths for possible combinations of them.
            // For example... I got the path "a;b;c" and the patch "c;d;e", so 
I predict that "a;b;c;d;e" should also be possible.
            
            for (int i = 0; i < oriList.size(); i++) {
                
                String oriTemp = oriList.get(i);
                String destTemp = destList.get(i);
                
                String oriDest = oriTemp + destTemp;
                
                List<String> tempList = new ArrayList<String>();
                List<String> setsWithOrigin = new ArrayList<String>();
                List<String> setsWithDestination = new ArrayList<String>();
                for (Entry<Integer, List<String>> entry : 
newHashMap.entrySet()) {
                    for (String value : entry.getValue()) {
                        if (value.contains(oriTemp)) {
                            setsWithOrigin.add(value);
                        }
                        if (value.contains(destTemp)) {
                            setsWithDestination.add(value);
                        }
                    }
                }
                for (String originIter : setsWithOrigin) {
                    for (String destinationIter : setsWithDestination) {
                        String concat = "";
                        if ((originIter.indexOf(oriTemp) == 0 && destinationIter
                                .indexOf(destTemp) == 0)) {
                            String reverse = destinationIter;
                            if (destinationIter.length() > 1) {
                                reverse = "";
                                int d = destinationIter.length();
                                for (int a = 0; a < destinationIter.length(); 
a++) {
                                    reverse = reverse
                                            + destinationIter.substring(d - 1,
                                                    d);
                                    d--;
                                }
                            }
                            concat = originIter + ";" + vertex.getId() + ";"
                                    + reverse;
                        }
                        if (isFormatValid(concat) && concat.length() > 0) {
                            if (!tempList.contains(concat)) {
                                tempList.add(concat);
                            }
                        }
                    }
                }
                multiPaths.put(oriDest, tempList);
            }
                        
            
            // The combined paths are also saved into a HashMap which is 
additionally set as a Vertex Value
            // Later the paths are filtered for redundance
            
            Tuple2<HashMap<String, List<String>>, HashMap<Integer, 
List<String>>> testTuple3 = new Tuple2<HashMap<String, List<String>>, 
HashMap<Integer, List<String>>>(
                    multiPaths, newHashMap);
            
            
            setNewVertexValue(testTuple3);
        }
    }
 
 
Let me know if you need any further information.
Thanks in advance.
 
All the best, 
Dennis  
 

Gesendet: Donnerstag, 06. Oktober 2016 um 15:22 Uhr
Von: "Vasiliki Kalavri" <vasilikikala...@gmail.com>
An: dev@flink.apache.org
Betreff: Re: Flink Gelly
Hi Dennis,

can you give us some details about your setup? e.g. where you are running
your job, your input size, the configured memory, etc. It would also be
helpful if you could share your code. Getting an out of memory error with
just 100 nodes seems weird.

Best,
-Vasia.

On 6 October 2016 at 13:29, <d...@web.de> wrote:

>
> Dear ladies and gentlemen,
>
> I got a problem using Gelly in Flink. Currently I am loading a Virtuoso
> Graph into
> Flink's Gelly and I want to analyze it for the different paths one can
> take to link
> the different nodes. Therefore I am using the ScatterGatherIteration.
> However, my code just works with about ten to twenty nodes. When I try to
> upload
> a hundred nodes, the following error occurs:
>
> Exception in thread "main" org.apache.flink.runtime.
> client.JobExecutionException: Job execution failed.
> at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$
> mcV$sp(JobManager.scala:822)
> at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
> at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.
> liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(
> Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
> AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Memory ran out. Compaction failed.
> numPartitions: 32 minPartition: 1 maxPartition: 431 number of overflow
> segments: 0 bucketSize: 251 Overall memory: 45613056 Partition memory:
> 33685504 Message: null
> at org.apache.flink.runtime.operators.hash.CompactingHashTable.
> insertRecordIntoPartition(CompactingHashTable.java:457)
> at org.apache.flink.runtime.operators.hash.CompactingHashTable.
> insertOrReplaceRecord(CompactingHashTable.java:392)
> at org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollect
> or.collect(SolutionSetUpdateOutputCollector.java:54)
> at org.apache.flink.graph.spargel.GatherFunction.setNewVertexValue(
> GatherFunction.java:123)
> at org.apache.flink.quickstart.PathRank$PathUpdateFunction.
> updateVertex(PathRank.java:357)
> at org.apache.flink.graph.spargel.ScatterGatherIteration$
> GatherUdfSimpleVV.coGroup(ScatterGatherIteration.java:389)
> at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDr
> iver.run(CoGroupWithSolutionSetSecondDriver.java:218)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(
> AbstractIterativeTask.java:146)
> at org.apache.flink.runtime.iterative.task.IterationTailTask.run(
> IterationTailTask.java:107)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
>
>
> I tried to google it a bit, and this problems seems to occur often when
> using Gelly. I hope you have any ideas or approaches how I can handle this
> error.
>
> Thank you in advance!
> All the best,
> Dennis
>

Reply via email to