Well, it seems that I can still extend CoalescedRDD so that it preserves
the total number of partitions of the parent RDD: reduce some partitions
in the same way the original coalesce does for map-only jobs, and fill
the gaps (the positions where the coalesced-away partitions used to be)
with a special kind of partition which has no parent dependencies and
always returns an empty iterator.

I believe this should work as desired (at least the previous
ShuffleMapStage will think that the number of partitions in the next
stage, the one it generates shuffle output for, has not changed).
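
Roughly, I have something like the following in mind (just a rough,
untested sketch of the idea rather than the real CoalescedRDD; the names
PaddedCoalescedRDD and PaddedPartition are made up, and "groups" lists the
parent partition indices packed into each non-empty output partition):

import scala.reflect.ClassTag
import org.apache.spark.{Dependency, NarrowDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Output partition i either wraps a group of parent partitions or is an
// empty filler with no parent dependencies at all.
case class PaddedPartition(index: Int, parents: Seq[Partition]) extends Partition

class PaddedCoalescedRDD[T: ClassTag](parent: RDD[T], groups: Seq[Seq[Int]])
    extends RDD[T](parent.context, Nil) {

  // Same number of partitions as the parent: the first groups.length
  // partitions carry the coalesced data, the rest stay empty.
  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](parent.partitions.length) { i =>
      val parentSplits =
        if (i < groups.length) groups(i).map(j => parent.partitions(j)) else Nil
      PaddedPartition(i, parentSplits)
    }

  // Narrow dependencies only: each non-empty output partition depends on the
  // parent partitions assigned to it, the empty fillers depend on nothing.
  override protected def getDependencies: Seq[Dependency[_]] =
    Seq(new NarrowDependency[T](parent) {
      override def getParents(partitionId: Int): Seq[Int] =
        if (partitionId < groups.length) groups(partitionId) else Nil
    })

  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    val p = split.asInstanceOf[PaddedPartition]
    if (p.parents.isEmpty) Iterator.empty // the almost-free empty partition
    else p.parents.iterator.flatMap(firstParent[T].iterator(_, context))
  }
}

This mirrors how the existing CoalescedRDD groups parent partitions via a
NarrowDependency, except that it pads the result with empty partitions
instead of shrinking the partition count.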

There are a few issues though: the empty partitions themselves, which can
be evaluated almost for free, and the empty output files these partitions
produce, which can be avoided by means of LazyOutputFormat in case of RDDs.
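
For the empty files, something along these lines should do when saving via
the new Hadoop API (again just a sketch: sc is the usual SparkContext, the
output path is made up, and the RDD is a toy pair RDD most of whose 100
partitions produce no records):

import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{LazyOutputFormat, TextOutputFormat}

val records = sc.parallelize(1 to 10, 100)
  .map(i => (NullWritable.get(), new Text(i.toString)))

// LazyOutputFormat creates the underlying output file lazily, on the first
// record written, so empty partitions leave no empty part-* files behind.
val job = Job.getInstance(sc.hadoopConfiguration)
LazyOutputFormat.setOutputFormatClass(job, classOf[TextOutputFormat[NullWritable, Text]])

records.saveAsNewAPIHadoopFile(
  "/tmp/lazy-output",                 // made-up output path
  classOf[NullWritable],
  classOf[Text],
  classOf[LazyOutputFormat[NullWritable, Text]],
  job.getConfiguration)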



On Mon, Oct 8, 2018, 23:57 Koert Kuipers <ko...@tresata.com> wrote:

> although i personally would describe this as a bug, the answer will be that
> this is the intended behavior. the coalesce "infects" the shuffle before
> it, making a coalesce useless for reducing output files after a shuffle
> with many partitions by design.
>
> your only option left is a repartition for which you pay the price in that
> it introduces another expensive shuffle.
>
> interestingly, if you do a coalesce on a map-only job it knows how to
> reduce the partitions and output files without introducing a shuffle, so
> clearly it is possible, but i don't know how to get this behavior after a
> shuffle in an existing job.
>
> On Fri, Oct 5, 2018 at 6:34 PM Sergey Zhemzhitsky <szh.s...@gmail.com>
> wrote:
>
>> Hello guys,
>>
>> Currently I'm a little bit confused by coalesce behaviour.
>>
>> Consider the following use case: I'd like to join two pretty big RDDs.
>> To make the join more stable and to prevent it from failing with OOMs,
>> the RDDs are usually repartitioned to redistribute the data more evenly
>> and to keep every partition below the 2GB limit, so the join ends up
>> running with a lot of partitions.
>>
>> Then, after a successful join, I'd like to save the resulting dataset.
>> But I don't need as many output files as there were partitions/tasks
>> during the join. Actually I'm fine with as many files as the total
>> number of executor cores allocated to the job. So I've considered
>> using a coalesce.
>>
>> The problem is that coalesce with shuffling disabled prevents the join
>> from using the specified number of partitions and instead forces the
>> join to use the number of partitions provided to coalesce:
>>
>> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5,
>> false).toDebugString
>> res5: String =
>> (5) CoalescedRDD[15] at coalesce at <console>:25 []
>>  |  MapPartitionsRDD[14] at repartition at <console>:25 []
>>  |  CoalescedRDD[13] at repartition at <console>:25 []
>>  |  ShuffledRDD[12] at repartition at <console>:25 []
>>  +-(20) MapPartitionsRDD[11] at repartition at <console>:25 []
>>     |   ParallelCollectionRDD[10] at makeRDD at <console>:25 []
>>
>> With shuffling enabled everything is ok, e.g.
>>
>> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5,
>> true).toDebugString
>> res6: String =
>> (5) MapPartitionsRDD[24] at coalesce at <console>:25 []
>>  |  CoalescedRDD[23] at coalesce at <console>:25 []
>>  |  ShuffledRDD[22] at coalesce at <console>:25 []
>>  +-(100) MapPartitionsRDD[21] at coalesce at <console>:25 []
>>      |   MapPartitionsRDD[20] at repartition at <console>:25 []
>>      |   CoalescedRDD[19] at repartition at <console>:25 []
>>      |   ShuffledRDD[18] at repartition at <console>:25 []
>>      +-(20) MapPartitionsRDD[17] at repartition at <console>:25 []
>>         |   ParallelCollectionRDD[16] at makeRDD at <console>:25 []
>>
>> In that case the problem is that for pretty huge datasets the additional
>> reshuffling can take hours, or at least an amount of time comparable to
>> the join itself.
>>
>> So I'd like to understand whether this is a bug or just expected
>> behaviour.
>> In case it is expected, is there any way to insert an additional
>> ShuffleMapStage into the appropriate position of the DAG, but without
>> the reshuffling itself?
>>
