Re: Coalesce behaviour

2018-11-19 Thread Sergey Zhemzhitsky
I've managed to spend some time digging deeper into this issue, and it seems that in some cases it can be fixed pretty easily. I've raised a JIRA: https://issues.apache.org/jira/browse/SPARK-26114 ... and the corresponding pull request: https://github.com/apache/spark/pull/23083 It does not cover

Re: Coalesce behaviour

2018-10-15 Thread Koert Kuipers
I realize it is unlikely all data will be local to tasks, so placement will not be optimal and there will be some network traffic, but is this the same as a shuffle? CoalesceRDD shows a NarrowDependency, which I thought meant it could be implemented without a shuffle.
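For reference, the narrow dependency mentioned here can be checked directly. A minimal spark-shell sketch (made-up data, not from the original mail; assumes `sc` is in scope):

import org.apache.spark.NarrowDependency

val rdd = sc.parallelize(1 to 1000, 100)   // 100 parent partitions
val coalesced = rdd.coalesce(10)           // shuffle = false by default

// CoalescedRDD declares a NarrowDependency: each of the 10 output partitions
// reads several parent partitions within the same stage, and no shuffle files
// are written. Whether those parent partitions are node-local is a separate matter.
coalesced.dependencies.foreach { dep =>
  println(s"${dep.getClass.getName} narrow=${dep.isInstanceOf[NarrowDependency[_]]}")
}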

Re: Coalesce behaviour

2018-10-14 Thread Jörn Franke
This is not fully correct. If you have fewer files then you need to move some data to other nodes, because not all the data is there for writing (even in the case of the same node, but then it is easier from a network perspective). Hence a shuffle is needed.

Re: Coalesce behaviour

2018-10-14 Thread Koert Kuipers
Sure, I understand the current workaround is to add a shuffle. But that's just a workaround, not a satisfactory solution: we shouldn't have to introduce another shuffle (an expensive operation) just to reduce the number of files. Logically all you need is a map phase with fewer tasks after the reduce.
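The workaround being referred to, spelled out as a spark-shell sketch (the data, partition counts, and output path are made up for illustration):

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(1 to 1000000, 100).map(i => (i % 1000, i.toString))

pairs
  .repartitionAndSortWithinPartitions(new HashPartitioner(2048)) // the heavy reduce, 2048 tasks
  .repartition(10)                                               // extra shuffle, purely to get 10 files
  .saveAsTextFile("/tmp/ten-files")

Since repartition(10) is just coalesce(10, shuffle = true), the 2048-task reduce keeps its parallelism and a second, smaller shuffle produces the 10 files; the wish in this thread is to get those 10 files from a narrow map phase instead.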

Re: Coalesce behaviour

2018-10-14 Thread Wenchen Fan
You have a heavy workload, so you want to run it with many tasks for better performance and stability (no OOM), but you also want to run it with few tasks to avoid too many small files. The reality is that mostly you can't reach these 2 goals together; they conflict with each other. The solution I can think of

Re: Coalesce behaviour

2018-10-13 Thread Koert Kuipers
We have a collection of programs in the DataFrame API that all do big shuffles, for which we use 2048+ partitions. This works fine, but it produces a lot of (small) output files, which puts pressure on the memory of the driver program of any Spark program that reads this data in again. So one of our de
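To make the small-files pressure concrete: each partition of the final stage becomes at least one file per write, so 2048 shuffle partitions means on the order of 2048 output files. A rough spark-shell illustration (column names, sizes, and path are made up):

import org.apache.spark.sql.functions._

spark.conf.set("spark.sql.shuffle.partitions", "2048")

val df = spark.range(0, 10000000L).withColumn("key", col("id") % 100000)

df.groupBy("key").count()          // shuffle stage with 2048 partitions
  .write.mode("overwrite")
  .parquet("/tmp/counts")          // roughly 2048 small parquet files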

Re: Coalesce behaviour

2018-10-13 Thread Sergey Zhemzhitsky
I've tried the same sample with the DataFrame API and it's much more stable, although it's backed by the RDD API. This sample works without any issues and without any additional Spark tuning: val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text]) val df = rdd.map(item => item._1.toString ->
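The sample is cut off above. A hedged guess at the shape of the full DataFrame version (the column names, partition counts, and output path below are assumptions, not the original code):

import org.apache.hadoop.io.Text
import spark.implicits._   // already in scope in spark-shell

val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])

val df = rdd.map(item => item._1.toString -> item._2.toString).toDF("key", "value")

df.repartition(1000, $"key")          // wide shuffle, 1000 tasks
  .sortWithinPartitions($"key")
  .coalesce(20)                       // only 20 output files
  .write.mode("overwrite")
  .parquet("/tmp/random-strings-out")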

Re: Coalesce behaviour

2018-10-12 Thread Wenchen Fan
In your first example, the root RDD has 1000 partitions, then you do a shuffle (with repartitionAndSortWithinPartitions), which shuffles data to 1000 reducers. Then you do coalesce, which asks Spark to launch only 20 reducers to process the data that was prepared for 1000 reducers. Since the reduc
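The stage collapse described here is visible from the lineage. A minimal spark-shell sketch mirroring the numbers in this message (data is made up):

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(1 to 1000000, 1000).map(i => (i, i.toString))

val reduced   = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(1000))
val coalesced = reduced.coalesce(20)   // narrow, so it shrinks the reduce stage itself

println(reduced.getNumPartitions)      // 1000
println(coalesced.getNumPartitions)    // 20
// toDebugString shows the coalesce and the shuffle read in the same stage,
// i.e. only 20 tasks, each reading roughly 50 reducers' worth of data.
println(coalesced.toDebugString)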

Re: Coalesce behaviour

2018-10-12 Thread Koert Kuipers
How can I get a shuffle with 2048 partitions and 2048 tasks and then a map phase with 10 partitions and 10 tasks that writes to HDFS? Every time I try to do this using coalesce, the shuffle ends up having 10 tasks, which is unacceptable due to OOM. This makes coalesce somewhat useless.

Re: Coalesce behaviour

2018-10-12 Thread Sergey Zhemzhitsky
... sorry for that, but there is a mistake in the second sample; here is the right one: // fails with either OOM or 'Container killed by YARN for exceeding memory limits ... spark.yarn.executor.memoryOverhead' rdd .map(item => item._1.toString -> item._2.toString) .repartitionAndSortWithinPartitions
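A hedged reconstruction of the corrected (failing) sample, as far as it can be read from the truncated message (the partitioner, partition counts, and paths are assumptions):

import org.apache.hadoop.io.Text
import org.apache.spark.HashPartitioner

val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])

// fails with either OOM or "Container killed by YARN for exceeding memory limits ..."
rdd
  .map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(20)                        // collapses the 1000-task reduce into 20 tasks
  .saveAsTextFile("/tmp/random-strings-out")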

Re: Coalesce behaviour

2018-10-12 Thread Sergey Zhemzhitsky
I'd like to reduce the number of files written to HDFS without introducing additional shuffles and at the same time preserve the stability of the job, and I'd also like to understand why the samples below work in one case and fail in another. Consider the following example, which does the same

Re: Coalesce behaviour

2018-10-10 Thread Wenchen Fan
Note that RDD partitions and Spark tasks are not always a 1-1 mapping. Assume `rdd1` has 100 partitions and `rdd2 = rdd1.coalesce(10)`. Then `rdd2` has 10 partitions, and there is no shuffle between `rdd1` and `rdd2`. During scheduling, `rdd1` and `rdd2` are in the same stage, and this stage has
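The many-to-one reading that keeps this in a single stage can be observed per output partition; a small spark-shell sketch along the lines of this example:

val rdd1 = sc.parallelize(1 to 10000, 100)
val rdd2 = rdd1.coalesce(10)

println(rdd1.getNumPartitions)   // 100
println(rdd2.getNumPartitions)   // 10

// Each of the 10 tasks in this single stage reads about 10 parent partitions:
rdd2.mapPartitionsWithIndex { (idx, it) => Iterator((idx, it.size)) }
    .collect()
    .foreach { case (idx, n) => println(s"partition $idx read $n records") }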

Re: Coalesce behaviour

2018-10-09 Thread Sergey Zhemzhitsky
Well, it seems that I can still extend the CoalesceRDD to make it preserve the total number of partitions from the parent RDD, reduce some partitions in the same way as the original coalesce does for map-only jobs, and fill the gaps (partitions which should reside on the positions of the coalesced ones
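The hook for experiments like this already exists as a developer API: RDD.coalesce accepts an optional PartitionCoalescer. The sketch below is not the change being proposed here, only an illustration of that extension point, using a naive round-robin grouping with no locality handling:

import org.apache.spark.rdd.{PartitionCoalescer, PartitionGroup, RDD}

class RoundRobinCoalescer extends PartitionCoalescer with Serializable {
  override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
    val groups = Array.fill(maxPartitions)(new PartitionGroup())
    parent.partitions.zipWithIndex.foreach { case (p, i) =>
      groups(i % maxPartitions).partitions += p   // assign parent partitions round-robin
    }
    groups
  }
}

// usage: still a narrow, map-side coalesce, but with custom grouping
val coalesced = sc.parallelize(1 to 1000, 100)
  .coalesce(10, shuffle = false, partitionCoalescer = Some(new RoundRobinCoalescer))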

Re: Coalesce behaviour

2018-10-08 Thread Koert Kuipers
Although I personally would describe this as a bug, the answer will be that this is the intended behavior. The coalesce "infects" the shuffle before it, making coalesce useless for reducing output files after a shuffle with many partitions, by design. Your only option left is a repartition, for which