How can I get a shuffle with 2048 partitions and 2048 tasks, followed by a
map phase with 10 partitions and 10 tasks that writes to HDFS?

Every time I try to do this using coalesce, the shuffle ends up having only
10 tasks, which is unacceptable because it leads to OOM. This makes coalesce
somewhat useless here.
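
For concreteness, roughly what I mean (a sketch; names, paths and numbers
are illustrative, not my actual job):

val joined = left.join(right, 2048)          // shuffle: want 2048 partitions and 2048 tasks
joined.coalesce(10).saveAsTextFile(out)      // collapses the shuffle stage to 10 tasks -> OOM
joined.repartition(10).saveAsTextFile(out)   // 10 output files, but only by adding a second shuffle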

On Wed, Oct 10, 2018 at 9:06 AM Wenchen Fan <cloud0...@gmail.com> wrote:

> Note that RDD partitions and Spark tasks do not always map 1-to-1.
>
> Assume `rdd1` has 100 partitions and `rdd2 = rdd1.coalesce(10)`. Then
> `rdd2` has 10 partitions, and there is no shuffle between `rdd1` and
> `rdd2`. During scheduling, `rdd1` and `rdd2` are in the same stage, and
> this stage has 10 tasks (determined by the last RDD). This means each
> Spark task will process 10 partitions of `rdd1`.
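>
> For illustration, a rough sketch (output abbreviated): the whole lineage
> stays in one stage, with no ShuffledRDD in it:
>
> scala> val rdd1 = sc.makeRDD(1 to 1000, 100)
> scala> val rdd2 = rdd1.coalesce(10)
> scala> rdd2.toDebugString
> res0: String =
> (10) CoalescedRDD[1] at coalesce at <console>:26 []
>  |   ParallelCollectionRDD[0] at makeRDD at <console>:24 []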
>
> Looking at your example, I don't see where the problem is. Can you
> describe what is not as expected?
>
> On Wed, Oct 10, 2018 at 2:11 PM Sergey Zhemzhitsky <szh.s...@gmail.com>
> wrote:
>
>> Well, it seems that I could still extend CoalescedRDD to make it
>> preserve the total number of partitions of the parent RDD: reduce some
>> partitions in the same way the original coalesce does for map-only jobs,
>> and fill the gaps (the positions where the coalesced-away partitions used
>> to be) with a special kind of partition that has no parent dependencies
>> and always returns an empty iterator.
>>
>> I believe this should work as desired (at least the previous
>> ShuffleMapStage will still think that the number of partitions in the next
>> stage, the one it generates shuffle output for, is unchanged).
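>>
>> A rough sketch of what I have in mind (illustrative only; I haven't
>> checked this against the scheduler internals, and all names are made up):
>>
>> import scala.reflect.ClassTag
>> import org.apache.spark.{Dependency, NarrowDependency, Partition, TaskContext}
>> import org.apache.spark.rdd.RDD
>>
>> // Each output partition either groups several parent partitions (as
>> // coalesce does) or is an empty "padding" partition with no parents.
>> case class PaddedPartition(index: Int, parents: Seq[Int]) extends Partition
>>
>> class PaddedCoalescedRDD[T: ClassTag](parent: RDD[T], kept: Int)
>>     extends RDD[T](parent.sparkContext, Nil) {
>>
>>   private val n = parent.partitions.length
>>
>>   override def getPartitions: Array[Partition] =
>>     Array.tabulate[Partition](n) { i =>
>>       if (i < kept) PaddedPartition(i, i until n by kept) // grouped parent partitions
>>       else PaddedPartition(i, Seq.empty)                  // empty padding partition
>>     }
>>
>>   override def getDependencies: Seq[Dependency[_]] = Seq(
>>     new NarrowDependency[T](parent) {
>>       def getParents(pid: Int): Seq[Int] =
>>         partitions(pid).asInstanceOf[PaddedPartition].parents
>>     }
>>   )
>>
>>   override def compute(split: Partition, ctx: TaskContext): Iterator[T] =
>>     split.asInstanceOf[PaddedPartition].parents.iterator
>>       .flatMap(i => parent.iterator(parent.partitions(i), ctx))
>> }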
>>
>> There are a few issues though: the existence of empty partitions, which
>> can be evaluated almost for free, and the empty output files produced by
>> these empty partitions, which can be avoided by means of LazyOutputFormat
>> in the case of RDDs.
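>>
>> For the empty-file issue, a rough sketch of what I mean by using
>> LazyOutputFormat (the path and key/value types are just placeholders):
>>
>> import org.apache.hadoop.io.{NullWritable, Text}
>> import org.apache.hadoop.mapreduce.Job
>> import org.apache.hadoop.mapreduce.lib.output.{LazyOutputFormat, TextOutputFormat}
>>
>> val job = Job.getInstance(sc.hadoopConfiguration)
>> // The underlying writer is only created when the first record arrives,
>> // so empty partitions produce no part files.
>> LazyOutputFormat.setOutputFormatClass(job, classOf[TextOutputFormat[NullWritable, Text]])
>>
>> rdd.map(v => (NullWritable.get(), new Text(v.toString)))
>>   .saveAsNewAPIHadoopFile(
>>     "hdfs:///tmp/output",                        // placeholder path
>>     classOf[NullWritable],
>>     classOf[Text],
>>     classOf[LazyOutputFormat[NullWritable, Text]],
>>     job.getConfiguration)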
>>
>>
>>
>> On Mon, Oct 8, 2018, 23:57 Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> Although I personally would describe this as a bug, the answer will be
>>> that this is the intended behavior. The coalesce "infects" the shuffle
>>> before it, making coalesce useless for reducing output files after a
>>> shuffle with many partitions, by design.
>>>
>>> Your only remaining option is repartition, for which you pay the price
>>> of introducing another expensive shuffle.
>>>
>>> Interestingly, if you do a coalesce in a map-only job it knows how to
>>> reduce the partitions and output files without introducing a shuffle, so
>>> clearly it is possible, but I don't know how to get this behavior after a
>>> shuffle in an existing job.
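>>>
>>> For example (just a sketch; paths are placeholders), a map-only job
>>> stays in a single stage:
>>>
>>> sc.textFile(in).map(_.toUpperCase).coalesce(10).saveAsTextFile(out)  // 10 files, no shuffle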
>>>
>>> On Fri, Oct 5, 2018 at 6:34 PM Sergey Zhemzhitsky <szh.s...@gmail.com>
>>> wrote:
>>>
>>>> Hello guys,
>>>>
>>>> Currently I'm a little bit confused by coalesce behaviour.
>>>>
>>>> Consider the following use case: I'd like to join two pretty big RDDs.
>>>> To make the join more stable and to prevent it from failing with OOM,
>>>> the RDDs are usually repartitioned to redistribute the data more evenly
>>>> and to prevent any partition from hitting the 2GB limit, so the join
>>>> runs with a lot of partitions.
>>>>
>>>> Then, after a successful join, I'd like to save the resulting dataset.
>>>> But I don't need as huge a number of files as there were
>>>> partitions/tasks during the join; actually, I'm fine with as many files
>>>> as the total number of executor cores allocated to the job. So I've
>>>> considered using coalesce.
>>>>
>>>> The problem is that coalesce with shuffling disabled prevents the join
>>>> from using the specified number of partitions and instead forces the
>>>> join to use the number of partitions provided to coalesce:
>>>>
>>>> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5,
>>>> false).toDebugString
>>>> res5: String =
>>>> (5) CoalescedRDD[15] at coalesce at <console>:25 []
>>>>  |  MapPartitionsRDD[14] at repartition at <console>:25 []
>>>>  |  CoalescedRDD[13] at repartition at <console>:25 []
>>>>  |  ShuffledRDD[12] at repartition at <console>:25 []
>>>>  +-(20) MapPartitionsRDD[11] at repartition at <console>:25 []
>>>>     |   ParallelCollectionRDD[10] at makeRDD at <console>:25 []
>>>>
>>>> With shuffling enabled everything is OK, e.g.:
>>>>
>>>> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5,
>>>> true).toDebugString
>>>> res6: String =
>>>> (5) MapPartitionsRDD[24] at coalesce at <console>:25 []
>>>>  |  CoalescedRDD[23] at coalesce at <console>:25 []
>>>>  |  ShuffledRDD[22] at coalesce at <console>:25 []
>>>>  +-(100) MapPartitionsRDD[21] at coalesce at <console>:25 []
>>>>      |   MapPartitionsRDD[20] at repartition at <console>:25 []
>>>>      |   CoalescedRDD[19] at repartition at <console>:25 []
>>>>      |   ShuffledRDD[18] at repartition at <console>:25 []
>>>>      +-(20) MapPartitionsRDD[17] at repartition at <console>:25 []
>>>>         |   ParallelCollectionRDD[16] at makeRDD at <console>:25 []
>>>>
>>>> In that case the problem is that for pretty huge datasets the
>>>> additional reshuffling can take hours, or at least an amount of time
>>>> comparable to that of the join itself.
>>>>
>>>> So I'd like to understand whether this is a bug or just expected
>>>> behaviour.
>>>> In case it is expected, is there any way to insert an additional
>>>> ShuffleMapStage at an appropriate position in the DAG, but without the
>>>> reshuffling itself?
>>>>
