Although I personally would describe this as a bug, the answer will be that
this is the intended behavior. The coalesce "infects" the shuffle before
it, forcing the shuffle itself to produce only the coalesced number of
partitions, which makes coalesce useless for reducing output files after a
shuffle with many partitions. This is by design.

Your only remaining option is a repartition, for which you pay the price of
introducing another expensive shuffle.
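
Something along these lines is a rough sketch of that workaround (the RDDs,
sizes and output path are just placeholders, not your actual job):

// small stand-ins for two big pair RDDs
val left  = sc.parallelize(1 to 100000).map(i => (i % 10000, i))
val right = sc.parallelize(1 to 100000).map(i => (i % 10000, i * 2))

// join with many partitions so every partition stays small
val joined = left.join(right, 200)

// repartition costs a second full shuffle, but lets you choose the
// number of output files independently of the join parallelism
joined
  .repartition(sc.defaultParallelism)    // e.g. roughly the total executor cores
  .saveAsTextFile("/tmp/joined-output")  // placeholder path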

Interestingly, if you do a coalesce on a map-only job it knows how to reduce
the partitions and output files without introducing a shuffle, so clearly
it is possible, but I don't know how to get this behavior after a shuffle
in an existing job.
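
For what it's worth, here is a quick spark-shell illustration of that
map-only case (RDD ids and console line numbers will differ, the output
below is approximate): the coalesce just narrows the existing partitions
and no ShuffledRDD appears in the lineage.

scala> sc.makeRDD(1 to 100, 20).map(_ * 2).coalesce(5).toDebugString
res0: String =
(5) CoalescedRDD[2] at coalesce at <console>:25 []
 |  MapPartitionsRDD[1] at map at <console>:25 []
 |  ParallelCollectionRDD[0] at makeRDD at <console>:25 []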

On Fri, Oct 5, 2018 at 6:34 PM Sergey Zhemzhitsky <szh.s...@gmail.com>
wrote:

> Hello guys,
>
> Currently I'm a little bit confused by coalesce's behaviour.
>
> Consider the following use case: I'd like to join two pretty big RDDs.
> To make the join more stable and to prevent it from failing with OOM,
> the RDDs are usually repartitioned to redistribute data more evenly and
> to keep every partition from hitting the 2GB limit, so the join ends up
> running with a lot of partitions.
>
> Then, after a successful join, I'd like to save the resulting dataset,
> but I don't need as many output files as there were partitions/tasks
> during the join. A number of files equal to the total number of executor
> cores allocated to the job would be fine, so I've considered using
> coalesce.
>
> The problem is that coalesce with shuffling disabled prevents the join
> from using the specified number of partitions and instead forces the
> join to use the number of partitions provided to coalesce:
>
> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5,
> false).toDebugString
> res5: String =
> (5) CoalescedRDD[15] at coalesce at <console>:25 []
>  |  MapPartitionsRDD[14] at repartition at <console>:25 []
>  |  CoalescedRDD[13] at repartition at <console>:25 []
>  |  ShuffledRDD[12] at repartition at <console>:25 []
>  +-(20) MapPartitionsRDD[11] at repartition at <console>:25 []
>     |   ParallelCollectionRDD[10] at makeRDD at <console>:25 []
>
> With shuffling enabled everything is OK, e.g.:
>
> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5,
> true).toDebugString
> res6: String =
> (5) MapPartitionsRDD[24] at coalesce at <console>:25 []
>  |  CoalescedRDD[23] at coalesce at <console>:25 []
>  |  ShuffledRDD[22] at coalesce at <console>:25 []
>  +-(100) MapPartitionsRDD[21] at coalesce at <console>:25 []
>      |   MapPartitionsRDD[20] at repartition at <console>:25 []
>      |   CoalescedRDD[19] at repartition at <console>:25 []
>      |   ShuffledRDD[18] at repartition at <console>:25 []
>      +-(20) MapPartitionsRDD[17] at repartition at <console>:25 []
>         |   ParallelCollectionRDD[16] at makeRDD at <console>:25 []
>
> In that case the problem is that, for pretty huge datasets, the
> additional reshuffling can take hours, or at least an amount of time
> comparable to the join itself.
>
> So I'd like to understand whether this is a bug or just the expected
> behaviour.
> If it is expected, is there any way to insert an additional
> ShuffleMapStage into the appropriate position of the DAG but without
> the reshuffling itself?
>