Responses inline.

On Wed, Jul 23, 2014 at 4:13 AM, lalit1303 <la...@sigmoidanalytics.com> wrote:
> Hi,
> Thanks TD for your reply. I am still not able to resolve the problem for my
> use case.
> I have, let's say, 1000 different RDDs, and I am applying a transformation
> function on each RDD, and I want the output of all the RDDs combined into a
> single output RDD. For this, I am doing the following:
>
> *<Loop Start>*
> tempRDD = jaRDD.rdd().repartition(1).mapPartitions(....).toJavaRDD();
> *//creating a new rdd in every loop*
> outRDD = outRDD.union(tempRDD); *//keep joining RDDs to get the output into
> a single RDD*
>
> *//after every 10 iterations, in order to truncate the lineage*
> cachRDD = outRDD.cache();
> cachRDD.checkpoint();
> System.out.println(cachRDD.collect().size());
> outRDD = new JavaRDD<String>(cachRDD.rdd(), cachRDD.classTag());
> *<Loop Ends>*
>
> *//finally, after the whole computation*
> outRDD.saveAsTextFile(..)
>
> The above operation is overall slow. It runs successfully when performed for
> fewer iterations, i.e. ~100, but when the number of iterations is increased
> to ~1000, the whole job takes more than *30 mins* and ultimately breaks down
> with an OutOfMemory error. The total size of the data is around 1.4 MB. As of
> now, I am running the job on Spark standalone mode with 2 cores and 2.9 GB
> memory.
I think this is happening because of how you are caching the output RDDs that
are being generated repeatedly. In every iteration, it is building a new union
RDD which contains the data of the previous union RDD plus some new data.
Since each of these union RDDs is cached, the underlying data is being cached
repeatedly. So the cached sizes grow like this:

Iteration 1: union RDD: X MB  | Total size cached: X MB
Iteration 2: union RDD: 2X MB | Total size cached: 3X MB
Iteration 3: union RDD: 3X MB | Total size cached: 6X MB
Iteration 4: union RDD: 4X MB | Total size cached: 10X MB
...

If you do the math (the total cached after N iterations is X * N(N+1)/2), that
is a quadratic increase in the size of the data being processed and cached
with respect to the number of iterations. This leads to an increase in both
run time and memory usage.

> I also observed that when collect() operation is performed, the number of
> tasks keeps on increasing as the loop proceeds, like on first collect() 22
> total tasks, then ~40 total tasks ... ~300 tasks for a single collect.
> Does this mean that all the operations are repeatedly performed, and RDD
> lineage is not broken??

Same reason as above. Each union RDD is built by appending the partitions of
the previous union RDD plus the new set of partitions (~22 partitions), so the
Nth union RDD has N * 22 partitions, hence that many tasks. You could change
this by also repartitioning when you want to cache + checkpoint the union RDD
(therefore, something like cachRDD = outRDD.repartition(100).cache();
cachRDD.checkpoint(); cachRDD.count(); note that checkpoint() returns void, so
it has to be a separate statement). And do you really need all the data to be
collected at the driver? If you are doing the cachRDD.collect() just to force
the checkpoint, then use cachRDD.count() instead. There is a short sketch of
this restructured loop at the end of this mail.

> Can you please elaborate on the point from your last post, i.e. how to
> perform: "*Create a modified RDD R` which has the same data as RDD R but
> does not have the lineage. This is done by creating a new BlockRDD using the
> ids of blocks of data representing the in-memory R*"

Please refer to the lines in this function:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala#L74

What those lines do is save the data of the associated RDD to HDFS files, and
then create a new CheckpointRDD from the same files. Then the dependency of
the associated RDD is changed to use the new RDD. This truncates the lineage,
because the associated RDD's parent is now the new RDD, which has a very short
lineage (links to the checkpoint files), and the previous dependencies (parent
RDDs) are forgotten. This implementation can be modified by forcing the data
of the associated RDD to be cached with StorageLevel.MEMORY_AND_DISK_2, and
then, instead of a CheckpointRDD, creating a new BlockRDD (using the names of
the blocks that are used to cache the RDD), which is then set as the new
dependency. This is definitely a behind-the-public-API implementation.
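To make the suggestions above concrete, here is a minimal sketch of how the
loop could be restructured. It is untested and only illustrates the pattern:
the class name, transformPartitions() (which stands in for your
repartition(1).mapPartitions(...) step) and the HDFS paths are placeholders,
and the repartition(100) is just the number mentioned above (a smaller number
may be enough for 1.4 MB of data).

import java.util.ArrayList;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class UnionCheckpointSketch {

  // Placeholder for your mapPartitions-based transformation (identity here,
  // just so the sketch is self-contained).
  static JavaRDD<String> transformPartitions(JavaRDD<String> input) {
    return input;
  }

  static JavaRDD<String> buildOutput(JavaSparkContext jsc, JavaRDD<String> jaRDD,
                                     int numIterations) {
    jsc.setCheckpointDir("hdfs:///tmp/checkpoint-dir");   // placeholder path

    JavaRDD<String> outRDD = jsc.parallelize(new ArrayList<String>());
    JavaRDD<String> previousCached = null;

    for (int i = 1; i <= numIterations; i++) {
      JavaRDD<String> tempRDD = transformPartitions(jaRDD);   // new data for this iteration
      outRDD = outRDD.union(tempRDD);

      if (i % 10 == 0) {
        // Bound the number of partitions before caching + checkpointing;
        // otherwise the Nth union RDD carries N * 22 partitions.
        JavaRDD<String> cachRDD = outRDD.repartition(100).cache();
        cachRDD.checkpoint();   // marks the RDD for checkpointing (returns void)
        cachRDD.count();        // forces materialization; cheaper than collect()

        // Unpersist the previously cached union RDD so that the cached data
        // does not keep growing quadratically across iterations.
        if (previousCached != null) {
          previousCached.unpersist();
        }
        previousCached = cachRDD;
        outRDD = cachRDD;       // continue the loop from the checkpointed RDD
      }
    }
    return outRDD;
  }
}

The caller would then just do buildOutput(jsc, jaRDD, 1000).saveAsTextFile(...)
at the end, as in your current code.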
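And if you want to convince yourself that the lineage is actually being
truncated, you do not need to go behind the API for that part. A quick check
using the existing JavaRDD methods, placed right after the count() in the
sketch above, would be:

System.out.println(cachRDD.isCheckpointed());     // should print true once the checkpointing job has finished
System.out.println(cachRDD.getCheckpointFile());  // Optional containing the checkpoint file path
System.out.println(cachRDD.toDebugString());      // the lineage should now be short, rooted at the checkpoint data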