val hrdd = sc.hadoopRDD(..)
val res = hrdd.partitionBy(myCustomPartitioner)
  .reduceByKey(..)
  .mapPartitionsWithIndex( /* some code to save those partitions */ )
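For context, the memory settings and the custom partitioner are set up roughly like this. This is a simplified sketch (assuming the org.apache.spark API and that the properties are set before the SparkContext is created); the hashing in getPartition is just a placeholder for my real partitioning logic:

import org.apache.spark.Partitioner

// Memory fractions are set as system properties before the SparkContext is created.
System.setProperty("spark.shuffle.memoryFraction", "0.5")
System.setProperty("spark.storage.memoryFraction", "0.2")

// The partitioner spreads keys across ~20,000 partitions; the hashing here is
// only a stand-in for the real partitioning logic.
class MyCustomPartitioner extends Partitioner {
  override def numPartitions: Int = 20000
  override def getPartition(key: Any): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod   // keep the result non-negative
  }
}

val myCustomPartitioner = new MyCustomPartitioner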
I'm getting OutOfMemoryErrors on the read side of the partitionBy shuffle. My custom partitioner generates over 20,000 partitions, so there are 20,000 tasks reading the shuffle files. On smaller problems (~1,000 partitions), the job completes successfully.

On my cluster, each worker gets 24 GB (SPARK_WORKER_MEMORY = 24 GB) and each executor gets 21 GB (SPARK_MEM = 21 GB). I tried assigning 6 cores per executor and then brought it down to 3, and I still get OutOfMemoryErrors at 20,000 partitions. I have spark.shuffle.memoryFraction=0.5 and spark.storage.memoryFraction=0.2, since I am not caching any RDDs.

Do those config params look reasonable for my shuffle size? I'm not sure which to increase: spark.shuffle.memoryFraction, or the memory the reduce tasks themselves get (which I'm guessing is whatever is left over after storage.memoryFraction and shuffle.memoryFraction are carved out).

Thanks,
Ameet