Although I personally would describe this as a bug, the answer will be that this is the intended behavior. The coalesce "infects" the shuffle before it, making coalesce useless, by design, for reducing output files after a shuffle with many partitions.
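To make the effect concrete, here is a rough spark-shell sketch (the data and the output path are made up for illustration): the coalesce(5) below does not merely merge the join's output files, it collapses the join stage itself down to 5 tasks.

  // Two made-up key-value RDDs standing in for the "pretty big" inputs.
  val rddA = sc.makeRDD(1 to 1000000).map(i => (i % 1000, i))
  val rddB = sc.makeRDD(1 to 1000000).map(i => (i % 1000, -i))

  // The join is intended to run with 1000 partitions ...
  val joined = rddA.join(rddB, numPartitions = 1000)

  // ... but coalesce defaults to shuffle = false, so it "infects" the upstream
  // shuffle: the join stage now runs with only 5 tasks instead of 1000.
  joined.coalesce(5).saveAsTextFile("/tmp/joined-coalesced")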
Your only remaining option is a repartition, for which you pay the price of another expensive shuffle. Interestingly, if you do a coalesce on a map-only job it knows how to reduce the partitions and output files without introducing a shuffle, so clearly it is possible, but I don't know how to get this behavior after a shuffle in an existing job. (A sketch of both options follows the quoted message below.)

On Fri, Oct 5, 2018 at 6:34 PM Sergey Zhemzhitsky <szh.s...@gmail.com> wrote:
> Hello guys,
>
> Currently I'm a little bit confused with coalesce behaviour.
>
> Consider the following use case - I'd like to join two pretty big RDDs.
> To make the join more stable and to prevent it from failing with OOM, the
> RDDs are usually repartitioned to redistribute the data more evenly and to
> prevent every partition from hitting the 2GB limit. So the join ends up
> running with a lot of partitions.
>
> Then, after a successful join, I'd like to save the resulting dataset.
> But I don't need such a huge number of files as the number of
> partitions/tasks used during the join. Actually I'm fine with as many
> files as the total number of executor cores allocated to the job. So
> I've considered using a coalesce.
>
> The problem is that coalesce with shuffling disabled prevents the join
> from using the specified number of partitions and instead forces the join
> to use the number of partitions provided to coalesce:
>
> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5, false).toDebugString
> res5: String =
> (5) CoalescedRDD[15] at coalesce at <console>:25 []
>  |  MapPartitionsRDD[14] at repartition at <console>:25 []
>  |  CoalescedRDD[13] at repartition at <console>:25 []
>  |  ShuffledRDD[12] at repartition at <console>:25 []
>  +-(20) MapPartitionsRDD[11] at repartition at <console>:25 []
>     |  ParallelCollectionRDD[10] at makeRDD at <console>:25 []
>
> With shuffling enabled everything is ok, e.g.
>
> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5, true).toDebugString
> res6: String =
> (5) MapPartitionsRDD[24] at coalesce at <console>:25 []
>  |  CoalescedRDD[23] at coalesce at <console>:25 []
>  |  ShuffledRDD[22] at coalesce at <console>:25 []
>  +-(100) MapPartitionsRDD[21] at coalesce at <console>:25 []
>     |  MapPartitionsRDD[20] at repartition at <console>:25 []
>     |  CoalescedRDD[19] at repartition at <console>:25 []
>     |  ShuffledRDD[18] at repartition at <console>:25 []
>     +-(20) MapPartitionsRDD[17] at repartition at <console>:25 []
>        |  ParallelCollectionRDD[16] at makeRDD at <console>:25 []
>
> In that case the problem is that for pretty huge datasets the additional
> reshuffling can take hours, or at least an amount of time comparable to
> the join itself.
>
> So I'd like to understand whether this is a bug or just the expected
> behaviour?
> In case it is expected, is there any way to insert an additional
> ShuffleMapStage into the appropriate position of the DAG, but without
> the reshuffling itself?
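Continuing the sketch above with the same rddA and rddB (again, paths and sizes are made up), the two alternatives mentioned at the top look roughly like this:

  // Option 1: repartition, i.e. coalesce(5, shuffle = true). The join keeps
  // its 1000 partitions, but you pay for one more full shuffle down to 5 files.
  rddA.join(rddB, numPartitions = 1000)
    .repartition(5)
    .saveAsTextFile("/tmp/joined-repartitioned")

  // Option 2: on a map-only job, coalesce merges partitions without adding a
  // shuffle stage, so 5 output files come essentially for free.
  sc.textFile("/tmp/some-input", minPartitions = 200)
    .map(_.toUpperCase)
    .coalesce(5)
    .saveAsTextFile("/tmp/map-only-coalesced")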