Thank you for your answers, and sorry for my lack of understanding.
So I tried what you suggested for msg, g and newVerts: with and without
unpersisting, and with .cache(). (I also tried
persist(StorageLevel.MEMORY_AND_DISK), but apparently this is not allowed for
msg because you can't change its storage level.) It gave exactly the same
result as mentioned in my previous post.

For example, in one iteration just before the failure, the innerJoin task
produces the following log:

14/08/27 10:30:58 INFO executor.Executor: Running task ID 37
14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_0 locally
14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_1 locally
14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_2 locally
14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_3 locally
14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_6 locally
14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_7 locally
14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_4 locally
14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_10 locally
14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_11 locally
14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_8 locally
14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_14 locally
14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_15 locally
14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_12 locally
14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_18 locally
14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_19 locally
14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_16 locally
14/08/27 10:30:58 INFO storage.BlockManager: Found block rdd_122_0 locally
14/08/27 10:30:58 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/08/27 10:30:58 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
14/08/27 10:30:58 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 0 ms

If I understand this correctly, the task is looking up every broadcast
variable created so far, even though it should only need the 3 current values
(mC, totBcst and fusionBcst)... I'm a little confused about this.
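
A Spark-free toy model may help frame the question. It assumes, without the
log alone confirming it, that a task carries the chain of parent RDD objects it
depends on, and that each iteration's innerJoin closure captured that
iteration's broadcast handles. Under that assumption every earlier broadcast
stays reachable until the lineage is actually cut. `Node`,
`capturedAlongLineage` and `LineageDemo` are hypothetical names, not Spark API:

```scala
// Toy model only (hypothetical, not Spark API): a node remembers the value
// its closure captured and a reference to its parent node.
final case class Node(captured: Option[String], parent: Option[Node]) {
  // Collect every captured value reachable through the parent chain,
  // the way walking a whole lineage would reach every closure in it.
  def capturedAlongLineage: List[String] =
    captured.toList ++ parent.map(_.capturedAlongLineage).getOrElse(Nil)
}

object LineageDemo {
  // Build `iterations` chained nodes; iteration i captures "broadcast_i".
  // With truncate = true the parent link is dropped each time, modeling a
  // checkpoint that really cuts the lineage.
  def build(iterations: Int, truncate: Boolean): Node = {
    var node = Node(None, None)
    for (i <- 0 until iterations) {
      val next = Node(Some(s"broadcast_$i"), Some(node))
      node = if (truncate) next.copy(parent = None) else next
    }
    node
  }

  def main(args: Array[String]): Unit = {
    println(build(3, truncate = false).capturedAlongLineage) // List(broadcast_2, broadcast_1, broadcast_0)
    println(build(3, truncate = true).capturedAlongLineage)  // List(broadcast_2)
  }
}
```

In this model only the truncated chain limits itself to the current
iteration's broadcast; if the real log keeps listing old broadcasts after
newVerts.checkpoint(), that would hint the checkpoint is not cutting the
lineage the way the model assumes.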

For the sake of completeness, here is the same portion of code I am using,
with cache() and unpersist removed. (I tried multiple combinations; I also
tried without broadcast variables, e.g. feeding the innerJoin directly with the
value instead of the broadcast.)

  import org.apache.spark.SparkContext
  import org.apache.spark.graphx._
  import org.apache.spark.storage.StorageLevel

  def run(graph : Graph[Long,Long], m : Long)(implicit sc : SparkContext) = {
    val spd = SparsePD
    // Init fusionMap
    var fusionMap = Map[Long, Long]().withDefault(x => x)
    // Init tots
    val tots = Map[Long, Double]().withDefaultValue(1.0)
    var totBcst = sc.broadcast(tots)
    var fusionBcst = sc.broadcast(fusionMap)
    val mC = sc.broadcast(m)
    // Initial distributions
    var g = graph.mapVertices({ case (vid, deg) =>
      VertexProp(somedistrib.withDefaultValue(0.0), deg) })
    var newVerts = g.vertices
    // Initial messages
    var msg = g.mapReduceTriplets(MFExecutor.sendMsgMF, MFExecutor.mergeMsgMF)
    var iter = 0

    while (iter < 20) {
      // Messages
      val oldMessages = msg
      val oldVerts = newVerts
      newVerts = newVerts.innerJoin(msg)(MFExecutor.vprogMF(mC, totBcst, fusionBcst))
        .persist(StorageLevel.MEMORY_AND_DISK)
      // Does this kill the lineage of newVerts? (checkpoint() requires
      // sc.setCheckpointDir(...) to have been called beforehand)
      newVerts.checkpoint()
      newVerts.count() // Materializes the innerJoin/persist and the checkpoint above
      val prevG = g
      g = graph.outerJoinVertices(newVerts)({ case (vid, deg, newOpt) =>
        newOpt.getOrElse(VertexProp(Map(vid -> 1.0).withDefaultValue(0.0), deg)) }).cache()
      //g = g.outerJoinVertices(newVerts)({case (vid,old,newOpt) => newOpt.getOrElse(old)})

      // 1st global var
      val fusionAcc = sc.accumulable[Map[Long, Long], (Long, Long)](fusionMap)(FusionAccumulable)
      g.triplets.filter(tp => testEq(fusionBcst)(tp.srcId, tp.dstId) &&
          (spd.dotPD(tp.dstAttr.prob, tp.srcAttr.prob) > 0.9))
        .foreach(tp => fusionAcc += ((tp.dstId, tp.srcId))) // add one (dst, src) pair
      //fusionBcst.unpersist(blocking = false)
      fusionMap = fusionAcc.value
      fusionBcst = sc.broadcast(fusionMap)

      // 2nd global var
      val totAcc = sc.accumulator[Map[Long, Double]](
        Map[Long, Double]().withDefaultValue(0.0))(TotAccumulable)
      newVerts.foreach({ case (vid, vprop) =>
        totAcc += vprop.prob.mapValues(p => p * vprop.deg).withDefaultValue(0.0) })
      //totBcst.unpersist(blocking = false)
      totBcst = sc.broadcast(totAcc.value)

      // New messages
      msg = g.mapReduceTriplets(MFExecutor.sendMsgMF, MFExecutor.mergeMsgMF).cache()

      // Unpersist options
      //oldMessages.unpersist(blocking = false)
      //oldVerts.unpersist(blocking = false)
      //prevG.unpersistVertices(blocking = false)

      iter = iter + 1
    }
    fusionMap
  }
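
TotAccumulable is not included in the post. Whatever it does, the second
global variable needs it to combine per-task partial totals key by key. A
minimal, Spark-free sketch of that merge, assuming plain summation per vertex
id (the `MapSum` object and its members are hypothetical; in Spark this logic
would play the role of an AccumulatorParam's `addInPlace`):

```scala
// Hypothetical helper: TotAccumulable is not shown in the post, so this only
// sketches the merge an accumulator over Map[Long, Double] totals needs.
object MapSum {
  // Neutral element: an empty map whose missing keys read as 0.0.
  val zero: Map[Long, Double] = Map[Long, Double]().withDefaultValue(0.0)

  // Add two maps key by key; keys present in only one map keep their value.
  def addInPlace(a: Map[Long, Double], b: Map[Long, Double]): Map[Long, Double] =
    b.foldLeft(a) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0.0) + v) }

  def main(args: Array[String]): Unit = {
    // Two partial results, as two tasks might produce them.
    val partial1 = Map(1L -> 0.5, 2L -> 1.0)
    val partial2 = Map(2L -> 2.0, 3L -> 4.0)
    val total = addInPlace(addInPlace(zero, partial1), partial2)
    println(total)
  }
}
```

Folding over the smaller map and using getOrElse keeps the merge correct
whether or not the inputs carry a default value.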



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-GraphX-pregel-like-with-global-variables-accumulator-broadcast-tp12742p12895.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
