Hi All,
I am running a customized label propagation using Pregel. After a few
iterations, the program becomes slow and spends a lot of time in mapPartitions
(at GraphImpl.scala:184, VertexRDD.scala:318, or VertexRDD.scala:323). The
amount of shuffle write reaches 15GB, while the raw data, stored as
(srcid, dstid, weight) triples, is only 30MB.
Is this normal?
Below please find my customized label propagation implementation. I have
changed "map" into "mapValues" in line 183 to decrease shuffling, but it makes
no difference:
"
/**
 * Runs a customized label propagation ("adsorption") over `graph` with Pregel.
 *
 * The input vertex attribute is a triple `(_1: Int, _2: labels, _3: String)`.
 * The working/output vertex attribute is the 5-tuple
 * `(_1, labels, diff, inDegree, _3)` where `diff` starts at 1.0 and is
 * recomputed by the vertex program each time a non-empty message arrives.
 *
 * @param sc    Spark context; not used by this function, kept for callers.
 * @param graph input graph whose edge attribute is a Double
 *              (presumably the edge weight; it is not read here).
 * @param tol   a destination vertex only receives messages while its `diff`
 *              component is `>= tol`.
 * @return the graph produced by Pregel after at most 3 iterations.
 */
def adsorption(sc: SparkContext,
               graph: Graph[(Int, Map[VertexId, Double], String), Double],
               tol: Double)
  : Graph[(Int, Map[VertexId, Double], Double, Int, String), Double] = {
  // Local shorthands for the types repeated throughout this function.
  type Labels = Map[VertexId, Double]
  type VAttr = (Int, Labels, Double, Int, String)

  // Attach the in-degree (0 when the vertex has no incoming edge) and an
  // initial diff of 1.0 to every vertex.
  val adsorptionGraph: Graph[VAttr, Double] = graph
    .outerJoinVertices(graph.inDegrees) {
      case (_, u, inDegOpt) => (u._1, u._2, 1.0, inDegOpt.getOrElse(0), u._3)
    }
    .cache()

  // Sends the source's label map, scaled by 1 / inDegree(dst), to the
  // destination. Nothing is sent unless the destination's diff is still
  // >= tol and its first attribute component is 1 or 3.
  def sendMessage(edge: EdgeTriplet[VAttr, Double]): Iterator[(VertexId, Labels)] = {
    val dstAttr = edge.dstAttr
    if (dstAttr._3 >= tol && (dstAttr._1 == 1 || dstAttr._1 == 3)) {
      val indegree = dstAttr._4.toDouble
      // This is line 183 of the original file. mapValues yields a lazy view;
      // map(identity) forces it into a strict Map before it is shipped.
      val mapToSend = edge.srcAttr._2.mapValues(_ / indegree).map(identity)
      Iterator((edge.dstId, mapToSend))
    } else {
      Iterator.empty
    }
  }

  // Key-wise sum of two label maps; a key missing on one side counts as 0.0.
  // The merged map is the result of the function (previously it was only
  // bound to a local val). Folding one map into the other avoids building an
  // intermediate key-set union on every merge.
  def mergeMessage(label1: Labels, label2: Labels): Labels =
    label2.foldLeft(label1) { case (acc, (k, v)) =>
      acc.updated(k, acc.getOrElse(k, 0.0) + v)
    }

  // Replaces the vertex's labels with the incoming message normalized to sum
  // to 1, and records how many label keys are new relative to the old size.
  // An empty message (the initial one) leaves the vertex untouched.
  def vertexProgram(vid: VertexId, attr: VAttr, message: Labels): VAttr =
    if (message.isEmpty) attr
    else {
      val oldlabel = attr._2
      val accum = message.values.sum
      val newlabel = message.map { case (k, v) => k -> v / accum }
      // When oldlabel is empty this divides a positive count by 0.0 and gives
      // Infinity, which still satisfies the `>= tol` check in sendMessage.
      val diff =
        (newlabel.keySet -- oldlabel.keySet).size.toDouble / oldlabel.size.toDouble
      (attr._1, newlabel, diff, attr._4, attr._5)
    }

  // empty initial message
  val initialMessage: Labels = Map[VertexId, Double]()

  Pregel(adsorptionGraph, initialMessage, maxIterations = 3,
    activeDirection = EdgeDirection.In)(
    vprog = vertexProgram,
    sendMsg = sendMessage,
    mergeMsg = mergeMessage
  )
}
"