Patrick, that was the problem. Individual partitions were too big to fit in
memory.

> I also believe the Snappy compression codec in Hadoop is not splittable.
> This means that each of your JSON files is read in its entirety as one
> spark partition. If you have files that are larger than the standard block
> size (128MB), it will exacerbate this shortcoming of Spark. Incidentally,
> this means minPartitions won't help you at all here.

My source data is just plain JSON text (i.e. no Snappy compression), so I
was able to solve the problem (i.e. get the call to cache() to work) by
increasing minPartitions to a number large enough that, I'm guessing, each
individual partition ended up small enough to fit in memory.

It’s great that the issue reported in SPARK-1777
<https://issues.apache.org/jira/browse/SPARK-1777> was identified and
fixed; it means users no longer have to wonder “Will this crash my job?”
before calling cache(). Otherwise, I would’ve suggested improving the
error message that gets thrown, but that isn’t necessary anymore.

A (potential) workaround would be to first persist your data to disk, then
> re-partition it, then cache it. I'm not 100% sure whether that will work
> though.
>  val a = sc.textFile("s3n://some-path/*.json").persist(DISK_ONLY).repartition(larger nr of partitions).cache()
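
For reference, the PySpark equivalent of that suggestion would look roughly
like this (the path and partition count are just placeholders):

from pyspark import StorageLevel
a = sc.textFile('s3n://some-path/*.json').persist(StorageLevel.DISK_ONLY).repartition(5000).cache()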

This didn’t work for me, by the way. I got the following in PySpark when I
called a.count():

14/08/04 18:24:14 WARN TaskSetManager: Loss was due to
java.lang.IllegalArgumentException
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:415)
    at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:341)
    at org.apache.spark.storage.BlockManager.get(BlockManager.scala:508)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:96)
    ...

This looks like a bug, no?

I also tried this:

a = sc.textFile('s3n://path-to-stuff/*.json').repartition(sc.defaultParallelism * 300).cache()

Which gave:

14/08/04 18:20:43 WARN TaskSetManager: Loss was due to
java.lang.OutOfMemoryError
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at scala.collection.mutable.ArrayBuffer$.newBuilder(ArrayBuffer.scala:189)
    at scala.collection.generic.GenericTraversableTemplate$class.newBuilder(GenericTraversableTemplate.scala:64)
    at scala.collection.AbstractTraversable.newBuilder(Traversable.scala:105)
    at scala.collection.IndexedSeqOptimized$class.reverse(IndexedSeqOptimized.scala:210)
    at scala.collection.mutable.ArrayBuffer.reverse(ArrayBuffer.scala:47)
    ...

So it seems that repartitioning an RDD on the fly implicitly requires that
partitions fit in memory?

Nick