Hi, I have successfully implemented the Breadth First Search algorithm using
the Pregel operator in GraphX as follows:
import org.apache.spark.graphx._

val graph = GraphLoader.edgeListFile(sc, "graphx/data/test_graph.txt")
val root: VertexId = 1
val initialGraph = graph.mapVertices((id, _) =>
  if (id == root) 0.0 else Double.PositiveInfinity)
val bfs = initialGraph.pregel(Double.PositiveInfinity, 20)(
  (id, attr, msg) => math.min(attr, msg), // vprog: keep the shorter distance
  triplet => { // sendMsg: a reached source offers srcAttr + 1 to its neighbour
    if (triplet.srcAttr != Double.PositiveInfinity)
      Iterator((triplet.dstId, triplet.srcAttr + 1))
    else
      Iterator.empty
  },
  (a, b) => math.min(a, b)) // mergeMsg: keep the smallest offer
println(bfs.vertices.collect.mkString("\n"))
where test_graph.txt contains these edges (one "source destination" pair per line):
1 2
2 1
2 3
2 4
3 2
3 3
4 2
4 3
and running the algorithm prints:
(4,2.0)
(2,1.0)
(3,2.0)
(1,0.0)
which is the correct result.
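To double-check the expected distances independently of Spark, a plain sequential BFS over the same edge list can be used. This is a minimal sketch in plain Scala, not GraphX; `BfsCheck` and `bfs` are hypothetical names, and hop counts are reported as `Int` rather than `Double`. Run from vertex 1 it gives distance 0 for vertex 1, 1 for vertex 2 and 2 for vertices 3 and 4, matching the Pregel output.

```scala
import scala.collection.immutable.Queue

object BfsCheck {
  // Directed edges copied from test_graph.txt (source -> destination)
  val edges: Seq[(Long, Long)] = Seq(
    1L -> 2L, 2L -> 1L, 2L -> 3L, 2L -> 4L,
    3L -> 2L, 3L -> 3L, 4L -> 2L, 4L -> 3L)

  // Sequential breadth-first search: hop count from root to every reachable vertex
  def bfs(edges: Seq[(Long, Long)], root: Long): Map[Long, Int] = {
    val adjacency: Map[Long, Seq[Long]] =
      edges.groupBy(_._1).map { case (src, es) => src -> es.map(_._2) }

    @annotation.tailrec
    def loop(frontier: Queue[Long], dist: Map[Long, Int]): Map[Long, Int] =
      frontier.dequeueOption match {
        case None => dist
        case Some((v, rest)) =>
          // Only vertices seen for the first time get a distance and are enqueued
          val fresh = adjacency.getOrElse(v, Seq.empty).filterNot(dist.contains).distinct
          loop(rest ++ fresh, dist ++ fresh.map(_ -> (dist(v) + 1)))
      }

    loop(Queue(root), Map(root -> 0))
  }

  def main(args: Array[String]): Unit =
    bfs(edges, 1L).toSeq.sortBy(_._1).foreach(println)
}
```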
Could someone suggest an improvement to my implementation so that I do not
need to specify the maximum number of iterations (20)? If I remove it, the
job runs for a long time until it eventually fails with this error:
7)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    .................... carries on and on ....................
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
14/04/18 23:11:14 ERROR TaskSetManager: Task 81094.0:0 failed 1 times; aborting job
14/04/18 23:11:14 INFO DAGScheduler: Failed to run reduce at VertexRDD.scala:91
14/04/18 23:11:14 INFO TaskSchedulerImpl: Remove TaskSet 81094.0 from pool
org.apache.spark.SparkException: Job aborted: Task 81094.0:0 failed 1 times (most recent failure: Exception failure: java.lang.StackOverflowError)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
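A likely reason the cap is needed: the sendMsg in the question's code sends srcAttr + 1 along every edge whose source has been reached, in every superstep, even when that cannot shorten the destination's distance. Pregel stops only when a superstep produces no messages (or maxIterations is reached), so without the cap it never stops. The task number in the log (81094) suggests it ran for a very large number of supersteps, and the StackOverflowError during deserialization is probably a symptom of that, for example a very long RDD lineage. A common pattern, used in the single-source shortest path example of the GraphX programming guide, is to send a message only when it improves the destination. The sketch below is plain Scala, not GraphX, and only simulates the superstep loop on test_graph.txt to compare the two sendMsg rules. `PregelTermination`, `unguarded`, `guarded`, `messages` and `run` are hypothetical names. It also re-evaluates every edge each superstep, whereas GraphX only calls sendMsg on edges next to vertices that received a message. The stopping condition is the same.

```scala
object PregelTermination {
  type Dist = Map[Long, Double]
  val Inf: Double = Double.PositiveInfinity

  // Directed edges copied from test_graph.txt (source -> destination)
  val edges: Seq[(Long, Long)] = Seq(
    1L -> 2L, 2L -> 1L, 2L -> 3L, 2L -> 4L,
    3L -> 2L, 3L -> 3L, 4L -> 2L, 4L -> 3L)

  // Rule from the question: any reached source sends srcAttr + 1, useful or not
  val unguarded: (Double, Double) => Boolean = (src, _) => src != Inf
  // Hypothetical fix: send only if srcAttr + 1 would shorten the destination's distance
  // (Inf + 1 < Inf is false, so unreached sources still send nothing)
  val guarded: (Double, Double) => Boolean = (src, dst) => src + 1 < dst

  // sendMsg + mergeMsg: smallest offered distance per destination vertex
  def messages(dist: Dist, shouldSend: (Double, Double) => Boolean): Map[Long, Double] =
    edges
      .collect { case (s, d) if shouldSend(dist(s), dist(d)) => d -> (dist(s) + 1) }
      .groupBy(_._1)
      .map { case (d, offers) => d -> offers.map(_._2).min }

  // Simplified superstep loop: stops when no messages are produced or maxIter is hit.
  // Returns the final distances and the number of supersteps executed.
  def run(shouldSend: (Double, Double) => Boolean, root: Long, maxIter: Int): (Dist, Int) = {
    val vertices = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
    var dist: Dist = vertices.map(v => v -> (if (v == root) 0.0 else Inf)).toMap
    var msgs = messages(dist, shouldSend)
    var i = 0
    while (msgs.nonEmpty && i < maxIter) {
      // vprog: keep the smaller of the current distance and the incoming message
      dist = dist ++ msgs.map { case (v, m) => v -> math.min(dist(v), m) }
      msgs = messages(dist, shouldSend)
      i += 1
    }
    (dist, i)
  }

  def main(args: Array[String]): Unit = {
    val (d1, steps1) = run(unguarded, 1L, 20)
    val (d2, steps2) = run(guarded, 1L, Int.MaxValue)
    println(s"unguarded: $steps1 supersteps (stopped only by the cap), $d1")
    println(s"guarded:   $steps2 supersteps, $d2")
  }
}
```

In this simulation both rules produce the same distances, but the unguarded rule runs until the cap (20) while the guarded one stops after 2 supersteps. Carried back to the GraphX code, the corresponding sendMsg would be `triplet => if (triplet.srcAttr + 1 < triplet.dstAttr) Iterator((triplet.dstId, triplet.srcAttr + 1)) else Iterator.empty`, after which the `20` argument could presumably be omitted. This has not been tested against the Spark version in the log.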