Thanks. Under what conditions can the number of partitions be higher than minPartitions when reading a textFile? Should this be considered an infrequent situation?
To sum up - is there a more efficient way to ensure an exact number of partitions than the following?

val rdd = sc.textFile("perf_test1.csv", minPartitions = 128)
val result = if (rdd.getNumPartitions > 128) rdd.repartition(128) else rdd

On 31 May 2016 at 16:11, Takeshi Yamamuro <linguin....@gmail.com> wrote:

> `coalesce` without shuffling can only set fewer partitions than its
> parent RDD.
> As Ted said, you need to set shuffle to true, or you need to use
> `RDD#repartition`.
>
> // maropu
>
> On Tue, May 31, 2016 at 11:02 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> The value of shuffle is false by default.
>>
>> Have you tried setting it to true?
>>
>> Which Spark release are you using?
>>
>> On Tue, May 31, 2016 at 6:13 AM, Maciej Sokołowski <matemac...@gmail.com> wrote:
>>
>>> Hello Spark users and developers.
>>>
>>> I read a file and want to ensure that it has an exact number of
>>> partitions, for example 128.
>>>
>>> In the documentation I found:
>>>
>>> def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
>>>
>>> But the argument here is the minimum number of partitions, so I use
>>> coalesce to ensure the desired number of partitions:
>>>
>>> def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
>>> // Return a new RDD that is reduced into numPartitions partitions.
>>>
>>> So I combine them and get a number of partitions lower than expected:
>>>
>>> scala> sc.textFile("perf_test1.csv", minPartitions=128).coalesce(128).getNumPartitions
>>> res14: Int = 126
>>>
>>> Is this expected behaviour? The file contains 100000 lines; partition
>>> sizes before and after coalesce:
>>>
>>> scala> sc.textFile("perf_test1.csv", minPartitions=128).mapPartitions{rows => Iterator(rows.length)}.collect()
>>> res16: Array[Int] = Array(782, 781, 782, 781, 781, 782, 781, 781, 781,
>>> 781, 782, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 782, 781,
>>> 781, 781, 782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782,
>>> 781, 781, 781, 782, 781, 781, 782, 781, 781, 782, 781, 781, 781, 781, 782,
>>> 781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781,
>>> 782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781,
>>> 781, 782, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781,
>>> 781, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 782, 781, 781, 781,
>>> 781, 782, 781, 781, 781, 781, 782, 781, 781, 782, 781, 781, 781, 781)
>>>
>>> scala> sc.textFile("perf_test1.csv", minPartitions=128).coalesce(128).mapPartitions{rows => Iterator(rows.length)}.collect()
>>> res15: Array[Int] = Array(1563, 781, 781, 781, 782, 781, 781, 781, 781,
>>> 782, 781, 781, 781, 781, 782, 781, 781, 781, 781, 781, 782, 781, 781, 781,
>>> 782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781,
>>> 781, 782, 781, 781, 781, 781, 1563, 782, 781, 781, 782, 781, 781, 781, 781,
>>> 782, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 782, 781, 781,
>>> 781, 782, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782, 781,
>>> 781, 781, 782, 781, 781, 782, 781, 781, 782, 781, 781, 781, 781, 782, 781,
>>> 781, 781, 782, 781, 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 782,
>>> 781, 781, 782, 781, 781, 781, 781, 782, 781, 781, 781, 782)
>>>
>>> So two partitions are double the size. Is this expected behaviour or is
>>> it some kind of bug?
>>>
>>> Thanks,
>>> Maciej Sokołowski
>>
>
> --
> ---
> Takeshi Yamamuro
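
As a minimal sketch of the pattern discussed in this thread (Scala for spark-shell, where sc is the SparkContext; the file name perf_test1.csv and the target of 128 partitions are taken from the messages above), one can repartition only when textFile does not already return the exact count:

val target = 128
// minPartitions is only a lower bound: the underlying Hadoop input splits may
// produce more (or, for very small files, fewer) partitions than requested.
val raw = sc.textFile("perf_test1.csv", minPartitions = target)
// repartition(n) is coalesce(n, shuffle = true): a full shuffle, but it yields
// exactly `target` evenly sized partitions instead of merging neighbours unevenly.
val exact = if (raw.getNumPartitions == target) raw else raw.repartition(target)
println(exact.getNumPartitions) // prints 128

Whether the extra shuffle is worth it depends on the job; the coalesce(128) result shown above avoids a shuffle but leaves a few merged, double-sized partitions.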