IMHO, you are trying way too hard to optimize work on what is really a
small data set. 25G, even 250G, is not that much data, especially if you've
spent a month trying to get something to work that should be simple. All
the errors you've posted come from optimization attempts.

Kryo is great, but if it's not working reliably for some reason, then don't
use it. Rather than force 200 partitions, let Spark try to figure out a
good-enough number. (If you really need to force a partition count, use the
repartition method instead, unless you're overriding the partitioner.)
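
To illustrate the difference, here is a minimal sketch, assuming Spark 1.3
and a local master purely for demonstration. The object name, the sample
data, and the partition count of 8 are made up for this example; they are
not taken from your job.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
// On Spark 1.2 and earlier you would also need
// `import org.apache.spark.SparkContext._` for the pair-RDD methods.

object PartitionSketch {
  def main(args: Array[String]): Unit = {
    // "local[2]" is an assumption for illustration; on a cluster the master
    // normally comes from spark-submit.
    val conf = new SparkConf().setAppName("partition-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Hypothetical (key, record) pairs standing in for (itemId, record).
      val pairs = sc.parallelize(1 to 1000).map(i => (i.toLong, s"record-$i"))

      // repartition only changes the number of partitions (with a shuffle);
      // it works on any RDD and does not set a partitioner.
      val reshuffled = pairs.repartition(8)

      // partitionBy is only available on key/value RDDs and places records
      // by key using the partitioner you supply.
      val hashed = pairs.partitionBy(new HashPartitioner(8))

      println(s"repartition: ${reshuffled.partitions.length} partitions, " +
        s"partitioner = ${reshuffled.partitioner}")
      println(s"partitionBy: ${hashed.partitions.length} partitions, " +
        s"partitioner = ${hashed.partitioner}")
    } finally {
      sc.stop()
    }
  }
}
```

Unless a later step really benefits from a known partitioner, the
repartition form (or no explicit count at all) is the simpler choice.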

So. I recommend that you eliminate all the optimizations: Kryo,
partitionBy, etc. Just use the simplest code you can. Make it work first.
Then, if it really isn't fast enough, look for actual evidence of
bottlenecks and optimize those.
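
As a rough sketch of what "simplest code" could look like for an equi-join
on item id: no Kryo settings, no explicit partitioner, just the regular
join. The case classes, field names, sample rows, and the local master are
all hypothetical stand-ins, since I don't know your actual record types.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SimpleJoinSketch {
  // Hypothetical record types; the real job reads Avro records instead.
  case class Listing(itemId: Long, title: String)
  case class ViewEvent(itemId: Long, sessionId: String)

  def main(args: Array[String]): Unit = {
    // Default (Java) serialization and default partitioning throughout.
    // "local[2]" is an assumption for illustration only.
    val conf = new SparkConf().setAppName("simple-join-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Key both data sets by item id.
      val listings = sc.parallelize(Seq(Listing(1L, "a"), Listing(2L, "b")))
        .map(l => (l.itemId, l))
      val views = sc.parallelize(
        Seq(ViewEvent(1L, "s1"), ViewEvent(1L, "s2"), ViewEvent(3L, "s3")))
        .map(v => (v.itemId, v))

      // Plain shuffle join; Spark picks the number of partitions.
      // Result type: RDD[(Long, (ViewEvent, Listing))], inner-join semantics.
      val joined = views.join(listings)

      // collect() is fine here only because the sample data is tiny; on real
      // data, write the result out (e.g. saveAsTextFile) instead of pulling
      // it back to the driver.
      joined.collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Once something like this runs end to end on the full input, the Spark UI
will show which stage, if any, is actually slow.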



Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Sun, May 3, 2015 at 10:22 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> Hello Dean & Others,
> Thanks for your suggestions.
> I have two data sets and all I want to do is a simple equi-join. I have a
> 10G limit, and because my dataset_1 exceeded that, it was throwing an OOM
> error. Hence I switched back to the .join() API instead of a map-side
> broadcast join.
> I am repartitioning the data into 100 or 200 partitions, and I now see a
> NullPointerException.
>
> When I run against 25G of each input with .partitionBy(new
> org.apache.spark.HashPartitioner(200)), I see a NullPointerException.
>
>
> The trace does not include a line from my code, so I do not know what is
> causing the error.
> I have registered the Kryo serializer.
>
> val conf = new SparkConf()
>       .setAppName(detail)
>       .set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
>       .set("spark.kryoserializer.buffer.mb",
> arguments.get("buffersize").get)
>       .set("spark.kryoserializer.buffer.max.mb",
> arguments.get("maxbuffersize").get)
>       .set("spark.driver.maxResultSize",
> arguments.get("maxResultSize").get)
>       .set("spark.yarn.maxAppAttempts", "0")
>       .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
>     val sc = new SparkContext(conf)
>
> I see the exception when this task runs
>
> val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
>
> It's a simple mapping of input records to (itemId, record).
>
> I found this
>
> http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist
> and
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-NPE-with-Array-td19797.html
>
> Looks like Kryo (2.21v)  changed something to stop using default
> constructors.
>
> ((Kryo.DefaultInstantiatorStrategy)
> kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(new
> StdInstantiatorStrategy());
>
>
> Please suggest
>
>
> Trace:
> 15/05/01 03:02:15 ERROR executor.Executor: Exception in task 110.1 in
> stage 2.0 (TID 774)
> com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> Serialization trace:
> values (org.apache.avro.generic.GenericData$Record)
> datum (org.apache.avro.mapred.AvroKey)
>     at
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>     at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>     at
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>     at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>     at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:41)
>     at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
> Regards,
>
>
> Any suggestions.
> I have not been able to get this to work for over a month now, and it's
> getting frustrating.
>
> On Sun, May 3, 2015 at 8:03 PM, Dean Wampler <deanwamp...@gmail.com>
> wrote:
>
>> How big is the data you're returning to the driver with collectAsMap? You
>> are probably running out of memory trying to copy too much data back to it.
>>
>> If you're trying to force a map-side join, Spark can do that for you in
>> some cases within the regular DataFrame/RDD context. See
>> http://spark.apache.org/docs/latest/sql-programming-guide.html#performance-tuning
>> and this talk by Michael Armbrust for example,
>> http://spark-summit.org/wp-content/uploads/2014/07/Performing-Advanced-Analytics-on-Relational-Data-with-Spark-SQL-Michael-Armbrust.pdf.
>>
>>
>> dean
>>
>>
>> On Thu, Apr 30, 2015 at 12:40 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>> wrote:
>>
>>> Full Exception
>>> *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Stage 1 (collectAsMap at
>>> VISummaryDataProvider.scala:37) failed in 884.087 s*
>>> *15/04/30 09:59:49 INFO scheduler.DAGScheduler: Job 0 failed:
>>> collectAsMap at VISummaryDataProvider.scala:37, took 1093.418249 s*
>>> 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw
>>> exception: Job aborted due to stage failure: Exception while getting task
>>> result: org.apache.spark.SparkException: Error sending message [message =
>>> GetLocations(taskresult_112)]
>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>> Exception while getting task result: org.apache.spark.SparkException: Error
>>> sending message [message = GetLocations(taskresult_112)]
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>>> at
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>> at scala.Option.foreach(Option.scala:236)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>>> at
>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
>>> at
>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
>>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>> 15/04/30 09:59:49 INFO yarn.ApplicationMaster: Final app status: FAILED,
>>> exitCode: 15, (reason: User class threw exception: Job aborted due to stage
>>> failure: Exception while getting task result:
>>> org.apache.spark.SparkException: Error sending message [message =
>>> GetLocations(taskresult_112)])
>>>
>>>
>>> *Code at line 37*
>>>
>>> val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong,
>>> lstg) }.collectAsMap
>>>
>>> The listing data set is 26G (10 files) and my driver memory is 12G (I
>>> can't go beyond that). The reason I do collectAsMap is to broadcast it and
>>> do a map-side join instead of a regular join.
>>>
>>>
>>> Please suggest ?
>>>
>>>
>>> On Thu, Apr 30, 2015 at 10:52 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>> wrote:
>>>
>>>> My Spark Job is failing  and i see
>>>>
>>>> ==============================
>>>>
>>>> 15/04/30 09:59:49 ERROR yarn.ApplicationMaster: User class threw
>>>> exception: Job aborted due to stage failure: Exception while getting task
>>>> result: org.apache.spark.SparkException: Error sending message [message =
>>>> GetLocations(taskresult_112)]
>>>>
>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>> Exception while getting task result: org.apache.spark.SparkException: Error
>>>> sending message [message = GetLocations(taskresult_112)]
>>>>
>>>> at
>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
>>>>
>>>> at
>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
>>>>
>>>> at
>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
>>>>
>>>> at
>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>
>>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>
>>>> at
>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
>>>>
>>>> at
>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>
>>>> at
>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>
>>>> at scala.Option.foreach(Option.scala:236)
>>>>
>>>> at
>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>>>>
>>>>
>>>> java.util.concurrent.TimeoutException: Futures timed out after [30
>>>> seconds]
>>>>
>>>>
>>>> I see multiple of these
>>>>
>>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out
>>>> after [30 seconds]
>>>>
>>>> And finally i see this
>>>> java.lang.OutOfMemoryError: Java heap space
>>>> at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>>>> at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>>>> at
>>>> org.apache.spark.network.BlockTransferService$$anon$1.onBlockFetchSuccess(BlockTransferService.scala:95)
>>>> at
>>>> org.apache.spark.network.shuffle.RetryingBlockFetcher$RetryingBlockFetchListener.onBlockFetchSuccess(RetryingBlockFetcher.java:206)
>>>> at
>>>> org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:72)
>>>> at
>>>> org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:124)
>>>> at
>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:93)
>>>> at
>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>>>> at
>>>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>>> at
>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>> at
>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>> at
>>>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>>> at
>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>> at
>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>> at
>>>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>>>> at
>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>>> at
>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>>> at
>>>> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>>>> at
>>>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>>>> at
>>>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>>> at
>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>>> at
>>>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>>> at io.netty.util.concurrent.SingleThreadEven
>>>>
>>>>
>>>>
>>>>
>>>> Solutions
>>>>
>>>> 1)
>>>>
>>>>       .set("spark.akka.askTimeout", "6000")
>>>>
>>>>       .set("spark.akka.timeout", "6000")
>>>>
>>>>       .set("spark.worker.timeout", "6000")
>>>>
>>>> 2)  --num-executors 96 --driver-memory 12g --driver-java-options
>>>> "-XX:MaxPermSize=10G" --executor-memory 12g --executor-cores 4
>>>>
>>>> 12G is the limit imposed by the YARN cluster; I can't go beyond this.
>>>>
>>>>
>>>> ANY suggestions ?
>>>>
>>>> Regards,
>>>>
>>>> Deepak
>>>>
>>>> On Thu, Apr 30, 2015 at 6:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>>> wrote:
>>>>
>>>>> Did not work. Same problem.
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Apr 30, 2015 at 1:28 PM, Akhil Das <ak...@sigmoidanalytics.com
>>>>> > wrote:
>>>>>
>>>>>> You could try increasing your heap space explicitly, e.g. export
>>>>>> _JAVA_OPTIONS="-Xmx10g". It's not the correct approach, but try it.
>>>>>>
>>>>>> Thanks
>>>>>> Best Regards
>>>>>>
>>>>>> On Tue, Apr 28, 2015 at 10:35 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I have a SparkApp that completes in 45 mins for 5 files
>>>>>>> (5*750MB size), and it takes 16 executors to do so.
>>>>>>>
>>>>>>> I wanted to run it against 10 files of each input type (10*3 files
>>>>>>> as there are three inputs that are transformed). [Input1 = 10*750 MB,
>>>>>>> Input2=10*2.5GB, Input3 = 10*1.5G], Hence i used 32 executors.
>>>>>>>
>>>>>>> I see multiple
>>>>>>> 5/04/28 09:23:31 WARN executor.Executor: Issue communicating with
>>>>>>> driver in heartbeater
>>>>>>> org.apache.spark.SparkException: Error sending message [message =
>>>>>>> Heartbeat(22,[Lscala.Tuple2;@2e4c404a,BlockManagerId(22,
>>>>>>> phxaishdc9dn1048.stratus.phx.ebay.com, 39505))]
>>>>>>> at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
>>>>>>> at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
>>>>>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out
>>>>>>> after [30 seconds]
>>>>>>> at
>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>>>>>> at
>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>>>>>> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>>>>>> at
>>>>>>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>>>>>> at scala.concurrent.Await$.result(package.scala:107)
>>>>>>> at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
>>>>>>> ... 1 more
>>>>>>>
>>>>>>>
>>>>>>> When I searched deeper, I found an OOM error.
>>>>>>> 15/04/28 09:10:15 INFO storage.BlockManagerMasterActor: Removing
>>>>>>> block manager BlockManagerId(17,
>>>>>>> phxdpehdc9dn2643.stratus.phx.ebay.com, 36819)
>>>>>>> 15/04/28 09:11:26 WARN storage.BlockManagerMasterActor: Removing
>>>>>>> BlockManager BlockManagerId(9, phxaishdc9dn1783.stratus.phx.ebay.com,
>>>>>>> 48304) with no recent heart beats: 121200ms exceeds 120000ms
>>>>>>> 15/04/28 09:11:26 INFO storage.BlockManagerMasterActor: Removing
>>>>>>> block manager BlockManagerId(9,
>>>>>>> phxaishdc9dn1783.stratus.phx.ebay.com, 48304)
>>>>>>> 15/04/28 09:11:26 ERROR util.Utils: Uncaught exception in thread
>>>>>>> task-result-getter-3
>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2245)
>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2219)
>>>>>>> at java.util.ArrayList.grow(ArrayList.java:242)
>>>>>>> at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
>>>>>>> at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
>>>>>>> at java.util.ArrayList.add(ArrayList.java:440)
>>>>>>> at
>>>>>>> com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
>>>>>>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
>>>>>>> at
>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>>>>> at
>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>> at
>>>>>>> org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>> at
>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>> Exception in thread "task-result-getter-3"
>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2245)
>>>>>>> at java.util.Arrays.copyOf(Arrays.java:2219)
>>>>>>> at java.util.ArrayList.grow(ArrayList.java:242)
>>>>>>> at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:216)
>>>>>>> at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:208)
>>>>>>> at java.util.ArrayList.add(ArrayList.java:440)
>>>>>>> at
>>>>>>> com.esotericsoftware.kryo.util.MapReferenceResolver.nextReadId(MapReferenceResolver.java:33)
>>>>>>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:766)
>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
>>>>>>> at
>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>>>>> at
>>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>>>>> at
>>>>>>> org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>>>>>>> at
>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> LogType: stdout
>>>>>>> LogLength: 96
>>>>>>> Log Contents:
>>>>>>>
>>>>>>> hdfs://hostName:8020/sys/edw/dw_lstg_item/snapshot/2015/04/28/00/part-r-0000*
>>>>>>>
>>>>>>>
>>>>>>> Spark Command:
>>>>>>>
>>>>>>> ./bin/spark-submit -v --master yarn-cluster --driver-class-path
>>>>>>> /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
>>>>>>> --jars
>>>>>>> /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting_dep_only-1.0-SNAPSHOT-jar-with-dependencies.jar
>>>>>>> --num-executors 32 --driver-memory 12g --driver-java-options
>>>>>>> "-XX:MaxPermSize=8G" --executor-memory 12g --executor-cores 4 --queue
>>>>>>> hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
>>>>>>> /home/dvasthimal/spark1.3/1.3.1.lib/spark_reporting-1.0-SNAPSHOT.jar
>>>>>>> startDate=2015-04-6 endDate=2015-04-7
>>>>>>> input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem
>>>>>>> output=/user/dvasthimal/epdatasets/viewItem buffersize=128
>>>>>>> maxbuffersize=1068 maxResultSize=200G askTimeout=1200
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> There is a 12G limit on the memory that I can use, as this Spark is
>>>>>>> running over YARN.
>>>>>>>
>>>>>>> Spark Version: 1.3.1
>>>>>>> Should I increase the number of executors from 32?
>>>>>>> --
>>>>>>> Deepak
>>>>>>>
