Cool, that's good to hear. We'd also like to add spilling in Python itself, or at least have it exit with a clear message when it can't.

Matei
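A rough PySpark sketch of the partitioning workaround discussed in the quoted thread below; the S3 paths and master URL are placeholders, and 400 and 1500 are the values Jim reports working for his textFile() and join() calls:

    from pyspark import SparkContext

    sc = SparkContext("local[4]", "partition-workaround")

    # The second argument to textFile() sets the minimum number of input
    # partitions, so each map task (and each Python worker) sees less data.
    left = sc.textFile("s3n://bucket/left.tsv", 400)
    right = sc.textFile("s3n://bucket/right.tsv", 400)

    left_kv = left.map(lambda line: tuple(line.split("\t", 1)))
    right_kv = right.map(lambda line: tuple(line.split("\t", 1)))

    # join() also accepts a partition count, which controls the number of
    # reduce tasks and therefore the size of each combined partition.
    joined = left_kv.join(right_kv, 1500)
    print(joined.count())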
On May 14, 2014, at 10:47 AM, Jim Blomo <jim.bl...@gmail.com> wrote:

> That worked amazingly well, thank you Matei! Numbers that worked for me were
> 400 for the textFile()s and 1500 for the join()s.
>
> On Mon, May 12, 2014 at 7:58 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>> Hey Jim, unfortunately external spilling is not implemented in Python right
>> now. While it would be possible to update combineByKey to do smarter stuff
>> here, one simple workaround you can try is to launch more map tasks (or more
>> reduce tasks). To set the minimum number of map tasks, you can pass it as a
>> second argument to textFile and such (e.g. sc.textFile("s3n://foo.txt", 1000)).
>>
>> Matei
>>
>> On May 12, 2014, at 5:47 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>
>>> Thanks, Aaron, this looks like a good solution! Will be trying it out shortly.
>>>
>>> I noticed that the S3 exception seems to occur more frequently when the box
>>> is swapping. Why is the box swapping? combineByKey seems to assume that it
>>> can fit an entire partition in memory when doing the combineLocally step.
>>> I'm going to try to break this apart, but I will need some sort of heuristic.
>>> Options include looking at memory usage via the resource module and trying
>>> to keep below 'spark.executor.memory', or using batchSize to limit the number
>>> of entries in the dictionary. Let me know if you have any opinions.
>>>
>>> On Sun, May 4, 2014 at 8:02 PM, Aaron Davidson <ilike...@gmail.com> wrote:
>>>> I'd just like to update this thread by pointing to the PR based on our
>>>> initial design: https://github.com/apache/spark/pull/640
>>>>
>>>> This solution is a little more general and avoids catching IOException
>>>> altogether. Long live exception propagation!
>>>>
>>>> On Mon, Apr 28, 2014 at 1:28 PM, Patrick Wendell <pwend...@gmail.com> wrote:
>>>>>
>>>>> Hey Jim,
>>>>>
>>>>> This IOException thing is a general issue that we need to fix, and your
>>>>> observation is spot-on. There is actually a JIRA for it that I created a
>>>>> few days ago:
>>>>> https://issues.apache.org/jira/browse/SPARK-1579
>>>>>
>>>>> Aaron is assigned to that one but not actively working on it, so we'd
>>>>> welcome a PR from you on this if you are interested.
>>>>>
>>>>> The first thought we had was to set a volatile flag when the reader sees
>>>>> an exception (indicating there was a failure in the task) and avoid
>>>>> swallowing the IOException in the writer if this happens. But I think
>>>>> there is a race here where the writer sees the error before the reader
>>>>> knows what is going on.
>>>>>
>>>>> Anyway, if you have a simpler solution, you could sketch it out in the
>>>>> JIRA and we could talk over there. The current proposal in the JIRA is
>>>>> somewhat complicated...
>>>>>
>>>>> - Patrick
>>>>>
>>>>> On Mon, Apr 28, 2014 at 1:01 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>>>>
>>>>>> FYI, it looks like this "stdin writer to Python finished early" error was
>>>>>> caused by a break in the connection to S3, from which the data was being
>>>>>> pulled. A recent commit to PythonRDD noted that the current exception
>>>>>> catching can potentially mask an exception from the data source, and that
>>>>>> is indeed what I see happening. The underlying libraries (jets3t and
>>>>>> httpclient) do have retry capabilities, but I don't see a great way of
>>>>>> setting them through Spark code.
>>>>>> Instead I added the patch below, which kills the worker on the exception.
>>>>>> This allows me to completely load the data source after a few worker retries.
>>>>>>
>>>>>> Unfortunately, java.net.SocketException is the same error that is sometimes
>>>>>> expected from the client when using methods like take(). One approach around
>>>>>> this conflation is to create a new locally scoped exception class, e.g.
>>>>>> WriterException, catch java.net.SocketException during output writing, then
>>>>>> re-throw the new exception. The worker thread could then distinguish between
>>>>>> the reasons java.net.SocketException might be thrown. Perhaps there is a more
>>>>>> elegant way to do this in Scala, though?
>>>>>>
>>>>>> Let me know if I should open a ticket or discuss this on the developers
>>>>>> list instead. Best,
>>>>>>
>>>>>> Jim
>>>>>>
>>>>>> diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>>> index 0d71fdb..f31158c 100644
>>>>>> --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>>> +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
>>>>>> @@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
>>>>>>            readerException = e
>>>>>>            Try(worker.shutdownOutput()) // kill Python worker process
>>>>>>
>>>>>> +        case e: java.net.SocketException =>
>>>>>> +          // This can happen if a connection to the datasource, eg S3, resets
>>>>>> +          // or is otherwise broken
>>>>>> +          readerException = e
>>>>>> +          Try(worker.shutdownOutput()) // kill Python worker process
>>>>>> +
>>>>>>          case e: IOException =>
>>>>>>            // This can happen for legitimate reasons if the Python code stops returning data
>>>>>>            // before we are done passing elements through, e.g., for take(). Just log a message to
>>>>>>
>>>>>> On Wed, Apr 9, 2014 at 7:04 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>>>>>
>>>>>>> This dataset is uncompressed text at ~54GB. stats() returns (count:
>>>>>>> 56757667, mean: 1001.68740583, stdev: 601.775217822, max: 8965, min: 343)
>>>>>>>
>>>>>>> On Wed, Apr 9, 2014 at 6:59 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>>>>>>>> Okay, thanks. Do you have any info on how large your records and data
>>>>>>>> file are? I'd like to reproduce and fix this.
>>>>>>>>
>>>>>>>> Matei
>>>>>>>>
>>>>>>>> On Apr 9, 2014, at 3:52 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Matei, thanks for working with me to find these issues.
>>>>>>>>>
>>>>>>>>> To summarize, the issues I've seen are:
>>>>>>>>>
>>>>>>>>> 0.9.0:
>>>>>>>>> - https://issues.apache.org/jira/browse/SPARK-1323
>>>>>>>>>
>>>>>>>>> SNAPSHOT 2014-03-18:
>>>>>>>>> - When persist() is used and batchSize=1, java.lang.OutOfMemoryError:
>>>>>>>>>   Java heap space. To me this indicates a memory leak, since Spark
>>>>>>>>>   should simply be counting records of size < 3MB.
>>>>>>>>> - Without persist(), "stdin writer to Python finished early" hangs the
>>>>>>>>>   application; root cause unknown.
>>>>>>>>>
>>>>>>>>> I've recently rebuilt another SNAPSHOT, git commit 16b8308, with
>>>>>>>>> debugging turned on.
>>>>>>>>> This gives me the stacktrace on the new "stdin" problem:
>>>>>>>>>
>>>>>>>>> 14/04/09 22:22:45 DEBUG PythonRDD: stdin writer to Python finished early
>>>>>>>>> java.net.SocketException: Connection reset
>>>>>>>>>     at java.net.SocketInputStream.read(SocketInputStream.java:196)
>>>>>>>>>     at java.net.SocketInputStream.read(SocketInputStream.java:122)
>>>>>>>>>     at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>>>>>>>>>     at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>>>>>>>>>     at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>>>>>>>>>     at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>>>>>>>>>     at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
>>>>>>>>>     at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>>>>>>>>>     at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>>>>>>>>     at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>>>>>>>>     at org.apache.commons.httpclient.WireLogInputStream.read(WireLogInputStream.java:69)
>>>>>>>>>     at org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)
>>>>>>>>>     at java.io.FilterInputStream.read(FilterInputStream.java:133)
>>>>>>>>>     at org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)
>>>>>>>>>     at org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)
>>>>>>>>>     at org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)
>>>>>>>>>     at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)
>>>>>>>>>     at java.io.BufferedInputStream.read1(BufferedInputStream.java:273)
>>>>>>>>>     at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>>>>>>>>     at java.io.DataInputStream.read(DataInputStream.java:100)
>>>>>>>>>     at org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
>>>>>>>>>     at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)
>>>>>>>>>     at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
>>>>>>>>>     at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:192)
>>>>>>>>>     at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:175)
>>>>>>>>>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>>>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
>>>>>>>>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>>>     at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
>>>>>>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>>>     at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>>>>>>>     at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>>>>
>>>>>>>>> On Thu, Apr 3, 2014 at 8:37 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>>>>>>>>>> Cool, thanks for the update. Have you tried running a branch with this
>>>>>>>>>> fix (e.g. branch-0.9, or the 0.9.1 release candidate)?
>>>>>>>>>> Also, what memory leak issue are you referring to? Is it separate from
>>>>>>>>>> this? (I couldn't find it earlier in the thread.)
>>>>>>>>>>
>>>>>>>>>> To turn on debug logging, copy conf/log4j.properties.template to
>>>>>>>>>> conf/log4j.properties and change the line "log4j.rootCategory=INFO, console"
>>>>>>>>>> to "log4j.rootCategory=DEBUG, console". Then make sure this file is present
>>>>>>>>>> in "conf" on all workers.
>>>>>>>>>>
>>>>>>>>>> BTW, I've managed to run PySpark with this fix on some reasonably large S3
>>>>>>>>>> data (multiple GB) and it was fine. It might happen only if records are
>>>>>>>>>> large, or something like that. How much heap are you giving to your
>>>>>>>>>> executors, and does it show that much in the web UI?
>>>>>>>>>>
>>>>>>>>>> Matei
>>>>>>>>>>
>>>>>>>>>> On Mar 29, 2014, at 10:44 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I think the problem I ran into in 0.9 is covered in
>>>>>>>>>>> https://issues.apache.org/jira/browse/SPARK-1323
>>>>>>>>>>>
>>>>>>>>>>> When I kill the python process, the stacktrace I get indicates that
>>>>>>>>>>> this happens at initialization. It looks like the initial write to
>>>>>>>>>>> the Python process does not go through, and then the iterator hangs
>>>>>>>>>>> waiting for output. I haven't had luck turning on debugging for the
>>>>>>>>>>> executor process; I'm still trying to learn the log4j properties that
>>>>>>>>>>> need to be set.
>>>>>>>>>>>
>>>>>>>>>>> No luck yet on tracking down the memory leak.
>>>>>>>>>>>
>>>>>>>>>>> 14/03/30 05:15:04 ERROR executor.Executor: Exception in task ID 11
>>>>>>>>>>> org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
>>>>>>>>>>>     at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:168)
>>>>>>>>>>>     at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
>>>>>>>>>>>     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:113)
>>>>>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:231)
>>>>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:222)
>>>>>>>>>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>>>>>>>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:52)
>>>>>>>>>>>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:212)
>>>>>>>>>>>     at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:43)
>>>>>>>>>>>     at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>>>>>>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>>>>>     at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>>>>>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>>>>>>>>>>     at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:42)
>>>>>>>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:724)
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Mar 29, 2014 at 3:17 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>>>>>>>>>> I've only tried 0.9, in which I ran into the `stdin writer to Python
>>>>>>>>>>>> finished early` error so frequently that I wasn't able to load even a
>>>>>>>>>>>> 1GB file. Let me know if I can provide any other info!
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Mar 27, 2014 at 8:48 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>>>>>>>>>>>>> I see, did this also fail with previous versions of Spark (0.9 or 0.8)?
>>>>>>>>>>>>> We'll try to look into these; it seems like a serious error.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Matei
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mar 27, 2014, at 7:27 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks, Matei. I am running "Spark 1.0.0-SNAPSHOT built for Hadoop
>>>>>>>>>>>>>> 1.0.4" from GitHub on 2014-03-18.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I tried batchSizes of 512, 10, and 1; each got me further, but none
>>>>>>>>>>>>>> succeeded.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I can get this to work -- with manual interventions -- if I omit
>>>>>>>>>>>>>> `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1.
>>>>>>>>>>>>>> Five of the 175 executors hung, and I had to kill the python process
>>>>>>>>>>>>>> to get things going again. The only indication of this in the logs was
>>>>>>>>>>>>>> `INFO python.PythonRDD: stdin writer to Python finished early`.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> With batchSize=1 and persist, a new memory error came up in several
>>>>>>>>>>>>>> tasks before the app failed:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
>>>>>>>>>>>>>> thread Thread[stdin writer for python,5,main]
>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>>     at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>>>>>>>>>>     at java.lang.String.<init>(String.java:203)
>>>>>>>>>>>>>>     at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>>>>>>>>>>>>>>     at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>>>>>>>>>>>>>>     at org.apache.hadoop.io.Text.decode(Text.java:350)
>>>>>>>>>>>>>>     at org.apache.hadoop.io.Text.decode(Text.java:327)
>>>>>>>>>>>>>>     at org.apache.hadoop.io.Text.toString(Text.java:254)
>>>>>>>>>>>>>>     at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>>>>>>>>>>     at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>>>>>>>>>>>>>>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>>>>>>>>>>>>     at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>>>>>>>>>>>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>>>>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>>>>>>>>     at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>>>>>>>>>>>>>>     at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> There are other exceptions, but I think they all stem from the above,
>>>>>>>>>>>>>> e.g. org.apache.spark.SparkException: Error sending message to
>>>>>>>>>>>>>> BlockManagerMaster
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Let me know if there are other settings I should try, or if I should
>>>>>>>>>>>>>> try a newer snapshot.
>>>>>>>>>>>>>> Thanks again!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>>>>>>>>>>>>>>> Hey Jim,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In Spark 0.9 we added a "batchSize" parameter to PySpark that makes it
>>>>>>>>>>>>>>> group multiple objects together before passing them between Java and
>>>>>>>>>>>>>>> Python, but this may be too high by default. Try passing batchSize=10
>>>>>>>>>>>>>>> to your SparkContext constructor to lower it (the default is 1024), or
>>>>>>>>>>>>>>> even batchSize=1 to match earlier versions.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Matei
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mar 21, 2014, at 6:18 PM, Jim Blomo <jim.bl...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi all, I'm wondering if there are any settings I can use to reduce
>>>>>>>>>>>>>>>> the memory needed by the PythonRDD when computing simple stats. I am
>>>>>>>>>>>>>>>> getting OutOfMemoryError exceptions while calculating count() on big,
>>>>>>>>>>>>>>>> but not absurd, records. It seems like PythonRDD is trying to keep too
>>>>>>>>>>>>>>>> many of these records in memory, when all that is needed is to stream
>>>>>>>>>>>>>>>> through them and count. Any tips for getting through this workload?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Code:
>>>>>>>>>>>>>>>> session = sc.textFile('s3://...json.gz')  # ~54GB of compressed data
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> # the biggest individual text line is ~3MB
>>>>>>>>>>>>>>>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda (y,s): (loads(y), loads(s)))
>>>>>>>>>>>>>>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> parsed.count()
>>>>>>>>>>>>>>>> # will never finish: executor.Executor: Uncaught exception will FAIL all executors
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Incidentally, the whole app appears to be killed, but this error is
>>>>>>>>>>>>>>>> not propagated to the shell.
>>>>>>>>>>>>>>>> Cluster:
>>>>>>>>>>>>>>>> 15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Exception:
>>>>>>>>>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>>>>>>>>>>     at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>>>>>>>>>>>>>>>     at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>>>>>>>>>>>>>>>     at org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>>>>>>>>>>>>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>>>>>>>>>>     at org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>>>>>>>>>>>>>>>     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>>>>>>>>>>>>>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>>>>>>>>>>>>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>>>>>>>>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>>>>>>>>>>>>>>>     at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
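For reference, a minimal PySpark sketch of the batchSize workaround Matei suggests above, applied to the kind of job in the original message; the master URL and S3 path are placeholders:

    from json import loads
    from pyspark import SparkContext, StorageLevel

    # batchSize controls how many Python objects are grouped into a single Java
    # object when shuttling data between the JVM and the Python workers; the
    # default mentioned in the thread is 1024, and batchSize=1 disables batching.
    sc = SparkContext("local[4]", "count-sessions", batchSize=10)

    session = sc.textFile("s3n://bucket/sessions.json.gz")
    parsed = session.map(lambda l: l.split("\t", 1)) \
                    .map(lambda pair: (loads(pair[0]), loads(pair[1])))
    parsed.persist(StorageLevel.MEMORY_AND_DISK)
    print(parsed.count())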