That sounds useful, Robin. Thanks, I will try that. FYI, in another test I 
added only one vertex to the graph at a time. There, too, the latency grew with 
each addition: about 3 seconds for the first vertex, 7 seconds for the second, 
and so on. This is the case where I want to add vertices to the graph as they 
arrive in our system. I am trying to build a real-time system, so vertices will 
keep coming in.
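
One way to reconcile a steady stream of vertices with batched updates is to 
buffer the incoming IDs and hand them on in groups. The sketch below is only an 
illustration: VertexBatcher, batchSize and flush are hypothetical names that do 
not appear in this thread, and the class is not thread-safe as written.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper: collects incoming vertex IDs and hands them on in batches.
// Not thread-safe; a real-time system would need to guard `pending`.
final class VertexBatcher(batchSize: Int)(flush: Seq[Long] => Unit) {
  private val pending = ArrayBuffer.empty[Long]

  // Queue one ID; flush automatically once `batchSize` IDs are waiting.
  def add(id: Long): Unit = {
    pending += id
    if (pending.size >= batchSize) flushNow()
  }

  // Hand whatever is waiting to `flush` (for example from a timer) and clear the queue.
  def flushNow(): Unit =
    if (pending.nonEmpty) {
      val batch = pending.toList
      pending.clear()
      flush(batch)
    }
}
```

Here flush would be the place to turn the batch into a single RDD and update 
the graph once, so the cost of rebuilding is paid per batch rather than per 
vertex.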

Thanks.
From: Robin East [mailto:robin.e...@xense.co.uk]
Sent: Wednesday, February 24, 2016 3:54 PM
To: Udbhav Agarwal <udbhav.agar...@syncoms.com>
Cc: user@spark.apache.org
Subject: Re: Reindexing in graphx

It looks like you are adding vertices one by one; you definitely don’t want to 
do that. What happens when you batch together 400 vertices into an RDD and then 
add all 400 in one go?
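
A minimal sketch of what such a batched addition might look like follows. It is 
not code from this thread: BatchedVertexAdd and addVertices are hypothetical 
names, and the Int edge attribute is an assumption, since the edge type is not 
shown in the original method.

```scala
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.rdd.RDD

object BatchedVertexAdd {
  // Hypothetical helper: adds every ID in `newIds` to `graph` with a single union,
  // instead of one union per vertex. The Int edge attribute is an assumption.
  def addVertices(
      graph: Graph[(Int, Int), Int],
      newIds: RDD[String]): Graph[(Int, Int), Int] = {
    // Build the whole batch as one distributed RDD; nothing is collected to the driver.
    val batch: RDD[(VertexId, (Int, Int))] =
      newIds.map(id => (id.toLong, (100, 100)))
    // One union for the whole batch, then rebuild the graph once.
    val vertices = graph.vertices.union(batch)
    Graph(vertices, graph.edges, (0, 0))
  }
}
```

The caller would still decide when to cache the returned graph and when to 
unpersist the previous one.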
-------------------------------------------------------------------------------
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action




On 24 Feb 2016, at 05:49, Udbhav Agarwal <udbhav.agar...@syncoms.com> wrote:

Thank you, Robin, for your reply.
I am adding a batch of vertices to a graph in GraphX using the method below, 
and I am running into a latency problem. The first addition of, say, 400 
vertices to a graph with 100,000 nodes takes around 7 seconds; the next one 
takes 15 seconds. Each subsequent addition takes longer than the previous one. 
That is why I tried reindex(), hoping it would keep later operations fast.
FYI, my cluster currently consists of one machine with 8 cores and 8 GB of RAM. 
I am running in local mode.

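import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.rdd.RDD

// gVertices, gEdges and inputGraph are assigned here but declared elsewhere.
def addVertex(rdd: RDD[String], sc: SparkContext, session: String): Long = {
  val defaultUser = (0, 0)
  // Each incoming ID becomes its own one-element RDD, unioned onto gVertices.
  rdd.collect().foreach { x =>
    val aVertex: RDD[(VertexId, (Int, Int))] =
      sc.parallelize(Array((x.toLong, (100, 100))))
    gVertices = gVertices.union(aVertex)
  }
  // Rebuild the graph from the enlarged vertex set and cache it.
  inputGraph = Graph(gVertices, gEdges, defaultUser)
  inputGraph.cache()
  gVertices = inputGraph.vertices
  gVertices.cache()
  val count = gVertices.count
  println(count)
  1L
}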
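
One property of a loop like this can be checked in isolation: unioning RDDs 
that do not share a partitioner concatenates their partitions, so the partition 
count grows with every added vertex. Whether that is what drives the latency 
reported here is not established in this thread. The sketch below only 
demonstrates the growth; UnionPartitionGrowth and partitionsAfterLoop are 
hypothetical names, and the 2 slices per parallelize call is an arbitrary 
choice.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object UnionPartitionGrowth {
  // Partition count after unioning `n` single-element RDDs onto `base`, one at a time.
  // Each added RDD is created with 2 partitions (an arbitrary choice for the demo).
  def partitionsAfterLoop(
      sc: SparkContext,
      base: RDD[(Long, (Int, Int))],
      n: Int): Int = {
    var acc = base
    for (i <- 1 to n) {
      acc = acc.union(sc.parallelize(Array((i.toLong, (100, 100))), 2))
    }
    // With no shared partitioner, this is base.partitions.length + 2 * n.
    acc.partitions.length
  }
}
```

Building the new vertices as a single RDD before the union avoids this 
per-vertex growth.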


From: Robin East [mailto:robin.e...@xense.co.uk]
Sent: Tuesday, February 23, 2016 8:15 PM
To: Udbhav Agarwal <udbhav.agar...@syncoms.com>
Subject: Re: Reindexing in graphx

Hi

Well this is the line that is failing in VertexRDDImpl:

require(partitionsRDD.partitioner.isDefined)

But really you shouldn’t need to be calling the reindex() function as it deals 
with some internals of the GraphX implementation - it looks to me like it ought 
to be a private method. Perhaps you could explain what you are trying to 
achieve.
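
To illustrate what that require is checking, the sketch below shows when a 
plain pair RDD has a partitioner defined. It does not diagnose reindex() 
itself; PartitionerCheck and flags are hypothetical names used only for this 
example.

```scala
import org.apache.spark.{HashPartitioner, SparkContext}

object PartitionerCheck {
  // Returns whether a partitioner is defined
  // (after partitionBy, after mapValues, after map).
  def flags(sc: SparkContext): (Boolean, Boolean, Boolean) = {
    val pairs = sc
      .parallelize(Seq((1L, "a"), (2L, "b")))
      .partitionBy(new HashPartitioner(2))
    val afterMapValues = pairs.mapValues(_.toUpperCase) // keys untouched, partitioner kept
    val afterMap = pairs.map(kv => kv)                  // keys could change, partitioner dropped
    (pairs.partitioner.isDefined,
     afterMapValues.partitioner.isDefined,
     afterMap.partitioner.isDefined)
  }
}
```

An RDD whose partitioner is None would not satisfy a requirement of the form 
shown above.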
-------------------------------------------------------------------------------
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action




On 23 Feb 2016, at 12:18, Udbhav Agarwal <udbhav.agar...@syncoms.com> wrote:

Hi,
I am trying to add vertices to a graph in GraphX, and I want to reindex the 
graph. I can see there is a vertices.reindex() option in GraphX, but when I 
call graph.vertices.reindex() I get
java.lang.IllegalArgumentException: requirement failed.
Please help me understand what I am missing in the syntax; the API 
documentation I have seen mentions only vertices.reindex().

Thanks,
Udbhav Agarwal
