Hello Dean,

If I don't use the Kryo serializer I get a serialization error, so I am using it. If I don't use partitionBy/repartition, the simple join never completes, even after 7 hours. As a next step I need to run it against 250G, since that is my full dataset size. Someone here suggested that I use repartition.
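For context on partitionBy/repartition: a hash partitioner assigns each key to a partition using the key's hashCode modulo the partition count, shifted to be non-negative, and sends null keys to partition 0. The plain-Scala sketch below mirrors that rule without depending on Spark, so it runs on its own. `partitionFor` and `nonNegativeMod` are illustrative names, not Spark's public API. Because the mapping depends only on the key and the partition count, two pair RDDs partitioned with the same HashPartitioner(n) place equal keys at the same partition index. That shared layout is what allows a later join to reuse the partitioning instead of shuffling again.

```scala
// Plain-Scala sketch of how a hash partitioner maps keys to partitions.
// Follows the same rule as Spark's HashPartitioner; names are illustrative.
object HashPartitionSketch {

  // Java's % can return a negative remainder for negative hash codes,
  // so shift negative remainders back into [0, mod).
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    raw + (if (raw < 0) mod else 0)
  }

  // null keys go to partition 0; every other key goes by its hashCode.
  def partitionFor(key: Any, numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    key match {
      case null => 0
      case k    => nonNegativeMod(k.hashCode, numPartitions)
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical item ids standing in for the (itemId, record) keys.
    val itemIds = Seq(5L, 123456789L, -42L)
    for (n <- Seq(200, 400); id <- itemIds)
      println(s"itemId=$id numPartitions=$n -> partition ${partitionFor(id, n)}")
  }
}
```

The same item id always lands on the same partition index for a given partition count. Changing the count (for example from 200 to 400) reshuffles which keys share a partition.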
Assuming repartition is mandatory, how do I decide what the right number is? With 400 partitions I do not get the NullPointerException I mentioned, which is strange. I never saw that exception with a small random dataset, but I do see it with 25G. And again, with 400 partitions I do not see it.

On Sun, May 3, 2015 at 9:15 PM, Dean Wampler <deanwamp...@gmail.com> wrote:

> IMHO, you are trying waaay too hard to optimize work on what is really a small data set. 25G, even 250G, is not that much data, especially if you've spent a month trying to get something to work that should be simple. All these errors are from optimization attempts.
>
> Kryo is great, but if it's not working reliably for some reason, then don't use it. Rather than force 200 partitions, let Spark try to figure out a good-enough number. (If you really need to force a partition count, use the repartition method instead, unless you're overriding the partitioner.)
>
> So, I recommend that you eliminate all the optimizations: Kryo, partitionBy, etc. Just use the simplest code you can. Make it work first. Then, if it really isn't fast enough, look for actual evidence of bottlenecks and optimize those.
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
> Typesafe <http://typesafe.com>
> @deanwampler <http://twitter.com/deanwampler>
> http://polyglotprogramming.com
>
> On Sun, May 3, 2015 at 10:22 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>
>> Hello Dean & Others,
>> Thanks for your suggestions.
>> I have two data sets, and all I want to do is a simple equi-join. I have a 10G limit, and since my dataset_1 exceeded it, it was throwing an OOM error. So I switched back to the .join() API instead of a map-side broadcast join.
>> I am repartitioning the data with 100 and 200 partitions, and now I see a NullPointerException.
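On the question of how many partitions to use, one common rule of thumb is to size partitions from the input volume and to keep at least a couple of tasks per executor core. This heuristic is an assumption, not something stated in this thread. Spark's tuning guide suggests 2-3 tasks per CPU core as a general level of parallelism. The sketch below encodes the heuristic in plain Scala. `suggestPartitions`, the 128 MB per-partition target and the 2-tasks-per-core default are all illustrative choices. The result is a starting point to adjust after checking the job's actual stage metrics, not a definitive answer.

```scala
// Rule-of-thumb partition count: enough partitions that each holds roughly
// `targetBytesPerPartition` of input, and never fewer than
// `tasksPerCore` tasks per available executor core.
object PartitionSizing {

  val MiB: Long = 1024L * 1024L
  val GiB: Long = 1024L * MiB

  def suggestPartitions(
      totalInputBytes: Long,
      totalExecutorCores: Int,
      targetBytesPerPartition: Long = 128L * MiB, // assumption: ~128 MB per partition
      tasksPerCore: Int = 2                        // assumption: low end of 2-3 tasks per core
  ): Int = {
    require(totalInputBytes >= 0 && targetBytesPerPartition > 0)
    // Ceiling division in Long arithmetic to avoid floating-point rounding.
    val bySize = (totalInputBytes + targetBytesPerPartition - 1) / targetBytesPerPartition
    val byCores = totalExecutorCores.toLong * tasksPerCore
    math.max(1L, math.max(bySize, byCores)).toInt
  }

  def main(args: Array[String]): Unit = {
    // 96 executors x 4 cores, matching the spark-submit flags quoted in this thread.
    val cores = 96 * 4
    println(suggestPartitions(25L * GiB, cores))
    println(suggestPartitions(250L * GiB, cores))
  }
}
```

With these assumptions, 25G alone would suggest 200 partitions, and the core count raises that floor when the cluster is large. At 250G the size term dominates.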
>>
>> When I run against 25G of each input with .partitionBy(new org.apache.spark.HashPartitioner(200)), I see a NullPointerException.
>>
>> This trace does not include a line from my code, so I do not know what is causing the error.
>> I have registered the Kryo serializer:
>>
>> val conf = new SparkConf()
>>   .setAppName(detail)
>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>   .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
>>   .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
>>   .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
>>   .set("spark.yarn.maxAppAttempts", "0")
>>   .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
>> val sc = new SparkContext(conf)
>>
>> I see the exception when this task runs:
>>
>> val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
>>
>> It's a simple mapping of input records to (itemId, record).
>>
>> I found this:
>> http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist
>> and
>> http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-NPE-with-Array-td19797.html
>>
>> It looks like Kryo (v2.21) changed something so that it stopped using default constructors.
>>
>> ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
>>
>> Please suggest.
>>
>> Trace:
>> 15/05/01 03:02:15 ERROR executor.Executor: Exception in task 110.1 in stage 2.0 (TID 774)
>> com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
>> Serialization trace:
>> values (org.apache.avro.generic.GenericData$Record)
>> datum (org.apache.avro.mapred.AvroKey)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>> at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:41)
>> at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>
>> Regards,
>>
>> Any suggestions?
>> I have not been able to get this working for over a month now, and it's getting frustrating.
>>
>> On Sun, May 3, 2015 at 8:03 PM, Dean Wampler <deanwamp...@gmail.com> wrote:
>>
>>> How big is the data you're returning to the driver with collectAsMap? You are probably running out of memory trying to copy too much data back to it.
>>>
>>> If you're trying to force a map-side join, Spark can do that for you in some cases within the regular DataFrame/RDD context.
>>> See http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning and, for example, this talk by Michael Armbrust: http://spark-summit.org/wp-content/uploads/2014/07/Performing-Advanced-Analytics-on-Relational-Data-with-Spark-SQL-Michael-Armbrust.pdf.
>>>
>>> dean
>>>
>>> On Thu, Apr 30, 2015 at 12:40 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>
>>>> Full exception:
>>>> 15/04/30 09:59:49 INFO scheduler.DAGScheduler: Stage 1 (collectAsMap at VISummaryDataProvider.scala:37) failed in 884.087 s
>>>> 15/04/30 09:59:49 INFO scheduler.DAGScheduler: Job 0 failed: collectAsMap at VISummaryDataProvider.scala:37, took 1093.418249 s
>>>> 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw exception: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)]
>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)]
>>>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>>>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>> at scala.Option.foreach(Option.scala:236)
>>>> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>>>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>>>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>>>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>> 15/04/30 09:59:49 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)])
>>>>
>>>> Code at line 37:
>>>>
>>>> val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }.collectAsMap
>>>>
>>>> The listing data set is 26G (10 files) and my driver memory is 12G (I can't go beyond that). I use collectAsMap so that I can broadcast the result and do a map-side join instead of a regular join.
>>>>
>>>> Please suggest.
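The trade-off in this exchange can be made concrete. A map-side (broadcast) join builds an in-memory hash map from the smaller side and streams the larger side past it, so no shuffle is needed. The catch is that the whole smaller side must fit in memory, first on the driver (collectAsMap) and then in every executor that receives the broadcast. With 26G of listings and a 12G driver, that precondition is very unlikely to hold, which fits Dean's point about copying too much data back to the driver. The sketch below shows only the join logic, in plain Scala on local collections. `mapSideJoin` is a hypothetical helper, not a Spark API. In a Spark job, the same body would typically run inside `mapPartitions` over the large RDD, reading the map from a broadcast variable's `.value`.

```scala
// Plain-Scala sketch of a map-side (broadcast hash) join on local data.
// Only the smaller side is materialised as a Map; the larger side is streamed.
object MapSideJoinSketch {

  // Inner equi-join: keep each large-side record whose key exists in `small`.
  def mapSideJoin[K, A, B](small: Map[K, A], large: Iterator[(K, B)]): Iterator[(K, (B, A))] =
    large.flatMap { case (k, b) =>
      small.get(k).map(a => (k, (b, a)))
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical data: itemId -> listing title (small side),
    // (itemId, view event) pairs (large side).
    val listings = Map(1L -> "lamp", 2L -> "chair")
    val viEvents = Iterator(1L -> "view-a", 3L -> "view-b", 2L -> "view-c", 1L -> "view-d")
    mapSideJoin(listings, viEvents).foreach(println)
  }
}
```

When the smaller side cannot fit in memory, the regular shuffle-based `.join()` is the safer choice, which is where this thread ends up.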
>>>>
>>>> On Thu, Apr 30, 2015 at 10:52 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>
>>>>> My Spark job is failing and I see:
>>>>>
>>>>> ==============================
>>>>>
>>>>> 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw exception: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)]
>>>>>
>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: org.apache.spark.SparkException: Error sending message [message = GetLocations(taskresult_112)]
>>>>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>>>>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>> at scala.Option.foreach(Option.scala:236)
>>>>> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>>>>>
>>>>> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>>>>>
>>>>> I see several of these:
>>>>>
>>>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>>>>>
>>>>> And finally I see this:
>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>> at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>>>>> at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>>>>> at org.apache.spark.network.BlockTransferService$$anon$1.onBlockFetchSuccess(BlockTransferService.scala:95)
>>>>> at org.apache.spark.network.shuffle.RetryingBlockFetcher$RetryingBlockFetchListener.onBlockFetchSuccess(RetryingBlockFetcher.java:206)
>>>>> at org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:72)
>>>>> at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:124)
>>>>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:93)
>>>>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>>>>> at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>> at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>> at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>>>>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>>> at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>>>>> at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>>>>> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>>>> at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>>>> at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>>>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>>>> at io.netty.util.concurrent.SingleThreadEven
>>>>>
>>>>> Solutions:
>>>>>
>>>>> 1)
>>>>> .set("spark.akka.askTimeout", "6000")
>>>>> .set("spark.akka.timeout", "6000")
>>>>> .set("spark.worker.timeout", "6000")
>>>>>
>>>>> 2) --num-executors 96 --driver-memory 12g --driver-java-options "-XX:MaxPermSize=10G" --executor-memory 12g --executor-cores 4
>>>>>
>>>>> 12G is the limit imposed by the YARN cluster; I can't go beyond it.
>>>>>
>>>>> Any suggestions?
>>>>>
>>>>> Regards,
>>>>>
>>>>> Deepak
>>>>>
>>>>> On Thu, Apr 30, 2015 at 6:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>
>>>>>> Did not work. Same problem.
>>>>>>
>>>>>> On Thu, Apr 30, 2015 at 1:28 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>
>>>>>>> You could try increasing your heap space explicitly, e.g. export _JAVA_OPTIONS="-Xmx10g". It's not the correct approach, but try it.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Best Regards
>>>>>>>
>>>>>>> On Tue, Apr 28, 2015 at 10:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I have a SparkApp that completes in 45 mins for 5 files (5*750MB) and it takes 16 executors to do so.
>>>>>>>>
>>>>>>>> I wanted to run it against 10 files of each input type (10*3 files, as there are three inputs that are transformed) [Input1 = 10*750 MB, Input2 = 10*2.5GB, Input3 = 10*1.5G], so I used 32 executors.
>>>>>>>>
>>>>>>>> I see multiple warnings like this:
>>>>>>>> 5/04/28 09:23:31 WARN executor.Executor: Issue communicating with driver in heartbeater
>>>>>>>> org.apache.spark.SparkException: Error sending message [message = Heartbeat(22,[Lscala.Tuple2;@2e4c404a,BlockManagerId(22, phxaishdc9dn1048.stratus.phx.ebay.com, 39505))]
>>>>>>>> at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
>>>>>>>> at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
>>>>>>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>>>>>>>> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>>>>>>> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>>>>>>> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>>>>>>> at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>>>>>>> at scala.concurrent.Await$.result(package.scala:107)
>>>>>>>> at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
>>>>>>>> ... 1 more
>>>>>>>>
>>>>>>>> When I searched deeper, I found an OOM error.
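Here is a back-of-the-envelope check of the input volume described above, using the quoted figures and treating 1 GB as 1000 MB for simplicity. The per-executor line assumes the input spreads evenly across the 32 executors, which is an assumption:

$$
\begin{aligned}
\text{Input1} &= 10 \times 0.75\ \text{GB} = 7.5\ \text{GB} \\
\text{Input2} &= 10 \times 2.5\ \text{GB} = 25\ \text{GB} \\
\text{Input3} &= 10 \times 1.5\ \text{GB} = 15\ \text{GB} \\
\text{Total} &= 7.5 + 25 + 15 = 47.5\ \text{GB} \\
\text{Per executor} &\approx 47.5\ \text{GB} / 32 \approx 1.48\ \text{GB}
\end{aligned}
$$

With --executor-memory 12g, that share is small compared with each executor's heap. The OutOfMemoryError that follows is thrown in the task-result-getter thread, where the driver deserializes task results. That suggests the pressure is on the 12G driver receiving results rather than on executor capacity.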
>>>>>>>> 15/04/28 09:10:15 INFO storage.BlockManagerMasterActor: Removing block manager BlockManagerId(17, phxdpehdc9dn2643.stratus.phx.ebay.com, 36819)
>>>>>>>> 15/04/28 09:11:26 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(9, phxaishdc9dn1783.stratus.phx.ebay.com, 48304) with no recent heart beats: 121200ms exceeds 120000ms
>>>>>>>> 15/04/28 09:11:26 INFO storage.BlockManagerMasterActor: Removing block manager BlockManagerId(9, phxaishdc9dn1783.stratus.phx.ebay.com, 48304)
>>>>>>>> 15/04/28 09:11:26 ERROR util.Utils: Uncaught exception in thread task-result-getter-3
>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2245)
>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2219)
>>>>>>>> at java.util.ArrayList.grow(ArrayList.java:242)
>>>>>>>> at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
>>>>>>>> at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
>>>>>>>> at java.util.ArrayList.add(ArrayList.java:440)
>>>>>>>> at com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
>>>>>>>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
>>>>>>>> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>>>>>> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>> at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
>>>>>>>> at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>>>>>>>> at org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
>>>>>>>> at org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
>>>>>>>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
>>>>>>>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
>>>>>>>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
>>>>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>> Exception in thread "task-result-getter-3" java.lang.OutOfMemoryError: Java heap space
>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2245)
>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2219)
>>>>>>>> at java.util.ArrayList.grow(ArrayList.java:242)
>>>>>>>> at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
>>>>>>>> at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
>>>>>>>> at java.util.ArrayList.add(ArrayList.java:440)
>>>>>>>> at com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
>>>>>>>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
>>>>>>>> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>>>>>> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>>> at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
>>>>>>>> at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>>>>>>>> at org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
>>>>>>>> at org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
>>>>>>>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
>>>>>>>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
>>>>>>>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
>>>>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>
>>>>>>>> LogType: stdout
>>>>>>>> LogLength: 96
>>>>>>>> Log Contents:
>>>>>>>>
>>>>>>>> hdfs://hostName:8020/sys/edw/dw_lstg_item/snapshot/2015/04/28/00/part-r-0000*
>>>>>>>>
>>>>>>>> Spark command:
>>>>>>>>
>>>>>>>> ./bin/spark-submit -v --master yarn-cluster \
>>>>>>>>   --driver-class-path /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar \
>>>>>>>>   --jars /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting_dep_only-1.0-SNAPSHOT-jar-with-dependencies.jar \
>>>>>>>>   --num-executors 32 --driver-memory 12g --driver-java-options "-XX:MaxPermSize=8G" \
>>>>>>>>   --executor-memory 12g --executor-cores 4 --queue hdmi-express \
>>>>>>>>   --class com.ebay.ep.poc.spark.reporting.SparkApp \
>>>>>>>>   /home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting-1.0-SNAPSHOT.jar \
>>>>>>>>   startDate=2015-04-6 endDate=2015-04-7 \
>>>>>>>>   input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem \
>>>>>>>>   output=/user/dvasthimal/epdatasets/viewItem buffersize=128 \
>>>>>>>>   maxbuffersize=1068 maxResultSize=200G askTimeout=1200
>>>>>>>>
>>>>>>>> There is a 12G limit on the memory I can use, as this Spark job runs on YARN.
>>>>>>>>
>>>>>>>> Spark version: 1.3.1
>>>>>>>> Should I increase the number of executors from 32?
>>>>>>>> --
>>>>>>>> Deepak
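The spark-submit command above passes maxResultSize=200G while the driver runs with --driver-memory 12g. Judging by the SparkConf code quoted earlier in the thread, that argument ends up in spark.driver.maxResultSize. Spark's configuration docs describe this setting as a cap on the total serialized results of an action such as collect. They also warn that a high limit can let the driver run out of memory. A cap far above the driver heap therefore offers no protection. The sketch below is a hypothetical pre-flight check in plain Scala. `parseSize` handles only simple k/m/g/t suffixes and is not Spark's own size parser. The 0.5 heap fraction is an arbitrary illustrative threshold.

```scala
// Hypothetical pre-flight check: warn when the cap on collected results
// is not comfortably below the driver heap. Plain Scala, no Spark needed.
object DriverResultCheck {

  // Parses sizes like "512m", "12g", "200G" into bytes (binary units).
  // Simplified illustration only; not Spark's own size parser.
  def parseSize(s: String): Long = {
    val t = s.trim.toLowerCase
    val (digits, unit) = t.span(_.isDigit)
    require(digits.nonEmpty, s"no number in '$s'")
    val multiplier = unit match {
      case "" | "b"   => 1L
      case "k" | "kb" => 1024L
      case "m" | "mb" => 1024L * 1024
      case "g" | "gb" => 1024L * 1024 * 1024
      case "t" | "tb" => 1024L * 1024 * 1024 * 1024
      case other      => throw new IllegalArgumentException(s"unknown unit '$other' in '$s'")
    }
    digits.toLong * multiplier
  }

  // A maxResultSize of 0 means "unlimited" in Spark, so it never counts as fitting.
  def resultCapFitsDriver(maxResultSize: String, driverMemory: String,
                          heapFraction: Double = 0.5): Boolean = {
    val cap = parseSize(maxResultSize)
    cap > 0 && cap <= (parseSize(driverMemory) * heapFraction).toLong
  }

  def main(args: Array[String]): Unit = {
    println(resultCapFitsDriver("200G", "12g")) // values from the spark-submit command above
    println(resultCapFitsDriver("2g", "12g"))
  }
}
```

A check like this only flags a mismatched setting. It cannot make a collectAsMap of 26G fit on a 12G driver.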