I've managed to spend some time digging deeper into this issue, and it seems that in some cases it can be fixed pretty easily. I've raised a JIRA: https://issues.apache.org/jira/browse/SPARK-26114 ... and the corresponding pull request: https://github.com/apache/spark/pull/23083

It does not cover CoGroupedRDDs and ExternalAppendOnlyMap (i.e. joins), because those involve a lot more places to consider, for example (all of these are plain configuration properties; a sketch of setting them follows this list):

- detectCorrupt (spark.shuffle.detectCorrupt), a property of ShuffleBlockFetcherIterator that uncompresses received blocks in-place (and stores them in memory without accounting for them in TaskMemoryManagers)
- maxReqsInFlight (spark.reducer.maxReqsInFlight) and maxBlocksInFlightPerAddress (spark.reducer.maxBlocksInFlightPerAddress), which limit the "requests in flight" but not the "results in flight"; if responses are received much faster than they are processed, there is a pretty good chance of OutOfMemory errors
- the Java serializer (spark.serializer.objectStreamReset), which in the case of pretty big objects (e.g. a lot of records for the same key in "join" transformations) leads to OutOfMemoryErrors pretty quickly too
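For anyone who wants to experiment with these knobs in the meantime, a minimal sketch of setting them at session creation (the values below are illustrative placeholders, not recommendations):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("shuffle-read-tuning-sketch")
  // verify fetched shuffle blocks; note the block is uncompressed in-place,
  // outside of TaskMemoryManager accounting
  .config("spark.shuffle.detectCorrupt", "true")
  // bound the number of fetch requests and per-address blocks in flight;
  // these cap requests, not the memory held by already-fetched results
  .config("spark.reducer.maxReqsInFlight", "8")
  .config("spark.reducer.maxBlocksInFlightPerAddress", "4")
  // reset the Java serializer's object cache more often than the
  // default of 100 when individual records are large
  .config("spark.serializer.objectStreamReset", "50")
  .getOrCreate()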
On Mon, Oct 15, 2018 at 4:51 PM Koert Kuipers <ko...@tresata.com> wrote:
>
> i realize it is unlikely all data will be local to tasks, so placement will not be optimal and there will be some network traffic, but is this the same as a shuffle?
>
> in CoalesceRDD it shows a NarrowDependency, which i thought meant it could be implemented without a shuffle.
>
> On Mon, Oct 15, 2018 at 2:49 AM Jörn Franke <jornfra...@gmail.com> wrote:
>>
>> This is not fully correct. If you have fewer files, then you need to move some data to other nodes, because not all the data is there for writing (even in the case of the same node, but then it is easier from a network perspective). Hence a shuffle is needed.
>>
>> On 15.10.2018 at 05:04, Koert Kuipers <ko...@tresata.com> wrote:
>>
>> sure, i understand currently the workaround is to add a shuffle. but that's just a workaround, not a satisfactory solution: we shouldn't have to introduce another shuffle (an expensive operation) just to reduce the number of files.
>>
>> logically all you need is a map-phase with fewer tasks after the reduce phase with many tasks to reduce the number of files, but there is currently no way to express this in spark. it seems the map operation always gets tagged on to the end of the previous reduce operation, which is generally a reasonable optimization, but not here, since it causes the number of tasks for the reduce to go down, which is unacceptable.
>>
>> On Sun, Oct 14, 2018 at 10:06 PM Wenchen Fan <cloud0...@gmail.com> wrote:
>>>
>>> You have a heavy workload, you want to run it with many tasks for better performance and stability (no OOM), but you also want to run it with few tasks to avoid too many small files. The reality is, mostly you can't reach these 2 goals together; they conflict with each other. The solution I can think of is to sacrifice performance a little: run the workload with many tasks at first, and then merge the many small files. Essentially this is what `coalesce(n, shuffle = true)` does.
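In RDD terms, that suggestion amounts to something like the following sketch (the input path and the aggregation are made-up placeholders):

// run the heavy shuffle with many tasks first, then add a second, cheaper
// shuffle whose only job is to merge the many small outputs into a few files
val counts = sc.textFile("hdfs:///tmp/input")
  .map(line => (line, 1))
  .reduceByKey(_ + _, 2048)      // heavy stage keeps its 2048 tasks
  .coalesce(20, shuffle = true)  // extra shuffle down to 20 partitions
counts.saveAsTextFile("hdfs:///tmp/output")  // 20 output files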
>>> On Sat, Oct 13, 2018 at 10:05 PM Koert Kuipers <ko...@tresata.com> wrote:
>>>>
>>>> we have a collection of programs in the dataframe api that all do big shuffles, for which we use 2048+ partitions. this works fine, but it produces a lot of (small) output files, which puts pressure on the memory of the driver of any spark program that reads this data in again.
>>>>
>>>> so one of our developers stuck a .coalesce at the end of every program, just before writing to disk, to reduce the number of output files, thinking this would solve the many-files issue. to his surprise the coalesce caused the existing shuffles to run with fewer tasks, leading to unacceptable slowdowns and OOMs. so this is not a solution.
>>>>
>>>> how can we insert a coalesce as a new map-phase (a new job on the application manager with a narrow dependency) instead of modifying the existing reduce phase? i am saying map-phase because it should not introduce a new shuffle: that would be wasteful and unnecessary.
>>>>
>>>> On Sat, Oct 13, 2018 at 1:39 AM Wenchen Fan <cloud0...@gmail.com> wrote:
>>>>>
>>>>> In your first example, the root RDD has 1000 partitions, then you do a shuffle (with repartitionAndSortWithinPartitions), which shuffles data to 1000 reducers. Then you do coalesce, which asks Spark to launch only 20 reducers to process the data that was prepared for 1000 reducers. Since the reducers have heavy work (sorting), you OOM. In general, your workflow is: 1000 mappers -> 20 reducers.
>>>>>
>>>>> In your second example, the coalesce introduces a shuffle, so your workflow is: 1000 mappers -> 1000 reducers (also mappers) -> 20 reducers. The sorting is done by 1000 tasks, so no OOM.
>>>>>
>>>>> BTW, have you tried the DataFrame API? With Spark SQL, the memory management is more precise, so even if we only have 20 tasks to do the heavy sorting, the system should just spill more to disk instead of OOMing.
>>>>>
>>>>> On Sat, Oct 13, 2018 at 11:35 AM Koert Kuipers <ko...@tresata.com> wrote:
>>>>>>
>>>>>> how can i get a shuffle with 2048 partitions and 2048 tasks and then a map phase with 10 partitions and 10 tasks that writes to hdfs?
>>>>>>
>>>>>> every time i try to do this using coalesce, the shuffle ends up having 10 tasks, which is unacceptable due to OOM. this makes coalesce somewhat useless.
>>>>>>
>>>>>> On Wed, Oct 10, 2018 at 9:06 AM Wenchen Fan <cloud0...@gmail.com> wrote:
>>>>>>>
>>>>>>> Note that RDD partitions and Spark tasks are not always a 1-1 mapping.
>>>>>>>
>>>>>>> Assuming `rdd1` has 100 partitions, and `rdd2 = rdd1.coalesce(10)`, then `rdd2` has 10 partitions, and there is no shuffle between `rdd1` and `rdd2`. During scheduling, `rdd1` and `rdd2` are in the same stage, and this stage has 10 tasks (decided by the last RDD). This means each Spark task will process 10 partitions of `rdd1`.
>>>>>>>
>>>>>>> Looking at your example, I don't see where the problem is. Can you describe what is not expected?
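The partitions-vs-tasks point can be seen directly in the shell; a quick sketch (the exact RDD ids and console line numbers will differ):

scala> val rdd1 = sc.parallelize(1 to 1000, 100)
scala> val rdd2 = rdd1.coalesce(10)  // narrow dependency, no shuffle
scala> rdd2.toDebugString
res0: String =
(10) CoalescedRDD[1] at coalesce at <console>:25 []
 |   ParallelCollectionRDD[0] at parallelize at <console>:24 []
scala> rdd2.count()  // one stage with 10 tasks; each task reads 10 partitions of rdd1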
>>>>>>> On Wed, Oct 10, 2018 at 2:11 PM Sergey Zhemzhitsky <szh.s...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Well, it seems that I can still extend the CoalesceRDD to make it preserve the total number of partitions from the parent RDD, reduce some partitions in the same way as the original coalesce does for map-only jobs, and fill the gaps (partitions which should reside in the positions of the coalesced ones) with a special kind of partition which does not have any parent dependencies and always returns an empty iterator.
>>>>>>>>
>>>>>>>> I believe this should work as desired (at least the previous ShuffleMapStage will think that the number of partitions in the next stage, which it generates shuffle output for, is unchanged).
>>>>>>>>
>>>>>>>> There are a few issues though: the existence of empty partitions, which can be evaluated almost for free, and empty output files from these empty partitions, which can be avoided by means of LazyOutputFormat in the case of RDDs.
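For illustration, a rough and untested sketch of that padded-coalesce idea (all class names here are invented):

import scala.reflect.ClassTag
import org.apache.spark.{Dependency, NarrowDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD

// keeps the parent's partition count: the first `groups` child partitions
// stride over all parent partitions, the rest are empty fillers
class PaddedCoalescedRDD[T: ClassTag](prev: RDD[T], groups: Int)
    extends RDD[T](prev.context, Nil) {

  private val numParents = prev.partitions.length

  // parent partition indices feeding child partition `id`; empty for fillers
  private def parentsOf(id: Int): Seq[Int] =
    if (id < groups) id until numParents by groups else Seq.empty

  override def getDependencies: Seq[Dependency[_]] = Seq(
    new NarrowDependency[T](prev) {
      override def getParents(id: Int): Seq[Int] = parentsOf(id)
    })

  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](numParents)(i => new PaddedPartition(i))

  override def compute(split: Partition, ctx: TaskContext): Iterator[T] =
    parentsOf(split.index).iterator
      .flatMap(i => prev.iterator(prev.partitions(i), ctx))
}

// plain partition object; filler partitions simply compute to an empty iterator
class PaddedPartition(override val index: Int) extends Partition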
>>>>>>>> On Mon, Oct 8, 2018, 23:57 Koert Kuipers <ko...@tresata.com> wrote:
>>>>>>>>>
>>>>>>>>> although i personally would describe this as a bug, the answer will be that this is the intended behavior. the coalesce "infects" the shuffle before it, making a coalesce useless for reducing output files after a shuffle with many partitions, by design.
>>>>>>>>>
>>>>>>>>> your only option left is a repartition, for which you pay the price in that it introduces another expensive shuffle.
>>>>>>>>>
>>>>>>>>> interestingly, if you do a coalesce on a map-only job it knows how to reduce the partitions and output files without introducing a shuffle, so clearly it is possible, but i don't know how to get this behavior after a shuffle in an existing job.
>>>>>>>>>
>>>>>>>>> On Fri, Oct 5, 2018 at 6:34 PM Sergey Zhemzhitsky <szh.s...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hello guys,
>>>>>>>>>>
>>>>>>>>>> Currently I'm a little bit confused by coalesce behaviour.
>>>>>>>>>>
>>>>>>>>>> Consider the following use case: I'd like to join two pretty big RDDs. To make the join more stable and to prevent it from failing with OOM, the RDDs are usually repartitioned to redistribute the data more evenly and to prevent every partition from hitting the 2GB limit, so the join runs with a lot of partitions.
>>>>>>>>>>
>>>>>>>>>> After a successful join I'd like to save the resulting dataset. But I don't need as many files as there were partitions/tasks during the join; actually I'm fine with the number of files being equal to the total number of executor cores allocated to the job. So I've considered using a coalesce.
>>>>>>>>>>
>>>>>>>>>> The problem is that coalesce with shuffling disabled prevents the join from using the specified number of partitions and instead forces the join to use the number of partitions provided to coalesce:
>>>>>>>>>>
>>>>>>>>>> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5, false).toDebugString
>>>>>>>>>> res5: String =
>>>>>>>>>> (5) CoalescedRDD[15] at coalesce at <console>:25 []
>>>>>>>>>>  |  MapPartitionsRDD[14] at repartition at <console>:25 []
>>>>>>>>>>  |  CoalescedRDD[13] at repartition at <console>:25 []
>>>>>>>>>>  |  ShuffledRDD[12] at repartition at <console>:25 []
>>>>>>>>>>  +-(20) MapPartitionsRDD[11] at repartition at <console>:25 []
>>>>>>>>>>     |  ParallelCollectionRDD[10] at makeRDD at <console>:25 []
>>>>>>>>>>
>>>>>>>>>> With shuffling enabled everything is ok, e.g.
>>>>>>>>>>
>>>>>>>>>> scala> sc.makeRDD(1 to 100, 20).repartition(100).coalesce(5, true).toDebugString
>>>>>>>>>> res6: String =
>>>>>>>>>> (5) MapPartitionsRDD[24] at coalesce at <console>:25 []
>>>>>>>>>>  |  CoalescedRDD[23] at coalesce at <console>:25 []
>>>>>>>>>>  |  ShuffledRDD[22] at coalesce at <console>:25 []
>>>>>>>>>>  +-(100) MapPartitionsRDD[21] at coalesce at <console>:25 []
>>>>>>>>>>     |  MapPartitionsRDD[20] at repartition at <console>:25 []
>>>>>>>>>>     |  CoalescedRDD[19] at repartition at <console>:25 []
>>>>>>>>>>     |  ShuffledRDD[18] at repartition at <console>:25 []
>>>>>>>>>>     +-(20) MapPartitionsRDD[17] at repartition at <console>:25 []
>>>>>>>>>>        |  ParallelCollectionRDD[16] at makeRDD at <console>:25 []
>>>>>>>>>>
>>>>>>>>>> In that case the problem is that for pretty huge datasets the additional reshuffling can take hours, or at least an amount of time comparable to the join itself.
>>>>>>>>>>
>>>>>>>>>> So I'd like to understand whether this is a bug or just the expected behaviour. In case it is expected, is there any way to insert an additional ShuffleMapStage into the appropriate position of the DAG, but without the reshuffling itself?