Patrick, that was the problem. Individual partitions were too big to fit in memory.

> I also believe the Snappy compression codec in Hadoop is not splittable.
> This means that each of your JSON files is read in its entirety as one
> spark partition. If you have files that are larger than the standard block
> size (128MB), it will exacerbate this shortcoming of Spark. Incidentally,
> this means minPartitions won't help you at all here.

My source data is just plain JSON text (i.e. no Snappy compression), so I was able to solve the problem (i.e. get the call to cache() to work) by increasing minPartitions to a number large enough that each resulting partition was, I'm guessing, small enough to fit in memory.

It's great that the issue reported in SPARK-1777 <https://issues.apache.org/jira/browse/SPARK-1777> was identified and fixed; it means users no longer have to ask "Will this crash my job?" before calling cache(). Otherwise, I would have suggested improving the error message thrown, but that isn't necessary anymore.

> A (potential) workaround would be to first persist your data to disk, then
> re-partition it, then cache it. I'm not 100% sure whether that will work
> though.
>
> val a =
> sc.textFile("s3n://some-path/*.json").persist(DISK_ONLY).repartition(larger
> nr of partitions).cache()

This didn't work for me, by the way. I got the following in PySpark when I called a.count():

14/08/04 18:24:14 WARN TaskSetManager: Loss was due to java.lang.IllegalArgumentException
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
        at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
        at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:415)
        at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:341)
        at org.apache.spark.storage.BlockManager.get(BlockManager.scala:508)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:96)
        ...

This looks like a bug, no?

I also tried this:

a = sc.textFile('s3n://path-to-stuff/*.json').repartition(sc.defaultParallelism * 300).cache()

Which gave:

14/08/04 18:20:43 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at scala.collection.mutable.ArrayBuffer$.newBuilder(ArrayBuffer.scala:189)
        at scala.collection.generic.GenericTraversableTemplate$class.newBuilder(GenericTraversableTemplate.scala:64)
        at scala.collection.AbstractTraversable.newBuilder(Traversable.scala:105)
        at scala.collection.IndexedSeqOptimized$class.reverse(IndexedSeqOptimized.scala:210)
        at scala.collection.mutable.ArrayBuffer.reverse(ArrayBuffer.scala:47)
        ...

So it seems that repartitioning an RDD on the fly implicitly requires that partitions fit in memory?

Nick
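
P.S. In case it helps anyone who hits the same thing: below is a rough sketch of the kind of call I mean by "increasing minPartitions", not my actual job. The S3 path and the multiplier of 500 are made-up placeholders; the point is just that a large enough minPartitions keeps each individual partition small enough to cache.

# Hypothetical sketch (path and partition count are placeholders):
# read the plain JSON text with many more partitions than the default,
# so that no single partition is too big to fit in memory, then cache.
a = sc.textFile('s3n://some-bucket/*.json',
                minPartitions=sc.defaultParallelism * 500).cache()
a.count()  # forces evaluation and materializes the cached partitions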