Actually, the program hangs on dataAllRDD.count() alone, before any map is
applied. I suspect that creating the RDD fails when its elements are too big.
With nY = 3000, dataAllRDD.count() works (each element of dataAll is
3000*400*64 bits = 9.6 MB), but with nY = 4000 it hangs (4000*400*64 bits =
12.8 MB). What limits the size of the elements of an RDD?
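
The sizes above can be reproduced with a few lines of plain Scala. This is only a sketch of the arithmetic: payloadBytes is a hypothetical helper name, and it counts the raw doubles only, ignoring JVM object headers and serialization overhead.

```scala
object ElementSize {
  // Raw payload of one value in dataAll: nY rows of nX doubles,
  // 8 bytes (64 bits) per double. Headers and serialization
  // overhead are deliberately not counted.
  def payloadBytes(nY: Int, nX: Int): Long = nY.toLong * nX * 8L

  def main(args: Array[String]): Unit = {
    val nX = 400
    // nY = 3000 works, nY = 4000 hangs, nY = 5000 is the quoted example.
    for (nY <- Seq(3000, 4000, 5000)) {
      println(s"nY = $nY: ${payloadBytes(nY, nX)} bytes")
    }
  }
}
```

For nY = 5000 this gives 16000000 bytes, which is close to the 16154060 bytes per serialized task reported in the log quoted below, so each task appears to carry its whole element.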
sparkuser2345 wrote
> I have an array 'dataAll' of key-value pairs where each value is an array
> of arrays. I would like to parallelize a task over the elements of
> 'dataAll' across the workers. In the dummy example below, the number of
> elements in 'dataAll' is 3, but in the real application it would be tens to
> hundreds.
>
> Without parallelizing dataAll, 'result' is calculated in less than a
> second:
>
> import org.jblas.DoubleMatrix
>
> val nY = 5000
> val nX = 400
>
> val dataAll = Array((1, Array.fill(nY)(Array.fill(nX)(1.0))),
>                     (2, Array.fill(nY)(Array.fill(nX)(1.0))),
>                     (3, Array.fill(nY)(Array.fill(nX)(1.0))))
>
> val w1 = DoubleMatrix.ones(400)
>
> // This finishes in less than a second:
> val result = dataAll.map { dat =>
>   val c = dat._1
>   val dataArr = dat._2
>   // Map over the Arrays within dataArr:
>   val test = dataArr.map { arr =>
>     val test2 = new DoubleMatrix(arr.length, 1, arr:_*)
>     val out = test2.dot(w1)
>     out
>   }
>   (c, test)
> }
>
> However, when I parallelize dataAll, the same task freezes:
>
> val dataAllRDD = sc.parallelize(dataAll, 3)
>
> // This doesn't finish in several minutes:
> val result = dataAllRDD.map { dat =>
>   val c = dat._1
>   val dataArr = dat._2
>   // Map over the Arrays within dataArr:
>   val test = dataArr.map { arr =>
>     val test2 = new DoubleMatrix(arr.length, 1, arr:_*)
>     val out = test2.dot(w1)
>     out
>   }
>   (c, test)
> }.collect
>
> After sending the above task, nothing is written to the worker logs (as
> viewed through the web UI), but the following output is printed in the
> Spark shell where I'm running the task:
>
> 14/08/11 18:17:31 INFO SparkContext: Starting job: collect at <console>:33
> 14/08/11 18:17:31 INFO DAGScheduler: Got job 0 (collect at <console>:33) with 3 output partitions (allowLocal=false)
> 14/08/11 18:17:31 INFO DAGScheduler: Final stage: Stage 0 (collect at <console>:33)
> 14/08/11 18:17:31 INFO DAGScheduler: Parents of final stage: List()
> 14/08/11 18:17:31 INFO DAGScheduler: Missing parents: List()
> 14/08/11 18:17:31 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[1] at map at <console>:23), which has no missing parents
> 14/08/11 18:17:32 INFO DAGScheduler: Submitting 3 missing tasks from Stage 0 (MappedRDD[1] at map at <console>:23)
> 14/08/11 18:17:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
> 14/08/11 18:17:32 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor 2: <executor_2_IP> (PROCESS_LOCAL)
> 14/08/11 18:17:32 INFO TaskSetManager: Serialized task 0.0:0 as 16154060 bytes in 69 ms
> 14/08/11 18:17:32 INFO TaskSetManager: Starting task 0.0:1 as TID 1 on executor 1: <executor_1_IP> (PROCESS_LOCAL)
> 14/08/11 18:17:32 INFO TaskSetManager: Serialized task 0.0:1 as 16154060 bytes in 81 ms
> 14/08/11 18:17:32 INFO TaskSetManager: Starting task 0.0:2 as TID 2 on executor 0: <executor_0_IP> (PROCESS_LOCAL)
> 14/08/11 18:17:32 INFO TaskSetManager: Serialized task 0.0:2 as 16154060 bytes in 66 ms
>
>
> dataAllRDD.map does work with a smaller array, though (e.g. nY = 100;
> finishes in less than a second).
>
> Why is dataAllRDD.map so much slower than dataAll.map, or even not
> executing at all?
>
> The Spark version I'm using is 0.9.0.
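
The shell log above reports each serialized task as 16154060 bytes. To check how much of that is the data itself, one element can be serialized locally without Spark. This is a rough sketch under the assumption that plain Java serialization is a fair stand-in for what Spark does with the task; SerializedSize, serializedBytes and element are hypothetical names used only for illustration.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SerializedSize {
  // Java-serialize an object into memory and return the byte count.
  def serializedBytes(obj: AnyRef): Long = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.size().toLong
  }

  // One element shaped like those in dataAll:
  // (key, nY x nX array of doubles filled with 1.0).
  def element(key: Int, nY: Int, nX: Int): (Int, Array[Array[Double]]) =
    (key, Array.fill(nY)(Array.fill(nX)(1.0)))

  def main(args: Array[String]): Unit = {
    val bytes = serializedBytes(element(1, 5000, 400))
    println(s"one element at nY = 5000 serializes to $bytes bytes")
  }
}
```

If the number printed is close to the task size in the log, the data is being shipped inside each task rather than stored separately, which would make the element size the thing to investigate.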
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Parallelizing-a-task-makes-it-freeze-tp11900p11967.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.