It would be good if you could contribute this as an example. BFS is a common enough algorithm.
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>


On Sat, Apr 19, 2014 at 4:16 AM, Ghufran Malik <[email protected]> wrote:

> Ahh nvm, I found the solution :)
>
> triplet.srcAttr != Double.PositiveInfinity && triplet.dstAttr == Double.PositiveInfinity
>
> as my new if condition.
>
>
> ---------- Forwarded message ----------
> From: Ghufran Malik <[email protected]>
> Date: 18 April 2014 23:15
> Subject: BFS implemented
> To: [email protected]
>
>
> Hi, I have successfully implemented the Breadth First Search algorithm using
> the Pregel operator in GraphX as follows:
>
> import org.apache.spark.graphx._
>
> val graph = GraphLoader.edgeListFile(sc, "graphx/data/test_graph.txt")
>
> // Distance from the root: 0.0 for the root itself, infinity for every other vertex.
> val root: VertexId = 1
> val initialGraph = graph.mapVertices((id, _) =>
>   if (id == root) 0.0 else Double.PositiveInfinity)
>
> // Initial message: +infinity; maxIterations: 20.
> val bfs = initialGraph.pregel(Double.PositiveInfinity, 20)(
>   // vprog: keep the smaller of the current distance and the incoming one
>   (id, attr, msg) => math.min(attr, msg),
>   // sendMsg: a reached source offers its distance + 1 to the edge's destination
>   triplet => {
>     if (triplet.srcAttr != Double.PositiveInfinity) {
>       Iterator((triplet.dstId, triplet.srcAttr + 1))
>     } else {
>       Iterator.empty
>     }
>   },
>   // mergeMsg: keep the smallest offered distance
>   (a, b) => math.min(a, b))
>
> println(bfs.vertices.collect.mkString("\n"))
>
> where test_graph.txt is:
>
> 1 2
> 2 1
> 2 3
> 2 4
> 3 2
> 3 3
> 4 2
> 4 3
>
> and the output after I run my algorithm is:
>
> (4,2.0)
> (2,1.0)
> (3,2.0)
> (1,0.0)
>
> which is the correct result.
>
> I was hoping someone could improve upon my implementation by suggesting a
> way in which I do not need the max iteration number (20). If I remove it,
> my job keeps running for some time until I eventually receive the error:
>
>
> 7)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>         at ....................carries on and on.................................
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>         at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>         at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:483)
> 14/04/18 23:11:14 ERROR TaskSetManager: Task 81094.0:0 failed 1 times; aborting job
> 14/04/18 23:11:14 INFO DAGScheduler: Failed to run reduce at VertexRDD.scala:91
> 14/04/18 23:11:14 INFO TaskSchedulerImpl: Remove TaskSet 81094.0 from pool
> org.apache.spark.SparkException: Job aborted: Task 81094.0:0 failed 1 times (most recent failure: Exception failure: java.lang.StackOverflowError)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
>         at scala.Option.foreach(Option.scala:236)
>         at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
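A note for anyone turning this into an example: the added `triplet.dstAttr == Double.PositiveInfinity` check is what makes the `20` unnecessary. GraphX's Pregel loop stops on its own once a superstep sends no messages (`maxIterations` defaults to `Int.MaxValue`), whereas with the original condition every edge out of an already-reached vertex keeps sending `srcAttr + 1` every round, so without the cap the job keeps iterating until it fails, as in the trace above. What follows is a minimal sketch of that superstep loop in plain Scala, not GraphX itself: `PregelBfsSketch`, `run`, `skipVisited` and `maxSupersteps` are hypothetical names, and for simplicity it scans every edge each superstep instead of only the edges next to vertices that received messages.

```scala
// Plain-Scala model of the Pregel BFS from this thread (no Spark needed).
// All names here are hypothetical; this is not the GraphX API.
object PregelBfsSketch {
  type VertexId = Long
  val Inf: Double = Double.PositiveInfinity

  /** Final distances, number of supersteps that sent messages, and whether
    * the loop stopped because a superstep produced no messages. */
  final case class Result(dist: Map[VertexId, Double], supersteps: Int, converged: Boolean)

  def run(edges: Seq[(VertexId, VertexId)],
          root: VertexId,
          skipVisited: Boolean,
          maxSupersteps: Int = 1000): Result = {
    // Same initial values as mapVertices in the thread: 0.0 for root, +Inf elsewhere.
    val ids = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
    var dist: Map[VertexId, Double] =
      ids.map(id => id -> (if (id == root) 0.0 else Inf)).toMap

    var steps = 0
    var converged = false
    while (!converged && steps < maxSupersteps) {
      // sendMsg: a reached source offers its distance + 1 to the destination.
      // skipVisited = true adds the `dstAttr == PositiveInfinity` check,
      // so only still-unreached destinations receive a message.
      val sent = edges.collect {
        case (s, d) if dist(s) != Inf && (!skipVisited || dist(d) == Inf) =>
          d -> (dist(s) + 1)
      }
      if (sent.isEmpty) {
        // No messages in this superstep: the loop ends without any iteration cap.
        converged = true
      } else {
        // mergeMsg: keep the smallest offered distance per destination.
        val merged = sent.groupBy(_._1).map { case (d, ms) => d -> ms.map(_._2).min }
        // vprog: math.min(attr, msg) for every vertex that received a message.
        dist = dist.map { case (id, a) => id -> math.min(a, merged.getOrElse(id, Inf)) }
        steps += 1
      }
    }
    Result(dist, steps, converged)
  }

  def main(args: Array[String]): Unit = {
    // Edge list from test_graph.txt in the thread.
    val edges = Seq((1L, 2L), (2L, 1L), (2L, 3L), (2L, 4L),
                    (3L, 2L), (3L, 3L), (4L, 2L), (4L, 3L))
    println(run(edges, root = 1L, skipVisited = true))
    println(run(edges, root = 1L, skipVisited = false, maxSupersteps = 50))
  }
}
```

On the `test_graph.txt` edges, `skipVisited = true` (the fixed condition) stops after two supersteps with 1 -> 0.0, 2 -> 1.0, 3 -> 2.0 and 4 -> 2.0, matching the output above; `skipVisited = false` (the original condition) never runs out of messages because the 1 -> 2 edge fires every round, so only `maxSupersteps` stops it.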
