I think you need to do `newRDD.cache()` and `newRDD.count` before you do `oldRDD.unpersist(true)` -- otherwise each iteration might be recomputing all of the previous iterations.
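For example, something like this (just a rough, untested sketch of the loop from the quoted code below with the body reordered; I've also printed per-iteration seconds directly, since t is reset at the start of each pass) should keep every iteration from re-running the whole lineage:

val modelSize = 1000000000
val numIterations = 10
val parallelizm = 5
// cache and materialize the initial model so later unpersist calls have something to drop
var oldRDD = sc.parallelize(1 to modelSize, parallelizm).map(x => 0.1).cache()
oldRDD.count()
var i = 0
while (i < numIterations) {
  val t = System.nanoTime()
  // updating the weights
  val newRDD = oldRDD.map(x => x * x).cache()
  // materialize the new model before dropping the old one, otherwise the
  // next iteration would have to recompute the entire lineage from scratch
  newRDD.count()
  oldRDD.unpersist(true)
  // "checking" convergence
  newRDD.mean()
  println("Iteration " + i + " time:" + (System.nanoTime() - t) / 1e9)
  oldRDD = newRDD
  i += 1
}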
Thanks
Shivaram

On Fri, Jul 10, 2015 at 7:44 PM, Ulanov, Alexander <alexander.ula...@hp.com> wrote:
> Hi,
>
> I am interested in how scalable model parallelism can be within Spark.
> Suppose the model contains N weights of type Double and N is so large that
> it does not fit into the memory of a single node. We can then store the
> model in an RDD[Double] across several nodes. To train the model, one needs
> to perform K iterations that update all the weights and check for
> convergence. We also need to exchange some weights between the nodes to
> synchronize the model or update the global state. I've sketched code that
> does iterative updates with an RDD (without the global update yet).
> Surprisingly, each iteration takes more time than the previous one, as
> shown below (time in seconds). Could you suggest what the reason for that
> is? I've checked GC; it only does something for a few milliseconds.
>
> Configuration: Spark 1.4, 1 master and 5 worker nodes, 5 executors, Intel
> Xeon 2.2, 16GB RAM each
>
> Iteration 0 time:1.127990986
> Iteration 1 time:1.391120414
> Iteration 2 time:1.6429691381000002
> Iteration 3 time:1.9344402954
> Iteration 4 time:2.2075294246999997
> Iteration 5 time:2.6328659593
> Iteration 6 time:2.7911690492999996
> Iteration 7 time:3.0850374104
> Iteration 8 time:3.4031050061
> Iteration 9 time:3.8826580919
>
> Code:
>
> val modelSize = 1000000000
> val numIterations = 10
> val parallelizm = 5
> var oldRDD = sc.parallelize(1 to modelSize, parallelizm).map(x => 0.1)
> var newRDD = sc.parallelize(1 to 1, parallelizm).map(x => 0.1)
> var i = 0
> while (i < numIterations) {
>   val t = System.nanoTime()
>   // updating the weights
>   val newRDD = oldRDD.map(x => x * x)
>   oldRDD.unpersist(true)
>   // “checking” convergence
>   newRDD.mean
>   println("Iteration " + i + " time:" + (System.nanoTime() - t) / 1e9 / numIterations)
>   oldRDD = newRDD
>   i += 1
> }
>
> Best regards,
> Alexander