val hrdd = sc.hadoopRDD(..)
val res = hrdd
  .partitionBy(myCustomPartitioner)
  .reduceByKey(..)
  .mapPartitionsWithIndex( some code to save those partitions )
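(A fuller, runnable version of that sketch in case the shape of the job matters; the input path, the keying step, and the save placeholder below are made-up stand-ins, not my real code:)

import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.SparkContext._   // pair-RDD implicits on pre-1.3 Spark

val jobConf = new JobConf()
FileInputFormat.setInputPaths(jobConf, "hdfs:///some/input/path")  // hypothetical path

val hrdd = sc.hadoopRDD(jobConf, classOf[TextInputFormat],
                        classOf[LongWritable], classOf[Text])

// Key the records so partitionBy/reduceByKey apply (illustrative keying).
val pairs = hrdd.map { case (_, line) => (line.toString.take(8), 1L) }

val res = pairs
  .partitionBy(myCustomPartitioner)      // > 20,000 partitions
  .reduceByKey(_ + _)
  .mapPartitionsWithIndex { (idx, iter) =>
    // placeholder for the real "save this partition" logic
    iter.map(kv => (idx, kv))
  }
res.count()   // force the shuffle to run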

I'm getting OutOfMemoryErrors on the read side of the partitionBy shuffle. My
custom partitioner generates over 20,000 partitions, so there are 20,000
tasks reading the shuffle files. On jobs with fewer partitions (around 1,000),
the job completes successfully.
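In case it matters, the partitioner is roughly the sketch below; the hashing and the exact count are illustrative rather than my real logic, but it shows where the 20,000+ shuffle-read tasks come from:

import org.apache.spark.Partitioner

// Illustrative only: fans keys out into ~20,000 partitions, so the reduce
// side of the shuffle runs ~20,000 read tasks.
class MyCustomPartitioner(parts: Int) extends Partitioner {
  override def numPartitions: Int = parts
  override def getPartition(key: Any): Int =
    ((key.hashCode % parts) + parts) % parts   // non-negative modulo
}

val myCustomPartitioner = new MyCustomPartitioner(20000)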

On my cluster, each worker gets 24 GB (SPARK_WORKER_MEMORY = 24 GB) and
each executor gets 21 GB (SPARK_MEM = 21 GB). I tried assigning 6 cores per
executor, then brought it down to 3, and I still get OutOfMemoryErrors at
20,000 partitions. I have spark.shuffle.memoryFraction=0.5 and
spark.storage.memoryFraction=0.2, since I am not caching any RDDs.
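For completeness, this is how those settings are wired up on my end; the master URL is a placeholder, and the exact mechanism (spark-env.sh vs. system properties) varies by Spark version, so take it as a sketch of my setup rather than a recommendation:

// spark-env.sh on each worker:
//   export SPARK_WORKER_MEMORY=24g
//   export SPARK_MEM=21g

// Driver side, set before the SparkContext is constructed (pre-SparkConf style):
System.setProperty("spark.shuffle.memoryFraction", "0.5")
System.setProperty("spark.storage.memoryFraction", "0.2")

import org.apache.spark.SparkContext
val sc = new SparkContext("spark://master:7077", "partitionBy-job")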

Do those config params look reasonable for my shuffle size? I'm not sure
whether to increase shuffle.memoryFraction or the memory the reduce tasks
get for their own objects; the latter, I'm guessing, is whatever is left
over after storage.memoryFraction and shuffle.memoryFraction are taken out.
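To make that guess concrete, here is the back-of-the-envelope arithmetic I have in mind for a 21 GB executor heap; the assumption that the fractions are simple slices of the heap (and that the shuffle pool is split evenly across concurrent tasks) is exactly the part I'm unsure about:

// Rough split of the 21 GB executor heap under my current settings.
val executorHeapGB = 21.0
val shuffleGB  = executorHeapGB * 0.5                    // shuffle.memoryFraction -> 10.5 GB
val storageGB  = executorHeapGB * 0.2                    // storage.memoryFraction ->  4.2 GB
val leftoverGB = executorHeapGB - shuffleGB - storageGB  // ~6.3 GB for task/user objects

// If the shuffle pool is shared evenly by 3 concurrent tasks per executor,
// each reduce task would see roughly 3.5 GB of shuffle buffer.
val perTaskShuffleGB = shuffleGB / 3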

Thanks,
Ameet
