Hi,
Please help me with this problem. I would really appreciate your help !
I am using spark 1.2.0. I have a map-reduce job written in spark in the
following way:
val sumW = splittedTrainingDataRDD.map(localTrainingData => LocalSGD(w,
localTrainingData, numeratorCtEta, numitorCtEta, regularizer,
0.2).reduce((w1,w2) => {w1.add(w2); w2.clear; w1})
Here, w is trove TLongDoubleHashMap containing no more than 50 million
elements (in RAM this is ~ 15 GB). w1.add(w2) does addition of the values of
the same key, for each key of both maps.
My initial configuration is:
conf.set("spark.cores.max", "16")
conf.set("spark.akka.frameSize", "100000")
conf.set("spark.executor.memory", "120g")
conf.set("spark.reducer.maxMbInFlight", "100000")
conf.set("spark.storage.memoryFraction", "0.9")
conf.set("spark.shuffle.file.buffer.kb", "1000")
conf.set("spark.broadcast.factory",
"org.apache.spark.broadcast.HttpBroadcastFactory")
conf.set("spark.driver.maxResultSize", "120g")
val sc = new SparkContext(conf)
I am running this on a cluster with 8 machines, each machine has 16 cores
and 130 GB RAM.
My spark-env.sh contains:
ulimit -n 200000
SPARK_JAVA_OPTS="-Xms120G -Xmx120G -XX:-UseGCOverheadLimit
-XX:-UseCompressedOops"
SPARK_DRIVER_MEMORY=120G
The error I get is at the reducer above (the reducer above is in file called
Learning.scala, line 313):
15/01/18 14:35:52 ERROR ActorSystemImpl: Uncaught fatal error from thread
[sparkDriver-akka.actor.default-dispatcher-4] shutting down ActorSystem
[sparkDriver]
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at java.util.Arrays.copyOf(Arrays.java:2271)
at
java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at
java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
at
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:836)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778)
at
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1389)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/01/18 14:35:52 INFO DAGScheduler: Job 2 failed: reduce at
Learning.scala:313, took 54.657239 s
Exception in thread "main" org.apache.spark.SparkException: Job cancelled
because SparkContext was shut down
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:701)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at
org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:701)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1428)
at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundPostStop(DAGScheduler.scala:1375)
at
akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
at
akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
at akka.actor.ActorCell.terminate(ActorCell.scala:369)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/01/18 14:35:52 INFO RemoteActorRefProvider$RemotingTerminator: Shutting
down remote daemon.
15/01/18 14:35:52 INFO RemoteActorRefProvider$RemotingTerminator: Remote
daemon shut down; proceeding with flushing remote transports.
Thank you a lot for your suggestions!!
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Reducer-memory-exceeded-tp21221.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]