[
https://issues.apache.org/jira/browse/SPARK-17097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15536356#comment-15536356
]
Seth Bromberger commented on SPARK-17097:
-----------------------------------------
Great explanation. Thank you very much!
> Pregel does not keep vertex state properly; fails to terminate
> ---------------------------------------------------------------
>
> Key: SPARK-17097
> URL: https://issues.apache.org/jira/browse/SPARK-17097
> Project: Spark
> Issue Type: Bug
> Components: GraphX
> Affects Versions: 1.6.0
> Environment: Scala 2.10.5, Spark 1.6.0 with GraphX and Pregel
> Reporter: Seth Bromberger
>
> Consider the following minimum example:
> {code:title=PregelBug.scala|borderStyle=solid}
> package testGraph
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.graphx.{Edge, EdgeTriplet, Graph, _}
> object PregelBug {
> def main(args: Array[String]) = {
> //FIXME breaks if TestVertex is a case class; works if not case class
> case class TestVertex(inId: VertexId,
> inData: String,
> inLabels: collection.mutable.HashSet[String]) extends
> Serializable {
> val id = inId
> val value = inData
> val labels = inLabels
> }
> class TestLink(inSrc: VertexId, inDst: VertexId, inData: String) extends
> Serializable {
> val src = inSrc
> val dst = inDst
> val data = inData
> }
> val startString = "XXXSTARTXXX"
> val conf = new SparkConf().setAppName("pregeltest").setMaster("local[*]")
> val sc = new SparkContext(conf)
> val vertexes = Vector(
> new TestVertex(0, "label0", collection.mutable.HashSet[String]()),
> new TestVertex(1, "label1", collection.mutable.HashSet[String]())
> )
> val links = Vector(
> new TestLink(0, 1, "linkData01")
> )
> val vertexes_packaged = vertexes.map(v => (v.id, v))
> val links_packaged = links.map(e => Edge(e.src, e.dst, e))
> val graph = Graph[TestVertex,
> TestLink](sc.parallelize(vertexes_packaged), sc.parallelize(links_packaged))
> def vertexProgram (vertexId: VertexId, vdata: TestVertex, message:
> Vector[String]): TestVertex = {
> message.foreach {
> case `startString` =>
> if (vdata.id == 0L)
> vdata.labels.add(vdata.value)
> case m =>
> if (!vdata.labels.contains(m))
> vdata.labels.add(m)
> }
> new TestVertex(vdata.id, vdata.value, vdata.labels)
> }
> def sendMessage (triplet: EdgeTriplet[TestVertex, TestLink]):
> Iterator[(VertexId, Vector[String])] = {
> val srcLabels = triplet.srcAttr.labels
> val dstLabels = triplet.dstAttr.labels
> val msgsSrcDst = srcLabels.diff(dstLabels)
> .map(label => (triplet.dstAttr.id, Vector[String](label)))
> val msgsDstSrc = dstLabels.diff(dstLabels)
> .map(label => (triplet.srcAttr.id, Vector[String](label)))
> msgsSrcDst.toIterator ++ msgsDstSrc.toIterator
> }
> def mergeMessage (m1: Vector[String], m2: Vector[String]): Vector[String]
> = m1.union(m2).distinct
> val g = graph.pregel(Vector[String](startString))(vertexProgram,
> sendMessage, mergeMessage)
> println("---pregel done---")
> println("vertex info:")
> g.vertices.foreach(
> v => {
> val labels = v._2.labels
> println(
> "vertex " + v._1 +
> ": name = " + v._2.id +
> ", labels = " + labels)
> }
> )
> }
> }
> {code}
> This code never terminates even though we expect it to. To fix, we simply
> remove the "case" designation for the TestVertex class (see FIXME comment),
> and then it behaves as expected.
> (Apologies if this has been fixed in later versions; we're unfortunately
> pegged to 2.10.5 / 1.6.0 for now.)
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]