No problem, Antony. MLlib is tricky! I'd love to chat with you about your use case - sounds like we're working on similar problems/scales.

On Fri, Feb 20, 2015 at 1:55 PM Xiangrui Meng <men...@gmail.com> wrote:
> Hi Antony,
>
> Is it easy for you to try Spark 1.3.0 or master? The ALS performance
> should be improved in 1.3.0.
>
> -Xiangrui
>
> On Fri, Feb 20, 2015 at 1:32 PM, Antony Mayi
> <antonym...@yahoo.com.invalid> wrote:
> > Hi Ilya,
> >
> > thanks for your insight, this was the right clue. I had default parallelism
> > already set but it was quite low (hundreds) and moreover the number of
> > partitions of the input RDD was low as well, so the chunks were really too
> > big. Increased parallelism and repartitioning seem to be helping...
> >
> > Thanks!
> > Antony.
> >
> > On Thursday, 19 February 2015, 16:45, Ilya Ganelin <ilgan...@gmail.com>
> > wrote:
> >
> > Hi Antony - you are seeing a problem that I ran into. The underlying issue
> > is your default parallelism setting. What's happening is that within ALS
> > certain RDD operations end up changing the number of partitions you have
> > of your data. For example, if you start with an RDD of 300 partitions,
> > unless default parallelism is set while the algorithm executes you'll
> > eventually get an RDD with something like 20 partitions. Consequently,
> > your giant data set is now stored across a much smaller number of
> > partitions, so each partition is huge. Then, when a shuffle requires
> > serialization, you run out of heap space trying to serialize it. The
> > solution should be as simple as setting the default parallelism setting.
> >
> > This is referenced in a JIRA I can't find at the moment.
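For anyone hitting the same wall, here is a minimal sketch of the fix described above: pin spark.default.parallelism and repartition the ratings RDD before calling ALS.trainImplicit(). The parallelism value, input path, record format, and lambda/alpha values are illustrative placeholders, not the settings from Antony's actual job.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.recommendation.{ALS, Rating}

    // Keep shuffles split across many partitions so no single partition
    // grows large enough to blow the executor heap during serialization.
    val conf = new SparkConf()
      .setAppName("als-implicit")
      .set("spark.default.parallelism", "2000")   // illustrative value

    val sc = new SparkContext(conf)

    // Hypothetical input: "user,item,count" lines; repartition so the
    // chunks stay small even if the source RDD had few partitions.
    val ratings = sc.textFile("hdfs:///path/to/ratings")
      .map { line =>
        val Array(user, item, count) = line.split(',')
        Rating(user.toInt, item.toInt, count.toDouble)
      }
      .repartition(2000)
      .cache()

    // rank=100, iterations=15 as in the reported job; lambda and alpha
    // are illustrative.
    val model = ALS.trainImplicit(ratings, 100, 15, 0.01, 40.0)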
> > On Thu, Feb 19, 2015 at 5:10 AM Antony Mayi <antonym...@yahoo.com.invalid>
> > wrote:
> >
> > now with reverted spark.shuffle.io.preferDirectBufs (to true) getting
> > again GC overhead limit exceeded:
> >
> > === spark stdout ===
> > 15/02/19 12:08:08 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 18.0 (TID 5329, 192.168.1.93): java.lang.OutOfMemoryError: GC overhead limit exceeded
> >         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
> >         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> >         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> >         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> >         at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
> >
> > === yarn log (same) ===
> > 15/02/19 12:08:08 ERROR executor.Executor: Exception in task 7.0 in stage 18.0 (TID 5329)
> > java.lang.OutOfMemoryError: GC overhead limit exceeded
> >         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
> >         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> >         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> >         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> >         at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
> >
> > === yarn nodemanager ===
> > 2015-02-19 12:08:13,758 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19014 for container-id container_1424204221358_0013_01_000012: 29.8 GB of 32 GB physical memory used; 31.7 GB of 67.2 GB virtual memory used
> > 2015-02-19 12:08:13,778 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19013 for container-id container_1424204221358_0013_01_000008: 1.2 MB of 32 GB physical memory used; 103.6 MB of 67.2 GB virtual memory used
> > 2015-02-19 12:08:14,455 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1424204221358_0013_01_000008 is : 143
> > 2015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1424204221358_0013_01_000008 transitioned from RUNNING to EXITED_WITH_FAILURE
> > 2015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Cleaning up container container_1424204221358_0013_01_000008
> >
> > Antony.
> >
> > On Thursday, 19 February 2015, 11:54, Antony Mayi
> > <antonym...@yahoo.com.INVALID> wrote:
> >
> > it is from within the ALS.trainImplicit() call. btw. the exception varies
> > between this "GC overhead limit exceeded" and "Java heap space" (which I
> > guess is just a different outcome of the same problem).
> >
> > just tried another run and here are the logs (filtered) - note I tried this
> > run with spark.shuffle.io.preferDirectBufs=false so this might be a
> > slightly different issue from my previous case (going to revert now):
> >
> > === spark stdout ===
> > 15/02/19 10:15:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(6, 192.168.1.92, 54289) with no recent heart beats: 50221ms exceeds 45000ms
> > 15/02/19 10:16:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(13, 192.168.1.90, 56768) with no recent heart beats: 54749ms exceeds 45000ms
> > 15/02/19 10:16:44 ERROR cluster.YarnClientClusterScheduler: Lost executor 6 on 192.168.1.92: remote Akka client disassociated
> > 15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 57.0 in stage 18.0 (TID 5379, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
> > 15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 32.0 in stage 18.0 (TID 5354, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
> > 15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 82.0 in stage 18.0 (TID 5404, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
> > 15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 18.0 (TID 5329, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
> > 15/02/19 10:16:44 ERROR cluster.YarnClientSchedulerBackend: Asked to remove non-existent executor 6
> > 15/02/19 10:16:54 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 18.0 (TID 5328, 192.168.1.90): FetchFailed(BlockManagerId(6, 192.168.1.92, 54289), shuffleId=6, mapId=227, reduceId=6, message=
> > org.apache.spark.shuffle.FetchFailedException: Failed to connect to /192.168.1.92:54289
> >         at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
> >         at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
> >         at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
> >         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> >
> > === yarn log ===
> > 15/02/19 10:15:05 WARN executor.Executor: Told to re-register on heartbeat
> > 15/02/19 10:16:02 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
> > 15/02/19 10:16:02 WARN server.TransportChannelHandler: Exception in connection from /192.168.1.92:45633
> > io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError: Java heap space
> >         at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:280)
> >         at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:149)
> >         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
> >
> > === yarn nodemanager log ===
> > 2015-02-19 10:16:45,146 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 20284 for container-id container_1424204221358_0012_01_000016: 28.5 GB of 32 GB physical memory used; 29.1 GB of 67.2 GB virtual memory used
> > 2015-02-19 10:16:45,163 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 20273 for container-id container_1424204221358_0012_01_000020: 28.5 GB of 32 GB physical memory used; 29.2 GB of 67.2 GB virtual memory used
> > 2015-02-19 10:16:46,621 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1424204221358_0012_01_000008 is : 143
> > 2015-02-19 10:16:46,621 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1424204221358_0012_01_000008 transitioned from RUNNING to EXITED_WITH_FAILURE
> > 2015-02-19 10:16:46,621 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Cleaning up container container_1424204221358_0012_01_000008
> >
> > thanks for any help,
> > Antony.
> >
> > ps. could that be Java 8 related?
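A note on the flag being toggled above: per the Spark configuration documentation of that era, spark.shuffle.io.preferDirectBufs (default true) controls whether the Netty transport allocates off-heap (direct) buffers for shuffle and block transfers; setting it to false forces those allocations on-heap, which can help when off-heap memory is tight but mainly shifts where the buffers live rather than shrinking them. If you want to experiment with it, it can be passed at submit time, e.g. --conf spark.shuffle.io.preferDirectBufs=false (shown purely as an illustration).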
> > On Thursday, 19 February 2015, 11:25, Sean Owen <so...@cloudera.com> wrote:
> >
> > Oh OK, you are saying you are requesting 25 executors and getting them,
> > got it. You can consider making fewer, bigger executors to pool rather
> > than split up your memory, but at some point it becomes
> > counter-productive. 32GB is a fine executor size.
> >
> > So you have ~8GB available per task which seems like plenty. Something
> > else is at work here. Is this error from your code's stages or ALS?
> >
> > On Thu, Feb 19, 2015 at 10:07 AM, Antony Mayi <antonym...@yahoo.com> wrote:
> >> based on spark UI I am running 25 executors for sure. why would you expect
> >> four? I submit the task with --num-executors 25 and I get 6-7 executors
> >> running per host (using more of smaller executors allows me better cluster
> >> utilization when running parallel spark sessions (which is not the case of
> >> this reported issue - for now using the cluster exclusively)).
> >>
> >> thx,
> >> Antony.
> >>
> >> On Thursday, 19 February 2015, 11:02, Sean Owen <so...@cloudera.com>
> >> wrote:
> >>
> >> This should result in 4 executors, not 25. They should be able to
> >> execute 4*4 = 16 tasks simultaneously. You have them grab 4*32 = 128GB
> >> of RAM, not 1TB.
> >>
> >> It still feels like this shouldn't be running out of memory, not by a
> >> long shot though. But just pointing out potential differences between
> >> what you are expecting and what you are configuring.
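To put numbers on the sizing discussion, assuming the figures from Antony's original post below: 25 executors x 4 cores = 100 concurrently running tasks; each executor is a 28 GB heap plus 4 GB overhead = 32 GB YARN container, i.e. roughly 7-8 GB of heap per concurrent task; and 25 x 32 GB = 800 GB of the 1 TB of cluster RAM goes to Spark, with 6-7 executors (about 192-224 GB) landing on each 256 GB box.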
> >> On Thu, Feb 19, 2015 at 9:56 AM, Antony Mayi
> >> <antonym...@yahoo.com.invalid> wrote:
> >>> Hi,
> >>>
> >>> I have 4 very powerful boxes (256GB RAM, 32 cores each). I am running
> >>> spark 1.2.0 in yarn-client mode with the following layout:
> >>>
> >>> spark.executor.cores=4
> >>> spark.executor.memory=28G
> >>> spark.yarn.executor.memoryOverhead=4096
> >>>
> >>> I am submitting a bigger ALS trainImplicit task (rank=100, iters=15) on a
> >>> dataset with ~3 billion ratings using 25 executors. At some point some
> >>> executor crashes with:
> >>>
> >>> 15/02/19 05:41:06 WARN util.AkkaUtils: Error sending message in 1 attempts
> >>> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
> >>>         at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> >>>         at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> >>>         at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
> >>>         at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> >>>         at scala.concurrent.Await$.result(package.scala:107)
> >>>         at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
> >>>         at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)
> >>> 15/02/19 05:41:06 ERROR executor.Executor: Exception in task 131.0 in
> >>> stage 51.0 (TID 7259)
> >>> java.lang.OutOfMemoryError: GC overhead limit exceeded
> >>>         at java.lang.reflect.Array.newInstance(Array.java:75)
> >>>         at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1671)
> >>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
> >>>         at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
> >>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
> >>>
> >>> So the "GC overhead limit exceeded" is pretty clear and would suggest
> >>> running out of memory. Since I have 1TB of RAM available, this must
> >>> rather be due to some suboptimal configuration.
> >>>
> >>> Can anyone please point me to some directions on how to tackle this?
> >>>
> >>> Thanks,
> >>> Antony.
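For completeness, a sketch of what a submission matching the configuration described above might look like, with the default-parallelism fix from later in the thread folded in. The driver class, jar name, and parallelism value are hypothetical placeholders, and the flags assume a Spark 1.2-style spark-submit on YARN:

    # Hypothetical submit command; class and jar names are placeholders.
    spark-submit \
      --master yarn-client \
      --num-executors 25 \
      --executor-cores 4 \
      --executor-memory 28G \
      --conf spark.yarn.executor.memoryOverhead=4096 \
      --conf spark.default.parallelism=2000 \
      --class example.TrainALS \
      train-als.jar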