Hi Vasia,

thanks for your reply. Currently I am testing it on my normal workstation (16 GB RAM), but I also tried it on our cluster. Both fail at the same number of nodes, so I guess it has something to do with Gelly or with the properties. The configured memory is the default; I did not change it because I thought that Flink itself was not the problem, but I might be wrong. The input should not be large: I wrote an API for Virtuoso that requests an RDF graph, and I limited it to only 10 data sets.
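For reference, these are (from memory) the relevant entries in my flink-conf.yaml — all still at their defaults, and the exact default values may differ between Flink versions:

    jobmanager.heap.mb: 256
    taskmanager.heap.mb: 512
    taskmanager.memory.fraction: 0.7
    taskmanager.numberOfTaskSlots: 1

Here is my code; it is a bit messy and there is surely room for improvement: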
// Imports, for reference:
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.GatherFunction;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.ScatterFunction;

public static final class PathMessageFunction extends
        ScatterFunction<String, Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>, List<String>, String> {

    @Override
    public void sendMessages(
            Vertex<String, Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>> vertex)
            throws Exception {
        // The list "path" collects, for every path that has reached this vertex,
        // the IDs of the vertices it passed through (separated by ";").
        List<String> path = new ArrayList<String>();
        if (getSuperstepNumber() == 1) {
            path.add(vertex.getId());
        }
        if (getSuperstepNumber() > 1) {
            // vertex.f1 is the vertex value; its second field maps a superstep
            // number to the paths collected in that superstep.
            for (String value : vertex.f1.f1.get(getSuperstepNumber() - 1)) {
                path.add(value + ";" + vertex.getId());
            }
        }
        // The path list is sent to all neighbouring vertices.
        for (Edge<String, String> edge : getEdges()) {
            sendMessageTo(edge.getTarget(), path);
        }
    }
}

public static final class PathUpdateFunction extends
        GatherFunction<String, Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>, List<String>> {

    @Override
    public void updateVertex(
            Vertex<String, Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>> vertex,
            MessageIterator<List<String>> messenger) throws Exception {
        // The paths sent as messages are also stored in the vertex value, so they are
        // first collected in "newValues". A path must not already contain the ID of
        // this vertex, to avoid cycles.
        List<String> newValues = new ArrayList<String>();
        for (List<String> msg : messenger) {
            for (String value : msg) {
                if (!value.contains(vertex.getId())) {
                    newValues.add(value);
                }
            }
        }

        // Extend the per-superstep map with the paths of the current superstep,
        // for use in setNewVertexValue below.
        HashMap<Integer, List<String>> newHashMap = vertex.f1.f1;
        newHashMap.put(getSuperstepNumber(), newValues);

        HashMap<String, List<String>> multiPaths = new HashMap<String, List<String>>();

        // Here it gets a bit complicated... I try to analyze the collected paths for
        // possible combinations. For example: given the path "a;b;c" and the path
        // "c;d;e", I predict that "a;b;c;d;e" should also be possible.
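For completeness, this is roughly how I invoke the iteration. It is a sketch from memory rather than copy-pasted code, so "graph" and "maxIterations" are placeholders:

    // "graph" is the Graph<String, Tuple2<HashMap<String, List<String>>,
    // HashMap<Integer, List<String>>>, String> built from the Virtuoso data;
    // "maxIterations" is the iteration limit.
    ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
    parameters.setName("path analysis");
    // One thing I have not tried yet: if I read the docs correctly, this would keep
    // the solution set on the heap instead of in Flink's managed memory, bypassing
    // the CompactingHashTable that fails with "Compaction failed" below.
    // parameters.setSolutionSetUnmanagedMemory(true);
    Graph<String, Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>, String> result =
            graph.runScatterGatherIteration(
                    new PathMessageFunction(), new PathUpdateFunction(), maxIterations, parameters);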
        // Note: oriList, destList and isFormatValid(...) are defined elsewhere in
        // the class and not shown here.
        for (int i = 0; i < oriList.size(); i++) {
            String oriTemp = oriList.get(i);
            String destTemp = destList.get(i);
            String oriDest = oriTemp + destTemp;
            List<String> tempList = new ArrayList<String>();
            List<String> setsWithOrigin = new ArrayList<String>();
            List<String> setsWithDestination = new ArrayList<String>();
            for (Entry<Integer, List<String>> entry : newHashMap.entrySet()) {
                for (String value : entry.getValue()) {
                    if (value.contains(oriTemp)) {
                        setsWithOrigin.add(value);
                    }
                    if (value.contains(destTemp)) {
                        setsWithDestination.add(value);
                    }
                }
            }
            for (String originIter : setsWithOrigin) {
                for (String destinationIter : setsWithDestination) {
                    String concat = "";
                    if (originIter.indexOf(oriTemp) == 0 && destinationIter.indexOf(destTemp) == 0) {
                        // Reverse the destination path character by character
                        // (note: this assumes single-character vertex IDs).
                        String reverse = destinationIter;
                        if (destinationIter.length() > 1) {
                            reverse = "";
                            int d = destinationIter.length();
                            for (int a = 0; a < destinationIter.length(); a++) {
                                reverse = reverse + destinationIter.substring(d - 1, d);
                                d--;
                            }
                        }
                        concat = originIter + ";" + vertex.getId() + ";" + reverse;
                    }
                    if (isFormatValid(concat) && concat.length() > 0) {
                        if (!tempList.contains(concat)) {
                            tempList.add(concat);
                        }
                    }
                }
            }
            multiPaths.put(oriDest, tempList);
        }

        // The combined paths are saved in a second HashMap, which is set as part of
        // the new vertex value; later the paths are filtered for redundancy.
        Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>> testTuple3 =
                new Tuple2<HashMap<String, List<String>>, HashMap<Integer, List<String>>>(multiPaths, newHashMap);
        setNewVertexValue(testTuple3);
    }
}

Let me know if you need any further information. Thanks in advance.

All the best,
Dennis

Sent: Thursday, 6 October 2016 at 15:22
From: "Vasiliki Kalavri" <vasilikikala...@gmail.com>
To: dev@flink.apache.org
Subject: Re: Flink Gelly

Hi Dennis,

can you give us some details about your setup? E.g. where you are running your job, your input size, the configured memory, etc. It would also be helpful if you could share your code. Getting an out-of-memory error with just 100 nodes seems weird.

Best,
-Vasia.

On 6 October 2016 at 13:29, <d...@web.de> wrote:
>
> Dear ladies and gentlemen,
>
> I have a problem using Gelly in Flink. Currently I am loading a Virtuoso
> graph into Flink's Gelly, and I want to analyze it for the different paths
> one can take to link the different nodes. Therefore I am using the
> ScatterGatherIteration. However, my code only works with about ten to
> twenty nodes. When I try to load a hundred nodes, the following error
> occurs:
>
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Memory ran out. Compaction failed.
> numPartitions: 32 minPartition: 1 maxPartition: 431 number of overflow
> segments: 0 bucketSize: 251 Overall memory: 45613056 Partition memory:
> 33685504 Message: null
> at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:457)
> at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:392)
> at org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollector.collect(SolutionSetUpdateOutputCollector.java:54)
> at org.apache.flink.graph.spargel.GatherFunction.setNewVertexValue(GatherFunction.java:123)
> at org.apache.flink.quickstart.PathRank$PathUpdateFunction.updateVertex(PathRank.java:357)
> at org.apache.flink.graph.spargel.ScatterGatherIteration$GatherUdfSimpleVV.coGroup(ScatterGatherIteration.java:389)
> at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:218)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
> at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
>
> I tried to google it a bit, and this problem seems to occur often when
> using Gelly. I hope you have ideas or approaches for how I can handle
> this error.
>
> Thank you in advance!
> All the best,
> Dennis