Hi Shivaram,

Thank you for the suggestion! If I do .cache and .count, each iteration takes much more time, and most of it is spent in GC. Is that normal?
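To check where that time actually goes, one option (a sketch — the JVM flags are standard HotSpot options, and spark.executor.extraJavaOptions is the Spark 1.x setting for passing them) is to enable GC logging on the executors and compare it against the per-task GC time shown in the web UI:

    spark-shell --conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"

If GC dominates, caching a billion boxed Doubles as deserialized objects is a likely culprit; persisting serialized trades some CPU for far less GC pressure. A minimal sketch, assuming the newRDD from the loop below:

    import org.apache.spark.storage.StorageLevel

    // serialized storage: one byte array per partition instead of ~1e9 boxed objects
    val cached = newRDD.persist(StorageLevel.MEMORY_ONLY_SER)
    cached.count() // materialize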
On 10 July 2015, at 21:23, Shivaram Venkataraman <shiva...@eecs.berkeley.edu> wrote:

I think you need to do `newRDD.cache()` and `newRDD.count` before you do oldRDD.unpersist(true) -- otherwise it might be recomputing all the previous iterations each time.

Thanks
Shivaram

On Fri, Jul 10, 2015 at 7:44 PM, Ulanov, Alexander <alexander.ula...@hp.com> wrote:

Hi,

I am interested in how scalable model parallelism can be within Spark. Suppose the model contains N weights of type Double, and N is so large that they do not fit into the memory of a single node. We can instead store the model in an RDD[Double] spread across several nodes. Training the model then requires K iterations that update all the weights and check for convergence, and some weights also need to be exchanged between nodes to synchronize the model or update the global state.

I've sketched code that performs iterative updates on an RDD (without the global update yet). Surprisingly, each iteration takes more time than the previous one, as shown below (time in seconds). Could you suggest the reason for this? I've checked GC, and it only runs for a few milliseconds.

Configuration: Spark 1.4, 1 master and 5 worker nodes, 5 executors, Intel Xeon 2.2 GHz, 16GB RAM each

Iteration 0 time: 1.127990986
Iteration 1 time: 1.391120414
Iteration 2 time: 1.6429691381000002
Iteration 3 time: 1.9344402954
Iteration 4 time: 2.2075294246999997
Iteration 5 time: 2.6328659593
Iteration 6 time: 2.7911690492999996
Iteration 7 time: 3.0850374104
Iteration 8 time: 3.4031050061
Iteration 9 time: 3.8826580919

Code:

val modelSize = 1000000000
val numIterations = 10
val parallelizm = 5
var oldRDD = sc.parallelize(1 to modelSize, parallelizm).map(x => 0.1)
var newRDD = sc.parallelize(1 to 1, parallelizm).map(x => 0.1)
var i = 0
while (i < numIterations) {
  val t = System.nanoTime()
  // updating the weights
  val newRDD = oldRDD.map(x => x * x)
  oldRDD.unpersist(true)
  // "checking" convergence
  newRDD.mean
  println("Iteration " + i + " time:" + (System.nanoTime() - t) / 1e9 / numIterations)
  oldRDD = newRDD
  i += 1
}

Best regards,
Alexander

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org
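For reference, a minimal sketch of the loop restructured along the lines Shivaram suggests — cache and materialize newRDD before unpersisting oldRDD, so each iteration reads the cached previous result instead of recomputing the whole lineage from iteration 0. It reuses modelSize, numIterations, and parallelizm from the code above, and the per-iteration timing no longer divides by numIterations:

    var oldRDD = sc.parallelize(1 to modelSize, parallelizm).map(x => 0.1).cache()
    oldRDD.count() // materialize the initial model
    var i = 0
    while (i < numIterations) {
      val t = System.nanoTime()
      val newRDD = oldRDD.map(x => x * x).cache()
      newRDD.count()         // force evaluation while oldRDD is still cached
      oldRDD.unpersist(true) // safe now: newRDD is materialized, not just a lineage
      newRDD.mean()          // "checking" convergence
      println("Iteration " + i + " time: " + (System.nanoTime() - t) / 1e9)
      oldRDD = newRDD
      i += 1
    }

Without the count(), the map is lazy, so unpersist(true) drops the only materialized copy and iteration k has to re-execute all k maps — which matches the roughly linear growth in the timings above.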