Dear community,
for my diploma thesis we are implementing a distributed version of the
Fruchterman-Reingold visualization algorithm using GraphX and Kubernetes. Our
solution is a backend that continuously computes new vertex positions for a
graph and sends them via RabbitMQ to a consumer. Fruchterman-Reingold is an
iterative algorithm: in each iteration, repulsive and attractive forces
between vertices are computed, and new vertex positions are then derived from
those forces. Graph vertices and edges are stored in a GraphX graph
structure. Forces between vertices are computed with a map/reduce step (over
every pair of vertices) and with aggregateMessages (for vertices connected by
an edge). After each iteration, the recomputed positions are collected from
the RDD, serialized, and sent to the RabbitMQ queue.
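For context, the attractive-force step works roughly like the following
simplified sketch (the Pos case class and the constant k stand in for our
actual vertex attribute and layout constant; this is for illustration, not
our exact code):

import org.apache.spark.graphx._

case class Pos(x: Double, y: Double)

// Attractive forces act only along edges (Fruchterman-Reingold:
// f_a(d) = d^2 / k), so aggregateMessages sends a displacement to both
// endpoints of every edge and sums the displacements per vertex.
def attractiveForces(graph: Graph[Pos, Double], k: Double): VertexRDD[Pos] =
  graph.aggregateMessages[Pos](
    ctx => {
      val dx = ctx.dstAttr.x - ctx.srcAttr.x
      val dy = ctx.dstAttr.y - ctx.srcAttr.y
      val dist = math.max(math.sqrt(dx * dx + dy * dy), 0.001)
      val force = (dist * dist) / k
      ctx.sendToSrc(Pos(dx / dist * force, dy / dist * force))
      ctx.sendToDst(Pos(-dx / dist * force, -dy / dist * force))
    },
    (a, b) => Pos(a.x + b.x, a.y + b.y)
  )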
Here comes the issue. The first two iterations of the algorithm are quick,
but from the third iteration on the algorithm becomes slower and slower,
until it reaches a point where it cannot finish an iteration in real time.
Caching of the graph seems to be the problem, because if we serialize the
graph into an array after each iteration and build a new graph from that
array at the start of the next iteration, memory usage stays constant and
every iteration takes the same amount of time. We have already tried to
cache/persist/checkpoint the graph after each iteration, but it did not help,
so maybe we are doing something wrong. We do not think that serializing the
graph into an array should be necessary with a library as complex as Apache
Spark, and we are also not confident how this workaround will affect
performance for large graphs or in a parallel environment. We are attaching a
short code example that shows one iteration of the algorithm, along with an
input and output example.
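The workaround mentioned above looks roughly like this (simplified; the
helper name rebuildFromArrays is just for illustration):

// Collect the graph to local arrays on the driver and rebuild a fresh graph
// from them, which discards the lineage accumulated over previous iterations.
def rebuildFromArrays(
    sc: SparkContext,
    graph: graphx.Graph[GeneralVertex, EdgeProperty])
    : graphx.Graph[GeneralVertex, EdgeProperty] = {
  val vertexArray = graph.vertices.collect()
  val edgeArray = graph.edges.collect()
  graphx.Graph(sc.parallelize(vertexArray), sc.parallelize(edgeArray))
}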
We would appreciate it if you could help us fix this issue or give us any
meaningful ideas, as we have already tried everything that came to mind.
We look forward to your reply.
Thank you,
Marek Berith
def iterate(
    sc: SparkContext,
    graph: graphx.Graph[GeneralVertex, EdgeProperty],
    metaGraph: graphx.Graph[GeneralVertex, EdgeProperty])
    : (graphx.Graph[GeneralVertex, EdgeProperty],
       graphx.Graph[GeneralVertex, EdgeProperty]) = {
  // attractive forces along edges (computed via aggregateMessages)
  val attractiveDisplacement: VertexRDD[(VertexId, Vector)] =
    this.calculateAttractiveForces(graph)
  // repulsive forces between every pair of vertices (computed via map/reduce)
  val repulsiveDisplacement: RDD[(VertexId, Vector)] =
    this.calculateRepulsiveForces(graph)
  val metaVertexDisplacement: RDD[(VertexId, Vector)] =
    this.calculateMetaVertexForces(graph, metaGraph.vertices)
  val metaEdgeDisplacement: RDD[(VertexId, Vector)] =
    this.calculateMetaEdgeForces(metaGraph)
  // sum the partial displacements per vertex
  val displacements: RDD[(VertexId, Vector)] = this.combineDisplacements(
    attractiveDisplacement,
    repulsiveDisplacement,
    metaVertexDisplacement,
    metaEdgeDisplacement)
  // apply the combined displacements to obtain the new vertex positions
  val newVertices: RDD[(VertexId, GeneralVertex)] =
    this.displaceVertices(graph, displacements)
  val newGraph = graphx.Graph(newVertices, graph.edges)
  // persist or checkpoint or cache? or something else?
  newGraph.persist()
  metaGraph.persist()
  (newGraph, metaGraph)
}
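For completeness, the cache/persist/checkpoint variant we tried looked
roughly like the following (simplified; initialGraph, initialMetaGraph,
maxIterations and the checkpoint directory are placeholders), in case we are
doing something obviously wrong there:

sc.setCheckpointDir("/tmp/graphx-checkpoints")

var currentGraph = initialGraph
var currentMeta = initialMetaGraph
for (i <- 1 to maxIterations) {
  val (nextGraph, nextMeta) = iterate(sc, currentGraph, currentMeta)
  if (i % 10 == 0) nextGraph.checkpoint() // occasionally cut the lineage
  nextGraph.vertices.count()              // force materialization
  currentGraph.unpersist()
  currentMeta.unpersist()
  currentGraph = nextGraph
  currentMeta = nextMeta
}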