genericRecordsAndKeys.persist(StorageLevel.MEMORY_AND_DISK) with 17 as the repartitioning argument throws this exception:
15/07/13 23:26:36 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 2.0 failed 4 times, most recent failure: Lost task 14.3 in stage 2.0 (TID 37, phxaishdc9dn0725.phx.ebay.com): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:509)
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:300)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
    at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
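The root cause here is the FileChannelImpl.map frame at the top of the trace: when Spark reads a persisted block back from disk, it memory-maps the file, and Java caps a single mapped region at Integer.MAX_VALUE bytes. In other words, one of the persisted partitions grew past 2 GB. The usual fix is a higher partition count so that every block stays well under that ceiling. A minimal sketch of the sizing arithmetic, assuming you can estimate the expanded data size (both figures below are placeholders, not measurements):

    import org.apache.spark.storage.StorageLevel

    // Placeholder estimate: deserialized Avro can expand far beyond the
    // 100 MB on disk; plug in a real number from the Spark UI's
    // storage/shuffle metrics for a previous run.
    val estimatedBytes       = 50L * 1024 * 1024 * 1024  // assume ~50 GB expanded
    val targetPartitionBytes = 128L * 1024 * 1024        // aim for ~128 MB each

    // Keep every cached block far below the 2 GB FileChannel.map ceiling.
    val numPartitions = math.max((estimatedBytes / targetPartitionBytes).toInt, 1)

    val repartitioned = genericRecordsAndKeys.repartition(numPartitions)
    repartitioned.persist(StorageLevel.MEMORY_AND_DISK)

At roughly 128 MB per partition there is ample headroom before any single block approaches 2 GB, even with moderate skew.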
On Mon, Jul 13, 2015 at 10:37 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> I stopped at 35 repartitions, as each one takes around 12-14 minutes. I
> cached an RDD because it is used by the next two tasks, but that slowed
> the process down.
>
> Code:
>
> import java.nio.ByteBuffer
> import com.tdunning.math.stats.{AVLTreeDigest, TDigest}
> import org.apache.avro.mapred.AvroKey
> import org.apache.hadoop.io.NullWritable
> import org.apache.spark.rdd.RDD
>
> // Key each record by the concatenation of its dimension values.
> val genericRecordsAndKeys = inputRecords.map { record =>
>   val rec = new MasterPrimeRecord(detail, record)
>   val keyToOutput = new StringBuilder
>   dimensions.foreach { dim =>
>     keyToOutput.append("_" + rec.get(dim).toString)
>   }
>   (keyToOutput.toString, rec)
> }
> genericRecordsAndKeys.cache
>
> // Build a one-point digest per record, then merge digests per key.
> val quantiles = genericRecordsAndKeys
>   .map { case (keyToOutput, rec) =>
>     val digest: TDigest = TDigest.createAvlTreeDigest(10)
>     digest.add(rec.get("fpPaidGMB").asInstanceOf[Double])
>     val bbuf = ByteBuffer.allocate(digest.byteSize())
>     digest.asBytes(bbuf)
>     (keyToOutput, bbuf.array())
>   }
>   .reduceByKey { (v1, v2) =>
>     val tree1 = AVLTreeDigest.fromBytes(ByteBuffer.wrap(v1))
>     val tree2 = AVLTreeDigest.fromBytes(ByteBuffer.wrap(v2))
>     tree1.add(tree2)
>     tree1.compress()
>     val bbuf = ByteBuffer.allocate(tree1.byteSize())
>     tree1.asBytes(bbuf)
>     bbuf.array()
>   }
>
> // Cap each record's fpPaidGMB at its key's 99.9th percentile.
> val outputRecords: RDD[(AvroKey[MasterPrimeRecord], NullWritable)] =
>   genericRecordsAndKeys.join(quantiles).map {
>     case (_, (masterPrimeRec, digestBytes)) =>
>       val mergedTree = AVLTreeDigest.fromBytes(ByteBuffer.wrap(digestBytes))
>       val capVal = mergedTree.quantile(0.999)
>       if (masterPrimeRec.get("fpPaidGMB").asInstanceOf[Double] > capVal) {
>         masterPrimeRec.put("fpPaidGMB", capVal)
>       }
>       (new AvroKey[MasterPrimeRecord](masterPrimeRec), NullWritable.get)
>   }
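One restructuring worth trying on the quantiles step above, sketched under two assumptions (that the t-digest classes are Java-serializable, and that a compression of 100 rather than 10 is acceptable; 100 is the commonly cited default): build one digest per key with aggregateByKey instead of serializing a single-point digest for every record, so the shuffle moves a few compact digests per key rather than one byte array per input record.

    import java.nio.ByteBuffer
    import com.tdunning.math.stats.TDigest

    // Sketch: one digest per key. seqOp folds raw values into a
    // partition-local digest; combOp merges digests across partitions.
    // Bytes are produced only once per key at the very end, for the join.
    val quantiles = genericRecordsAndKeys
      .aggregateByKey(TDigest.createAvlTreeDigest(100))(  // zero value: empty digest
        (digest, rec) => {
          digest.add(rec.get("fpPaidGMB").asInstanceOf[Double])
          digest
        },
        (d1, d2) => {
          d1.add(d2)
          d1
        })
      .mapValues { digest =>
        digest.compress()
        val buf = ByteBuffer.allocate(digest.byteSize())
        digest.asBytes(buf)
        buf.array()
      }

If the digest class turns out not to be serializable in the version at hand, the same shape works with combineByKey, using a createCombiner that builds the first digest from the first value instead of a serialized zero.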
>
> On Mon, Jul 13, 2015 at 9:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>
>> My guess worked out fine: the repartition took approximately 1/4 of the
>> time once I reduced the number of partitions, and the rest of the process
>> took 1/4 extra time, which is acceptable.
>>
>> On Mon, Jul 13, 2015 at 9:46 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>
>>> I reduced the number of partitions to 1/4 (to 76) in order to cut the
>>> time to 1/4 (from 33 minutes to 8), but the repartition is still running
>>> beyond 15 minutes.
>>>
>>> @Nirmal: clicking on Details shows the code lines but does not show why
>>> it is slow. I know that the repartition is slow and want to speed it up.
>>>
>>> @Sharma: I have seen that increasing the cores speeds up the
>>> repartition, but it slows down the rest of the stages in the job plan.
>>>
>>> I need some logical explanation and math to know this beforehand;
>>> otherwise, with Spark I am always firing in the dark. Spark has been
>>> depressingly lackluster so far (the join use case, and now a simple
>>> outlier detection using TDigest).
>>>
>>> On Mon, Jul 13, 2015 at 9:37 PM, Aniruddh Sharma <asharma...@gmail.com> wrote:
>>>
>>>> Hi Deepak,
>>>>
>>>> Not 100% sure, but please try increasing --executor-cores to twice the
>>>> number of physical cores on your machine.
>>>>
>>>> Thanks and Regards,
>>>> Aniruddh
>>>>
>>>> On Tue, Jul 14, 2015 at 9:49 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>
>>>>> It has been 30 minutes and the repartition has still not completed;
>>>>> it just runs forever.
>>>>>
>>>>> Without repartition, I see this error:
>>>>> https://issues.apache.org/jira/browse/SPARK-5928
>>>>>
>>>>> FetchFailed(BlockManagerId(1, imran-2.ent.cloudera.com, 55028), shuffleId=1, mapId=0, reduceId=0, message=
>>>>> org.apache.spark.shuffle.FetchFailedException: Adjusted frame length exceeds 2147483647: 3021252889 - discarded
>>>>>     at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>>>>>     at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>>     at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>     at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
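That frame-length failure is the same 2 GB ceiling, this time on a single shuffle block: 3021252889 bytes is about 2.8 GB for one map/reduce pair, which Spark 1.x cannot transfer (SPARK-5928). Since the key is a concatenation of a few dimensions, a handful of hot keys can pile most of the data into one reducer partition, and raising the partition count alone never splits a single key. Because t-digest merges are associative, hot keys can instead be salted and merged in two rounds. A sketch, where serializedDigests stands in for the keyed RDD[(String, Array[Byte])] produced by the per-record map above, and saltBuckets is a made-up fan-out:

    import java.nio.ByteBuffer
    import scala.util.Random
    import com.tdunning.math.stats.AVLTreeDigest

    // Same merge logic as the reduceByKey body above, factored out.
    val mergeDigestBytes: (Array[Byte], Array[Byte]) => Array[Byte] = (v1, v2) => {
      val t1 = AVLTreeDigest.fromBytes(ByteBuffer.wrap(v1))
      t1.add(AVLTreeDigest.fromBytes(ByteBuffer.wrap(v2)))
      t1.compress()
      val buf = ByteBuffer.allocate(t1.byteSize())
      t1.asBytes(buf)
      buf.array()
    }

    val saltBuckets = 32  // hypothetical fan-out; tune to the observed skew

    val quantiles = serializedDigests
      .map { case (key, bytes) => ((key, Random.nextInt(saltBuckets)), bytes) }
      .reduceByKey(mergeDigestBytes)                  // round 1: skew spread over sub-keys
      .map { case ((key, _), bytes) => (key, bytes) }
      .reduceByKey(mergeDigestBytes)                  // round 2: one value per real key

If the oversized block actually comes from the later join of full records against quantiles, broadcasting the per-key digests (one small byte array per key) would avoid that shuffle entirely.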
>>>>>
>>>>> On Mon, Jul 13, 2015 at 8:34 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>
>>>>>> I have 100 MB of Avro data, and repartition(307) is taking forever.
>>>>>>
>>>>>> 2. val x = input.repartition(7907).map( {k1,k2,k3,k4}, {inputRecord} )
>>>>>> 3. val quantiles = x.map( {k1,k2,k3,k4}, TDigest(inputRecord).asBytes ).reduceByKey() [this was groupBy earlier]
>>>>>> 4. x.join(quantiles).coalesce(100).writeInAvro
>>>>>>
>>>>>> Attached is the full Scala code.
>>>>>>
>>>>>> I have a 340-node YARN cluster with 14 GB of RAM on each node, and the
>>>>>> input data is just 100 MB. (Hadoop takes 2.5 hours on a 1 TB dataset.)
>>>>>>
>>>>>> ./bin/spark-submit -v --master yarn-cluster --jars /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.4/lib/spark_reporting_dep_only-1.0-SNAPSHOT.jar --num-executors 330 --driver-memory 14g --driver-java-options "-XX:MaxPermSize=512M -Xmx4096M -Xms4096M -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" --executor-memory 14g --executor-cores 1 --queue hdmi-others --class com.ebay.ep.poc.spark.reporting.SparkApp /home/dvasthimal/spark1.4/lib/spark_reporting-1.0-SNAPSHOT.jar startDate=2015-06-20 endDate=2015-06-21 input=/apps/hdmi-prod/b_um/epdatasets/exptsession subcommand=ppwmasterprime output=/user/dvasthimal/epdatasets/ppwmasterprime buffersize=128 maxbuffersize=1068 maxResultSize=200G
>>>>>>
>>>>>> I see this in the stdout of the task on that executor:
>>>>>>
>>>>>> 15/07/13 19:58:48 WARN hdfs.BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
>>>>>> 15/07/13 20:00:08 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (1 time so far)
>>>>>> 15/07/13 20:01:31 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (2 times so far)
>>>>>> 15/07/13 20:03:07 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (3 times so far)
>>>>>> 15/07/13 20:04:32 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (4 times so far)
>>>>>> 15/07/13 20:06:21 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (5 times so far)
>>>>>> 15/07/13 20:08:09 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (6 times so far)
>>>>>> 15/07/13 20:09:51 INFO collection.ExternalSorter: Thread 47 spilling in-memory map of 2.2 GB to disk (7 times so far)
>>>>>>
>>>>>> Also attached is the thread dump.
>>>>>>
>>>>>> --
>>>>>> Deepak

--
Deepak
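A last observation on the submit command itself, with the caveat that the numbers below are illustrative guesses rather than tuned settings: 330 single-core 14 GB executors for 100 MB of input buys mostly scheduler overhead, and --executor-cores 1 means the 2.2 GB spill loop above is a single thread working alone in a 14 GB heap. Fewer, fatter executors are a more common starting shape:

    # Sketch: fewer multi-core executors; --jars, driver options, and the
    # remaining application arguments are unchanged from the command above
    # (elided here).
    ./bin/spark-submit -v --master yarn-cluster \
      --num-executors 40 \
      --executor-cores 4 \
      --executor-memory 14g \
      --driver-memory 14g \
      --queue hdmi-others \
      --class com.ebay.ep.poc.spark.reporting.SparkApp \
      /home/dvasthimal/spark1.4/lib/spark_reporting-1.0-SNAPSHOT.jar \
      startDate=2015-06-20 endDate=2015-06-21 subcommand=ppwmasterprime ...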