Thank you for your answers, and sorry for my lack of understanding. I tried what you suggested, with and without unpersisting, and with .cache() on msg, g and newVerts. I also tried persist(StorageLevel.MEMORY_AND_DISK), but that is not allowed for msg, apparently because its storage level cannot be changed. All of these gave exactly the same result as mentioned in my previous post.

For example, before failing, in one iteration I get the following log for the innerJoin task:

    14/08/27 10:30:58 INFO executor.Executor: Running task ID 37
    14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_0 locally
    14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_1 locally
    14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_2 locally
    14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_3 locally
    14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_6 locally
    14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_7 locally
    14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_4 locally
    14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_10 locally
    14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_11 locally
    14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_8 locally
    14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_14 locally
    14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_15 locally
    14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_12 locally
    14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_18 locally
    14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_19 locally
    14/08/27 10:30:58 INFO storage.BlockManager: Found block broadcast_16 locally
    14/08/27 10:30:58 INFO storage.BlockManager: Found block rdd_122_0 locally
    14/08/27 10:30:58 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
    14/08/27 10:30:58 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
    14/08/27 10:30:58 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 0 ms

If I understand this correctly, the task is looking up the whole list of broadcast variables, even though it should only need the 3 current values... I'm a little confused about this.

For the sake of completeness, here is the same portion of code I am using, with cache() and unpersist removed. (I tried multiple combinations; I also tried without broadcast variables, e.g. feeding the value directly to innerJoin instead of the broadcast.)

    // Spark imports used by this excerpt. SparsePD, MFExecutor, VertexProp,
    // somedistrib, testEq, FusionAccumulable and TotAccumulable are not shown.
    import org.apache.spark.SparkContext
    import org.apache.spark.graphx._
    import org.apache.spark.storage.StorageLevel

    def run(graph: Graph[Long, Long], m: Long)(implicit sc: SparkContext) = {
      val spd = SparsePD

      // Init fusionMap
      var fusionMap = Map[Long, Long]().withDefault(x => x)

      // Init tots
      val tots = Map[Long, Double]().withDefaultValue(1.0)

      var totBcst = sc.broadcast(tots)
      var fusionBcst = sc.broadcast(fusionMap)
      val mC = sc.broadcast(m)

      // Initial distributions
      var g = graph.mapVertices({ case (vid, deg) =>
        VertexProp(somedistrib.withDefaultValue(0.0), deg)
      })
      var newVerts = g.vertices

      // Initial messages
      var msg = g.mapReduceTriplets(MFExecutor.sendMsgMF, MFExecutor.mergeMsgMF)

      var iter = 0
      while (iter < 20) {
        // Messages
        val oldMessages = msg
        val oldVerts = newVerts
        newVerts = newVerts.innerJoin(msg)(MFExecutor.vprogMF(mC, totBcst, fusionBcst)).persist(StorageLevel.MEMORY_AND_DISK)
        newVerts.checkpoint() // Does this kill the lineage of newVerts?
        newVerts.count()      // Materializes the two previous lines

        val prevG = g
        g = graph.outerJoinVertices(newVerts)({ case (vid, deg, newOpt) =>
          newOpt.getOrElse(VertexProp(Map(vid -> 1.0).withDefaultValue(0.0), deg))
        }).cache()
        //g = g.outerJoinVertices(newVerts)({case (vid,old,newOpt) => newOpt.getOrElse(old)})

        // 1st global var
        val fusionAcc = sc.accumulable[Map[Long, Long], (Long, Long)](fusionMap)(FusionAccumulable)
        g.triplets
          .filter(tp => testEq(fusionBcst)(tp.srcId, tp.dstId) && (spd.dotPD(tp.dstAttr.prob, tp.srcAttr.prob) > 0.9))
          .foreach(tp => fusionAcc += (tp.dstId, tp.srcId))
        //fusionBcst.unpersist(blocking = false)
        fusionMap = fusionAcc.value
        fusionBcst = sc.broadcast(fusionMap)

        // 2nd global var
        val totAcc = sc.accumulator[Map[Long, Double]](Map[Long, Double]().withDefaultValue(0.0))(TotAccumulable)
        newVerts.foreach({ case (vid, vprop) =>
          totAcc += vprop.prob.mapValues(p => p * vprop.deg).withDefaultValue(0.0)
        })
        //totBcst.unpersist(blocking = false)
        totBcst = sc.broadcast(totAcc.value)

        // New messages
        msg = g.mapReduceTriplets(MFExecutor.sendMsgMF, MFExecutor.mergeMsgMF).cache()

        // Unpersist options
        //oldMessages.unpersist(blocking = false)
        //oldVerts.unpersist(blocking = false)
        //prevG.unpersistVertices(blocking = false)

        iter = iter + 1
      }
      fusionMap
    }
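
To isolate the broadcast handling from the GraphX part, the re-broadcast step of the loop can be reduced to a minimal sketch like the one below. It is an illustration only, not the code from this thread and not a confirmed fix for the failure: the object name RebroadcastSketch, the toy update rule and the local[2] master are all assumptions made for the example. It only shows the bookkeeping of keeping a handle on the previous broadcast and calling unpersist on it once the replacement exists.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast

// Hypothetical stand-alone example; names and update rule are illustrative only.
object RebroadcastSketch {

  // Each round reads the current broadcast map inside a job, builds an updated
  // map on the driver, broadcasts it, and releases the previous broadcast.
  def run(sc: SparkContext, iterations: Int): Map[Long, Long] = {
    val ids = sc.parallelize(1L to 10L)
    var state = Map[Long, Long]()
    var stateBcst: Broadcast[Map[Long, Long]] = sc.broadcast(state)

    var iter = 0
    while (iter < iterations) {
      // Copy the handle into a local val so the closure captures this
      // particular broadcast rather than the mutable variable.
      val current = stateBcst
      val updates = ids
        .map(id => (id, current.value.getOrElse(id, id) + 1L))
        .collect() // action: the job has finished before the swap below

      state = state ++ updates

      val previous = stateBcst
      stateBcst = sc.broadcast(state)
      // Drops the executor-side copies of the old broadcast; the driver keeps
      // its copy, so a later use would re-send it instead of failing.
      previous.unpersist(blocking = false)

      iter += 1
    }
    state
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("rebroadcast-sketch")
    val sc = new SparkContext(conf)
    try println(run(sc, 3)) finally sc.stop()
  }
}
```

The old broadcast is released only after the action that reads it has completed, which is why the sketch uses collect() before swapping handles. In the GraphX loop above, the equivalent point would be after the foreach calls that fill the accumulators.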