Hey guys, I’m trying to run connected components on graphs that end up running for a fairly large number of iterations (25-30) and take 5-6 hours. More than half the time I get fetch failures and lose an executor after a number of iterations. Spark then has to go back and recompute the pieces it lost, which don’t seem to be persisted at the same level as the graph, so those iterations take exponentially longer and I end up killing the job because it’s not worth waiting for it to finish.

The approach I’m currently trying is checkpointing the vertices and edges (and
maybe the messages?) in Pregel. What I’ve been testing with so far is the patch
below, which seems to be working. (Actually, I haven’t had any failures since I
added this change, so I don’t know whether a failure would still make it
recompute from the start.) However, I’m also seeing things like 5 instances of
VertexRDDs being persisted at the same time, and “reduce at
VertexRDD.scala:111” runs twice each time. Is this the proper / most efficient
way of doing this checkpointing, and if not, what would work better?

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index 5e55620..5be40c3 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -134,6 +134,11 @@ object Pregel extends Logging {
       g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
       g.cache()
+      g.vertices.checkpoint()
+      g.vertices.count()
+      g.edges.checkpoint()
+      g.edges.count()
+
 
       val oldMessages = messages
       // Send new messages. Vertices that didn't get any messages don't appear in newVerts, so don't
       // get to send messages. We must cache messages so it can be materialized on the next line,
@@ -142,6 +147,7 @@ object Pregel extends Logging {
       // The call to count() materializes `messages`, `newVerts`, and the vertices of `g`. This
       // hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
       // vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).
+      messages.checkpoint()
       activeMessages = messages.count()
 
       logInfo("Pregel finished iteration " + i)

Best Regards,
Jeffrey Picard
