Dear community,
for my diploma thesis, we are implementing a distributed version of the Fruchterman-Reingold visualization algorithm using GraphX and Kubernetes. Our solution is a backend that continuously computes new vertex positions in a graph and sends them via RabbitMQ to a consumer. Fruchterman-Reingold is an iterative algorithm: in each iteration, repulsive and attractive forces between vertices are computed, and new vertex positions are then derived from those forces. The vertices and edges are stored in a GraphX graph structure. Forces between vertices are computed with MapReduce (between each pair of vertices) and with aggregateMessages (for vertices connected by edges). After each iteration, the recomputed positions are collected from the RDD, serialized, and sent to the RabbitMQ queue.
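For clarity, the per-iteration forces follow the standard Fruchterman-Reingold formulas, f_a(d) = d^2 / k and f_r(d) = k^2 / d, with k = sqrt(area / |V|). A minimal plain-Scala sketch (the object and method names here are only illustrative, not our actual code):

```scala
object FrForces {
  // Ideal pairwise distance: k = sqrt(area / vertexCount).
  def idealK(area: Double, vertexCount: Long): Double =
    math.sqrt(area / vertexCount)

  // Attractive force magnitude between connected vertices: f_a(d) = d^2 / k.
  def attractive(dist: Double, k: Double): Double = (dist * dist) / k

  // Repulsive force magnitude between every pair of vertices: f_r(d) = k^2 / d.
  def repulsive(dist: Double, k: Double): Double = (k * k) / dist
}
```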

Here is the issue. The first two iterations of the algorithm are quick, but from the third iteration on it becomes very slow, until it reaches a point where it can no longer finish an iteration in real time. Caching of the graph may be the problem: if we serialize the graph into an array after each iteration and build a new graph from that array in the next iteration, memory usage stays constant and each iteration takes the same amount of time. We have already tried to cache/persist/checkpoint the graph after each iteration, but it didn't help, so maybe we are doing something wrong. We doubt that serializing the graph into an array should be necessary with a library as mature as Apache Spark, and we are also not confident how this workaround would affect performance on large graphs or in a parallel environment. Attached is a short code example showing one iteration of the algorithm, together with example input and output.
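For context, the per-iteration caching pattern we attempted looks roughly like the sketch below (identifiers such as initialGraph and maxIterations are placeholders, not our exact code). The intent is to materialize the new graph, truncate its RDD lineage via checkpointing, and release the previous iteration's cached data so that lineage and memory stay bounded:

```scala
import org.apache.spark.graphx

// Assumes sc.setCheckpointDir(...) has been called beforehand.
var current: graphx.Graph[GeneralVertex, EdgeProperty] = initialGraph
for (i <- 1 to maxIterations) {
  val next = iterate(sc, current, metaGraph)._1
  next.persist()
  next.checkpoint()
  // Force evaluation so the checkpoint is written before the old data goes away.
  next.vertices.count()
  next.edges.count()
  current.unpersist(blocking = false)
  current = next
}
```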

We would appreciate it if you could help us fix this issue or give us any meaningful ideas, as we have tried everything that came to mind.

We look forward to your reply.
Thank you, Marek Berith
  def iterate(
      sc: SparkContext,
      graph: graphx.Graph[GeneralVertex, EdgeProperty],
      metaGraph: graphx.Graph[GeneralVertex, EdgeProperty])
      : (graphx.Graph[GeneralVertex, EdgeProperty], graphx.Graph[GeneralVertex, EdgeProperty]) = {
    // Forces between vertices connected by edges (via aggregateMessages).
    val attractiveDisplacement: VertexRDD[(VertexId, Vector)] =
      this.calculateAttractiveForces(graph)
    // Forces between every pair of vertices (via MapReduce).
    val repulsiveDisplacement: RDD[(VertexId, Vector)] =
      this.calculateRepulsiveForces(graph)
    val metaVertexDisplacement: RDD[(VertexId, Vector)] =
      this.calculateMetaVertexForces(graph, metaGraph.vertices)
    val metaEdgeDisplacement: RDD[(VertexId, Vector)] =
      this.calculateMetaEdgeForces(metaGraph)
    // Sum the four displacement contributions per vertex.
    val displacements: RDD[(VertexId, Vector)] = this.combineDisplacements(
      attractiveDisplacement,
      repulsiveDisplacement,
      metaVertexDisplacement,
      metaEdgeDisplacement)
    val newVertices: RDD[(VertexId, GeneralVertex)] =
      this.displaceVertices(graph, displacements)
    val newGraph = graphx.Graph(newVertices, graph.edges)
    // persist or checkpoint or cache? or something else?
    newGraph.persist()
    metaGraph.persist()
    (newGraph, metaGraph)
  }