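I think you need to do `newRDD.cache()` and `newRDD.count` before you do
`oldRDD.unpersist(true)`. Otherwise it might be recomputing all the
previous iterations each time: since nothing is ever cached, each action
(here `newRDD.mean`) re-runs the whole chain of `map`s back to the original
`parallelize`, and that chain grows by one step per iteration.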
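Below is a minimal sketch of the suggested change. It assumes the RDD API used in the thread (`cache`, `count`, `unpersist`, `mean`) and a local `SparkContext`. The object name `CachedIterations`, the `run` helper, and the reduced sizes are illustrative only, and this version has not been benchmarked on the original cluster.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CachedIterations {
  // Runs `numIterations` squaring updates over an RDD of weights. Each new RDD
  // is cached and materialized BEFORE the previous one is unpersisted, so an
  // action only has to run the newest map step.
  def run(sc: SparkContext, modelSize: Int, numIterations: Int, numPartitions: Int): Double = {
    var oldRDD: RDD[Double] =
      sc.parallelize(1 to modelSize, numPartitions).map(_ => 0.1).cache()
    oldRDD.count() // materialize the initial weights in the cache

    var lastMean = 0.0
    for (i <- 0 until numIterations) {
      val t = System.nanoTime()
      // Update the weights and mark the result for caching.
      val newRDD = oldRDD.map(x => x * x).cache()
      // Compute newRDD now, while oldRDD's cached blocks still exist.
      newRDD.count()
      // Only now release the previous iteration's blocks.
      oldRDD.unpersist(blocking = true)
      // "Check" convergence; this normally reads newRDD from the cache.
      lastMean = newRDD.mean()
      println(s"Iteration $i time: ${(System.nanoTime() - t) / 1e9} s")
      oldRDD = newRDD
    }
    lastMean
  }

  def main(args: Array[String]): Unit = {
    // Local master and a reduced model size, for trying the pattern out.
    val conf = new SparkConf().setAppName("cached-iterations").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val mean = run(sc, modelSize = 1000000, numIterations = 10, numPartitions = 5)
      println(s"Final mean: $mean")
    } finally {
      sc.stop()
    }
  }
}
```

The order matters: `newRDD.count()` computes the new weights while the previous iteration's cached blocks still exist, so releasing `oldRDD` afterwards does not throw away data the next action needs (as long as `newRDD`'s blocks stay in memory). Caching stops the data from being recomputed, but the lineage itself still grows by one `map` per iteration; for very long loops, `sc.setCheckpointDir(...)` plus `RDD.checkpoint()` is the usual way to truncate it.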

Thanks
Shivaram

On Fri, Jul 10, 2015 at 7:44 PM, Ulanov, Alexander <alexander.ula...@hp.com>
wrote:

> Hi,
>
> I am interested in how scalable model parallelism can be within Spark.
> Suppose the model contains N weights of type Double and N is so large that
> the weights do not fit into the memory of a single node, so we store the
> model in an RDD[Double] spread across several nodes. To train the model,
> one needs to perform K iterations that update all the weights and check for
> convergence. We also need to exchange some weights between the nodes to
> synchronize the model or update the global state. I've sketched code that
> does iterative updates on an RDD (without the global update yet).
> Surprisingly, each iteration takes more time than the previous one, as
> shown below (the printed values are elapsed seconds divided by
> numIterations; see the println in the code). Could you suggest what the
> reason is? I've checked GC; it takes only a few milliseconds.
>
> Configuration: Spark 1.4, 1 master and 5 worker nodes, 5 executors, Intel
> Xeon 2.2, 16 GB RAM each.
>
> Iteration 0 time:1.127990986
> Iteration 1 time:1.391120414
> Iteration 2 time:1.6429691381000002
> Iteration 3 time:1.9344402954
> Iteration 4 time:2.2075294246999997
> Iteration 5 time:2.6328659593
> Iteration 6 time:2.7911690492999996
> Iteration 7 time:3.0850374104
> Iteration 8 time:3.4031050061
> Iteration 9 time:3.8826580919
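The timings just listed grow by a roughly constant amount per iteration. A quick check in plain Scala, using the values exactly as printed (the object name `IterationTimes` is only for illustration; because the original code divides by numIterations, only the relative values matter here):

```scala
object IterationTimes {
  // Values as printed in the message (elapsed seconds / numIterations).
  val times: Vector[Double] = Vector(
    1.127990986, 1.391120414, 1.6429691381000002, 1.9344402954,
    2.2075294246999997, 2.6328659593, 2.7911690492999996,
    3.0850374104, 3.4031050061, 3.8826580919)

  // Growth from each iteration to the next one.
  val increments: Vector[Double] =
    times.zip(times.tail).map { case (a, b) => b - a }

  // Average growth per iteration.
  val meanIncrement: Double = increments.sum / increments.size

  def main(args: Array[String]): Unit = {
    increments.zipWithIndex.foreach { case (d, i) =>
      println(f"Iteration $i -> ${i + 1}: +$d%.3f")
    }
    println(f"Mean increment: $meanIncrement%.3f")
  }
}
```

Every step is positive and the mean increment is about 0.31 in the printed units. A steady, roughly linear increase like this is consistent with each iteration re-running all earlier `map` steps, although the numbers alone do not prove it.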
>
> Code:
>
> val modelSize = 1000000000
> val numIterations = 10
> val parallelizm = 5
> var oldRDD = sc.parallelize(1 to modelSize, parallelizm).map(x => 0.1)
> var newRDD = sc.parallelize(1 to 1, parallelizm).map(x => 0.1)
> var i = 0
> while (i < numIterations) {
>   val t = System.nanoTime()
>   // updating the weights (this val shadows the outer var newRDD)
>   val newRDD = oldRDD.map(x => x * x)
>   // oldRDD was never cached, so this has nothing to release
>   oldRDD.unpersist(true)
>   // "checking" convergence
>   newRDD.mean
>   // elapsed seconds divided by numIterations
>   println("Iteration " + i + " time:" + (System.nanoTime() - t) / 1e9 / numIterations)
>   oldRDD = newRDD
>   i += 1
> }
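To see why a loop like the one above slows down, here is a toy model in plain Scala with no Spark involved. `LineageModel`, `Node`, `Source` and `Mapped` are hypothetical names for illustration; the model only mimics lazy evaluation with optional caching, not Spark's scheduler or storage.

```scala
object LineageModel {
  // Global counter of element-wise map applications (toy bookkeeping only).
  var applications: Long = 0L

  // Hypothetical stand-in for an RDD, NOT Spark's API: a node is either a
  // data source or a map over a parent, and it is evaluated lazily.
  sealed trait Node {
    private var cacheRequested = false
    private var cachedData: Option[Vector[Double]] = None

    protected def computeFresh(): Vector[Double]

    // Mark this node so its result is kept after the first evaluation.
    def cache(): Node = { cacheRequested = true; this }

    // Plays the role of an action: reuse cached data if present,
    // otherwise recompute through the whole parent chain.
    def evaluate(): Vector[Double] = cachedData.getOrElse {
      val data = computeFresh()
      if (cacheRequested) cachedData = Some(data)
      data
    }

    def map(f: Double => Double): Node = Mapped(this, f)
  }

  final case class Source(data: Vector[Double]) extends Node {
    protected def computeFresh(): Vector[Double] = data
  }

  final case class Mapped(parent: Node, f: Double => Double) extends Node {
    protected def computeFresh(): Vector[Double] = {
      val input = parent.evaluate()
      applications += input.size
      input.map(f)
    }
  }

  // Applies `iterations` squaring steps, running one action per step, and
  // returns how many map applications each step's action triggered.
  def run(iterations: Int, size: Int, useCache: Boolean): Seq[Long] = {
    var current: Node = Source(Vector.fill(size)(0.1))
    (0 until iterations).map { _ =>
      val before = applications
      val next = current.map(x => x * x)
      if (useCache) next.cache()
      next.evaluate() // like `newRDD.mean` in the original loop
      current = next
      applications - before
    }
  }

  def main(args: Array[String]): Unit = {
    println("without cache: " + run(5, 1000, useCache = false))
    println("with cache:    " + run(5, 1000, useCache = true))
  }
}
```

Without caching, the action in iteration k re-applies all k + 1 maps, so for 1000 elements the work per iteration is 1000, 2000, 3000, ... applications; with caching it stays at 1000 per iteration.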
>
> Best regards, Alexander
