I reduced the number of partitions to 1/4 (to 76) in order to cut the time to 1/4 (from 33 to 8), but the repartition is still running beyond 15 minutes.
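The 2.2 GB in-memory spills further down for only 100 MB of input suggest that a few hot keys (or heavy record expansion) may dominate the shuffle, in which case changing the partition count mostly moves work around instead of speeding it up. A rough spark-shell style sketch of one way to check for key skew, assuming x is the keyed RDD built in step 2 of the quoted mail below (the names are placeholders):

    // Count records per composite key and look at the heaviest keys. If a
    // handful of keys hold most of the data, repartition()/reduceByKey()
    // funnels them into a few tasks no matter how many partitions are used.
    val keyCounts = x
      .map { case (key, _) => (key, 1L) }
      .reduceByKey(_ + _)

    keyCounts
      .sortBy(-_._2)   // heaviest keys first
      .take(20)
      .foreach(println)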
@Nirmal: clicking on Details shows the code lines but does not show why it is slow. I know that repartition is slow and I want to speed it up.

@Sharma: I have seen that increasing the cores speeds up repartition, but it slows down the rest of the stages in the job plan. I need some logical explanation and math to know beforehand; otherwise with Spark I am always firing in the dark. Spark has been depressingly lackluster so far (the join use case, and now a simple outlier detection using TDigest).

On Mon, Jul 13, 2015 at 9:37 PM, Aniruddh Sharma <asharma...@gmail.com> wrote:

> Hi Deepak
>
> Not 100% sure, but please try increasing --executor-cores to twice the
> number of physical cores on your machine.
>
> Thanks and Regards
> Aniruddh
>
> On Tue, Jul 14, 2015 at 9:49 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>
>> It's been 30 minutes and the repartition has still not completed; it's
>> taking forever.
>>
>> Without repartition, I see this error:
>> https://issues.apache.org/jira/browse/SPARK-5928
>>
>> FetchFailed(BlockManagerId(1, imran-2.ent.cloudera.com, 55028),
>> shuffleId=1, mapId=0, reduceId=0, message=
>> org.apache.spark.shuffle.FetchFailedException: Adjusted frame length exceeds 2147483647: 3021252889 - discarded
>>   at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>>   at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>   at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>   at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>
>> On Mon, Jul 13, 2015 at 8:34 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>
>>> I have 100 MB of Avro data, and repartition(307) is taking forever.
>>>
>>> 2. val x = input.repartition(7907).map( {k1,k2,k3,k4}, {inputRecord} )
>>> 3. val quantiles = x.map( {k1,k2,k3,k4}, TDigest(inputRecord).asBytes ).reduceByKey()  [ this was a groupBy earlier ]
>>> 4. x.join(quantiles).coalesce(100).writeInAvro
>>>
>>> Attached is the full Scala code.
>>>
>>> I have a 340-node YARN cluster with 14 GB RAM on each node, and the input
>>> data is just 100 MB. (Hadoop takes 2.5 hours on a 1 TB dataset.)
>>>
>>> ./bin/spark-submit -v --master yarn-cluster
>>> --jars /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.4/lib/spark_reporting_dep_only-1.0-SNAPSHOT.jar
>>> --num-executors 330 --driver-memory 14g
>>> --driver-java-options "-XX:MaxPermSize=512M -Xmx4096M -Xms4096M -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
>>> --executor-memory 14g --executor-cores 1 --queue hdmi-others
>>> --class com.ebay.ep.poc.spark.reporting.SparkApp
>>> /home/dvasthimal/spark1.4/lib/spark_reporting-1.0-SNAPSHOT.jar
>>> startDate=2015-06-20 endDate=2015-06-21
>>> input=/apps/hdmi-prod/b_um/epdatasets/exptsession subcommand=ppwmasterprime
>>> output=/user/dvasthimal/epdatasets/ppwmasterprime buffersize=128
>>> maxbuffersize=1068 maxResultSize=200G
>>>
>>> I see this in the stdout of the task on that executor:
>>>
>>> 15/07/13 19:58:48 WARN hdfs.BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
>>> 15/07/13 20:00:08 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (1 time so far)
>>> 15/07/13 20:01:31 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (2 times so far)
>>> 15/07/13 20:03:07 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (3 times so far)
>>> 15/07/13 20:04:32 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (4 times so far)
>>> 15/07/13 20:06:21 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (5 times so far)
>>> 15/07/13 20:08:09 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (6 times so far)
>>> 15/07/13 20:09:51 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (7 times so far)
>>>
>>> Also attached is the thread dump.
>>>
>>> --
>>> Deepak
>>>
>>
>> --
>> Deepak
>>
>
--
Deepak
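As a follow-up sketch: steps 2-4 in the quoted mail could also be written without the separate repartition(), building one t-digest per key with aggregateByKey so that only small digests (rather than raw records) cross the shuffle, which makes the 2 GB shuffle frame limit from SPARK-5928 much harder to hit. This is only a sketch under assumptions: it uses the com.tdunning t-digest library, the field names (k1..k4, metric) and record type are placeholders, and the exact TDigest factory/merge calls should be checked against the library version in use.

    import com.tdunning.math.stats.TDigest
    import org.apache.spark.rdd.RDD

    // Placeholder stand-in for the Avro input records used in the job.
    case class InputRecord(k1: String, k2: String, k3: String, k4: String, metric: Double)

    def quantilesPerKey(input: RDD[InputRecord], numPartitions: Int)
        : RDD[((String, String, String, String), Double)] = {

      // Key each record by the composite key, keeping only the value the
      // digest needs instead of the whole record.
      val keyed = input.map(r => ((r.k1, r.k2, r.k3, r.k4), r.metric))

      // One digest per key. Only compact digests are shuffled, and they must be
      // serializable (or registered with Kryo) because they cross the network.
      keyed
        .aggregateByKey(TDigest.createMergingDigest(100.0), numPartitions)(
          (digest, value) => { digest.add(value); digest },  // fold a value into a digest
          (left, right)   => { left.add(right); left }       // merge digests from different partitions
        )
        .mapValues(_.quantile(0.99))                         // e.g. the 99th percentile per key
    }

The result can then be joined back to the keyed records (as in step 4) or written out directly; the point of the sketch is only that a per-key reduce over digests avoids materialising all records of a key in one place, which is what the earlier groupBy variant was doing.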