That worked amazingly well, thank you Matei! The numbers that worked for me were 400 for the textFile()s and 1500 for the join()s.
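In case it helps anyone searching the archives later, the calls ended up looking roughly like the sketch below; the master URL, bucket paths, and key parsing are placeholders, not the real job:

from pyspark import SparkContext

# master URL and app name are placeholders for this sketch
sc = SparkContext("spark://master:7077", "partition_tuning")

# 400 sets the minimum number of map tasks (input partitions) for each file
left = sc.textFile("s3n://my-bucket/left/*.gz", 400).map(lambda l: tuple(l.split("\t", 1)))
right = sc.textFile("s3n://my-bucket/right/*.gz", 400).map(lambda l: tuple(l.split("\t", 1)))

# 1500 sets the number of reduce-side partitions for the join
counts = left.join(right, 1500).count()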
On Mon, May 12, 2014 at 7:58 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
> Hey Jim, unfortunately external spilling is not implemented in Python right now. While it would be possible to update combineByKey to do smarter stuff here, one simple workaround you can try is to launch more map tasks (or more reduce tasks). To set the minimum number of map tasks, you can pass it as a second argument to textFile and such (e.g. sc.textFile("s3n://foo.txt", 1000)).
>
> Matei
>
> On May 12, 2014, at 5:47 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>
>> Thanks, Aaron, this looks like a good solution! Will be trying it out shortly.
>>
>> I noticed that the S3 exceptions seem to occur more frequently when the box is swapping. Why is the box swapping? combineByKey seems to make the assumption that it can fit an entire partition in memory when doing the combineLocally step. I'm going to try to break this apart, but I will need some sort of heuristic; options include looking at memory usage via the resource module and trying to keep below 'spark.executor.memory', or using batchSize to limit the number of entries in the dictionary. Let me know if you have any opinions.
>>
>> On Sun, May 4, 2014 at 8:02 PM, Aaron Davidson <ilike...@gmail.com> wrote:
>>> I'd just like to update this thread by pointing to the PR based on our initial design: https://github.com/apache/spark/pull/640
>>>
>>> This solution is a little more general and avoids catching IOException altogether. Long live exception propagation!
>>>
>>> On Mon, Apr 28, 2014 at 1:28 PM, Patrick Wendell <pwend...@gmail.com> wrote:
>>>>
>>>> Hey Jim,
>>>>
>>>> This IOException thing is a general issue that we need to fix, and your observation is spot-on. There is actually a JIRA for it that I created a few days ago: https://issues.apache.org/jira/browse/SPARK-1579
>>>>
>>>> Aaron is assigned on that one but not actively working on it, so we'd welcome a PR from you on this if you are interested.
>>>>
>>>> The first thought we had was to set a volatile flag when the reader sees an exception (indicating there was a failure in the task) and avoid swallowing the IOException in the writer if this happens. But I think there is a race here where the writer sees the error before the reader knows what is going on.
>>>>
>>>> Anyways, maybe if you have a simpler solution you could sketch it out in the JIRA and we could talk over there. The current proposal in the JIRA is somewhat complicated...
>>>>
>>>> - Patrick
>>>>
>>>> On Mon, Apr 28, 2014 at 1:01 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>>>
>>>>> FYI, it looks like this "stdin writer to Python finished early" error was caused by a break in the connection to S3, from which the data was being pulled. A recent commit to PythonRDD noted that the current exception catching can potentially mask an exception from the data source, and that is indeed what I see happening. The underlying libraries (jets3t and httpclient) do have retry capabilities, but I don't see a great way of setting them through Spark code. Instead I added the patch below, which kills the worker on the exception. This allows me to completely load the data source after a few worker retries.
>>>>>
>>>>> Unfortunately, java.net.SocketException is the same error that is sometimes expected from the client when using methods like take().
>>>>> One approach around this conflation is to create a new locally scoped exception class, e.g. WriterException, catch java.net.SocketException during output writing, then re-throw the new exception. The worker thread could then distinguish between the reasons java.net.SocketException might be thrown. Perhaps there is a more elegant way to do this in Scala, though?
>>>>>
>>>>> Let me know if I should open a ticket or discuss this on the developers list instead. Best,
>>>>>
>>>>> Jim
>>>>>
>>>>> diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>> index 0d71fdb..f31158c 100644
>>>>> --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>> +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>> @@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
>>>>>           readerException = e
>>>>>           Try(worker.shutdownOutput()) // kill Python worker process
>>>>>
>>>>> +       case e: java.net.SocketException =>
>>>>> +         // This can happen if a connection to the datasource, eg S3, resets
>>>>> +         // or is otherwise broken
>>>>> +         readerException = e
>>>>> +         Try(worker.shutdownOutput()) // kill Python worker process
>>>>> +
>>>>>         case e: IOException =>
>>>>>           // This can happen for legitimate reasons if the Python code stops returning data
>>>>>           // before we are done passing elements through, e.g., for take(). Just log a message to
>>>>>
>>>>> On Wed, Apr 9, 2014 at 7:04 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>>>>
>>>>>> This dataset is uncompressed text at ~54GB. stats() returns (count: 56757667, mean: 1001.68740583, stdev: 601.775217822, max: 8965, min: 343)
>>>>>>
>>>>>> On Wed, Apr 9, 2014 at 6:59 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>>>>>>> Okay, thanks. Do you have any info on how large your records and data file are? I'd like to reproduce and fix this.
>>>>>>>
>>>>>>> Matei
>>>>>>>
>>>>>>> On Apr 9, 2014, at 3:52 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Matei, thanks for working with me to find these issues.
>>>>>>>>
>>>>>>>> To summarize, the issues I've seen are:
>>>>>>>> 0.9.0:
>>>>>>>> - https://issues.apache.org/jira/browse/SPARK-1323
>>>>>>>>
>>>>>>>> SNAPSHOT 2014-03-18:
>>>>>>>> - When persist() is used and batchSize=1, java.lang.OutOfMemoryError: Java heap space. To me this indicates a memory leak, since Spark should simply be counting records of size < 3MB
>>>>>>>> - Without persist(), "stdin writer to Python finished early" hangs the application; unknown root cause
>>>>>>>>
>>>>>>>> I've recently rebuilt another SNAPSHOT, git commit 16b8308, with debugging turned on.
>>>>>>>> This gives me the stacktrace on the new "stdin" problem:
>>>>>>>>
>>>>>>>> 14/04/09 22:22:45 DEBUG PythonRDD: stdin writer to Python finished early
>>>>>>>> java.net.SocketException: Connection reset
>>>>>>>>         at java.net.SocketInputStream.read(SocketInputStream.java:196)
>>>>>>>>         at java.net.SocketInputStream.read(SocketInputStream.java:122)
>>>>>>>>         at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>>>>>>>>         at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>>>>>>>>         at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>>>>>>>>         at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>>>>>>>>         at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
>>>>>>>>         at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>>>>>>>>         at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>>>>>>>         at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>>>>>>>         at org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
>>>>>>>>         at org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
>>>>>>>>         at java.io.FilterInputStream.read(FilterInputStream.java:133)
>>>>>>>>         at org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
>>>>>>>>         at org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
>>>>>>>>         at org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
>>>>>>>>         at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
>>>>>>>>         at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>>>>>>>         at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>>>>>>>         at java.io.DataInputStream.read(DataInputStream.java:100)
>>>>>>>>         at org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
>>>>>>>>         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
>>>>>>>>         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
>>>>>>>>         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:192)
>>>>>>>>         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:175)
>>>>>>>>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>>         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
>>>>>>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>>>>>>>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>>         at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>>>>>>         at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>>>
>>>>>>>> On Thu, Apr 3, 2014 at 8:37 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>>>>>>>>> Cool, thanks for the update. Have you tried running a branch with this fix (e.g. branch-0.9, or the 0.9.1 release candidate?) Also, what memory leak issue are you referring to, is it separate from this?
>>>>>>>>> (Couldn't find it earlier in the thread.)
>>>>>>>>>
>>>>>>>>> To turn on debug logging, copy conf/log4j.properties.template to conf/log4j.properties and change the line log4j.rootCategory=INFO, console to log4j.rootCategory=DEBUG, console. Then make sure this file is present in "conf" on all workers.
>>>>>>>>>
>>>>>>>>> BTW I've managed to run PySpark with this fix on some reasonably large S3 data (multiple GB) and it was fine. It might happen only if records are large, or something like that. How much heap are you giving to your executors, and does it show that much in the web UI?
>>>>>>>>>
>>>>>>>>> Matei
>>>>>>>>>
>>>>>>>>> On Mar 29, 2014, at 10:44 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I think the problem I ran into in 0.9 is covered in https://issues.apache.org/jira/browse/SPARK-1323
>>>>>>>>>>
>>>>>>>>>> When I kill the python process, the stacktrace I get indicates that this happens at initialization. It looks like the initial write to the Python process does not go through, and then the iterator hangs waiting for output. I haven't had luck turning on debugging for the executor process. Still trying to learn the log4j properties that need to be set.
>>>>>>>>>>
>>>>>>>>>> No luck yet on tracking down the memory leak.
>>>>>>>>>>
>>>>>>>>>> 14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
>>>>>>>>>> org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
>>>>>>>>>>         at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:168)
>>>>>>>>>>         at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>>>>>>>>>>         at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:113)
>>>>>>>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:231)
>>>>>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:222)
>>>>>>>>>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>>>>>>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:52)
>>>>>>>>>>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:212)
>>>>>>>>>>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
>>>>>>>>>>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>>>>>>>>>         at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>>>>         at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>>>>>>>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>>>>>>>>>         at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
>>>>>>>>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>>>>>>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>         at java.lang.Thread.run(Thread.java:724)
>>>>>>>>>>
>>>>>>>>>> On Sat, Mar 29, 2014 at 3:17 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>>>>>>>>> I've only tried 0.9, in which I ran into the `stdin writer to Python finished early` so frequently I wasn't
>>>>>>>>>>> able to load even a 1GB file. Let me know if I can provide any other info!
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>>>>>>>>>>>> I see, did this also fail with previous versions of Spark (0.9 or 0.8)? We'll try to look into these, seems like a serious error.
>>>>>>>>>>>>
>>>>>>>>>>>> Matei
>>>>>>>>>>>>
>>>>>>>>>>>> On Mar 27, 2014, at 7:27 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks, Matei. I am running "Spark 1.0.0-SNAPSHOT built for Hadoop 1.0.4" from GitHub on 2014-03-18.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I tried batchSizes of 512, 10, and 1, and each got me further, but none have succeeded.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I can get this to work -- with manual interventions -- if I omit `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1. 5 of the 175 executors hung, and I had to kill the python process to get things going again. The only indication of this in the logs was `INFO python.PythonRDD: stdin writer to Python finished early`.
>>>>>>>>>>>>>
>>>>>>>>>>>>> With batchSize=1 and persist, a new memory error came up in several tasks before the app failed:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in thread Thread[stdin writer for python,5,main]
>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>         at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>>>>>>>>>         at java.lang.String.<init>(String.java:203)
>>>>>>>>>>>>>         at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>>>>>>>>>>>>>         at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>>>>>>>>>>>>>         at org.apache.hadoop.io.Text.decode(Text.java:350)
>>>>>>>>>>>>>         at org.apache.hadoop.io.Text.decode(Text.java:327)
>>>>>>>>>>>>>         at org.apache.hadoop.io.Text.toString(Text.java:254)
>>>>>>>>>>>>>         at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>>>>>>>>>         at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>>>>>>>>>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>>>>>>>>>>>         at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>>>>>>>>>>>>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>>>>>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>>>>>>>         at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>>>>>>>>>>>         at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>>>>>>>>
>>>>>>>>>>>>> There are other exceptions, but I think they all stem from the above, e.g. org.apache.spark.SparkException: Error sending message to BlockManagerMaster
>>>>>>>>>>>>>
>>>>>>>>>>>>> Let me know if there are other settings I should try, or if I should try a newer snapshot.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks again!
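>>>>>>>>>>>>>
>>>>>>>>>>>>> P.S. For reference, this is roughly how I'm constructing the context for the batchSize tests; it's only a sketch, and the master URL and app name below are placeholders:
>>>>>>>>>>>>>
>>>>>>>>>>>>> from pyspark import SparkContext
>>>>>>>>>>>>>
>>>>>>>>>>>>> # batchSize controls how many objects PySpark groups together when passing
>>>>>>>>>>>>> # data between the JVM and Python; the default is 1024
>>>>>>>>>>>>> sc = SparkContext("spark://master:7077", "session_stats", batchSize=1)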
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>>>>>>>>>>>>>> Hey Jim,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In Spark 0.9 we added a "batchSize" parameter to PySpark that makes it group multiple objects together before passing them between Java and Python, but this may be too high by default. Try passing batchSize=10 to your SparkContext constructor to lower it (the default is 1024). Or even batchSize=1 to match earlier versions.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Matei
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi all, I'm wondering if there are any settings I can use to reduce the memory needed by the PythonRDD when computing simple stats. I am getting OutOfMemoryError exceptions while calculating count() on big, but not absurd, records. It seems like PythonRDD is trying to keep too many of these records in memory, when all that is needed is to stream through them and count. Any tips for getting through this workload?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Code:
>>>>>>>>>>>>>>> session = sc.textFile('s3://...json.gz') # ~54GB of compressed data
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> # the biggest individual text line is ~3MB
>>>>>>>>>>>>>>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda (y,s): (loads(y), loads(s)))
>>>>>>>>>>>>>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> parsed.count()
>>>>>>>>>>>>>>> # will never finish: executor.Executor: Uncaught exception will FAIL all executors
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Incidentally the whole app appears to be killed, but this error is not propagated to the shell.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cluster:
>>>>>>>>>>>>>>> 15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Exception:
>>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>>>         at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>>>>>>>>>>>>>>         at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>>>>>>>>>>>>>>         at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>>>>>>>>>>>>>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>>>>>>>         at org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>>>>>>>>>>>>>>         at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>>>>>>>>>>>>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>>>>>>>>>>>>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>>>>>>>>>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>>>>>>>>>>>>>>         at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)