No problem, Antony. MLlib is tricky! I'd love to chat with you about your
use case - it sounds like we're working on similar problems at similar scales.
On Fri, Feb 20, 2015 at 1:55 PM Xiangrui Meng <men...@gmail.com> wrote:

> Hi Antony, Is it easy for you to try Spark 1.3.0 or master? The ALS
> performance should be improved in 1.3.0. -Xiangrui
>
> On Fri, Feb 20, 2015 at 1:32 PM, Antony Mayi
> <antonym...@yahoo.com.invalid> wrote:
> > Hi Ilya,
> >
> > thanks for your insight, this was the right clue. I had default parallelism
> > set already, but it was quite low (hundreds), and the number of partitions of
> > the input RDD was low as well, so the chunks were really too big. Increasing
> > parallelism and repartitioning seems to be helping...
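> >
> > For reference, roughly what I changed (just a sketch - the values and the
> > rawRatings name are illustrative, not my exact code):
> >
> >     import org.apache.spark.{SparkConf, SparkContext}
> >     // raise default parallelism before creating the SparkContext
> >     val conf = new SparkConf().set("spark.default.parallelism", "1000")
> >     val sc = new SparkContext(conf)
> >     // and repartition the too-coarse input RDD before feeding it to ALS
> >     // (rawRatings stands in for however the ratings RDD is built)
> >     val ratings = rawRatings.repartition(1000)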
> >
> > Thanks!
> > Antony.
> >
> >
> > On Thursday, 19 February 2015, 16:45, Ilya Ganelin <ilgan...@gmail.com>
> > wrote:
> >
> >
> >
> > Hi Antony - you are seeing a problem that I ran into. The underlying issue
> > is your default parallelism setting. What's happening is that within ALS
> > certain RDD operations end up changing the number of partitions of your
> > data. For example, if you start with an RDD of 300 partitions, then unless
> > default parallelism is set, you'll eventually end up, as the algorithm
> > executes, with an RDD of something like 20 partitions. Consequently, your
> > giant data set is now stored across a much smaller number of partitions, so
> > each partition is huge. Then, when a shuffle requires serialization, you run
> > out of heap space trying to serialize it. The solution should be as simple
> > as setting spark.default.parallelism.
> >
> > This is referenced in a JIRA I can't find at the moment.
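> >
> > Something along these lines (just a sketch - the value is illustrative,
> > e.g. match it to the partition count of your input):
> >
> >     import org.apache.spark.{SparkConf, SparkContext}
> >     // shuffles inside ALS will then default to this many partitions
> >     // instead of collapsing to a handful
> >     val conf = new SparkConf().set("spark.default.parallelism", "300")
> >     val sc = new SparkContext(conf)
> >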
> > On Thu, Feb 19, 2015 at 5:10 AM Antony Mayi <antonym...@yahoo.com.invalid> wrote:
> >
> > now, with spark.shuffle.io.preferDirectBufs reverted (to true), I am again
> > getting GC overhead limit exceeded:
> >
> > === spark stdout ===
> > 15/02/19 12:08:08 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 18.0 (TID 5329, 192.168.1.93): java.lang.OutOfMemoryError: GC overhead limit exceeded
> >         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
> >         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> >         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> >         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> >         at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
> >
> > === yarn log (same) ===
> > 15/02/19 12:08:08 ERROR executor.Executor: Exception in task 7.0 in stage 18.0 (TID 5329)
> > java.lang.OutOfMemoryError: GC overhead limit exceeded
> >         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
> >         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> >         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> >         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> >         at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
> >
> > === yarn nodemanager ===
> > 2015-02-19 12:08:13,758 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19014 for container-id container_1424204221358_0013_01_000012: 29.8 GB of 32 GB physical memory used; 31.7 GB of 67.2 GB virtual memory used
> > 2015-02-19 12:08:13,778 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19013 for container-id container_1424204221358_0013_01_000008: 1.2 MB of 32 GB physical memory used; 103.6 MB of 67.2 GB virtual memory used
> > 2015-02-19 12:08:14,455 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1424204221358_0013_01_000008 is : 143
> > 2015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1424204221358_0013_01_000008 transitioned from RUNNING to EXITED_WITH_FAILURE
> > 2015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Cleaning up container container_1424204221358_0013_01_000008
> >
> > Antony.
> >
> >
> >
> >
> > On Thursday, 19 February 2015, 11:54, Antony Mayi
> > <antonym...@yahoo.com.INVALID> wrote:
> >
> >
> >
> > it is from within the ALS.trainImplicit() call. btw, the exception varies
> > between this "GC overhead limit exceeded" and "Java heap space" (which I
> > guess is just a different outcome of the same problem).
> >
> > I just tried another run and here are the logs (filtered) - note I tried
> > this run with spark.shuffle.io.preferDirectBufs=false, so this might be a
> > slightly different issue from my previous case (going to revert now):
> >
> > === spark stdout ===
> > 15/02/19 10:15:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(6, 192.168.1.92, 54289) with no recent heart beats: 50221ms exceeds 45000ms
> > 15/02/19 10:16:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(13, 192.168.1.90, 56768) with no recent heart beats: 54749ms exceeds 45000ms
> > 15/02/19 10:16:44 ERROR cluster.YarnClientClusterScheduler: Lost executor 6 on 192.168.1.92: remote Akka client disassociated
> > 15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 57.0 in stage 18.0 (TID 5379, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
> > 15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 32.0 in stage 18.0 (TID 5354, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
> > 15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 82.0 in stage 18.0 (TID 5404, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
> > 15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 18.0 (TID 5329, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
> > 15/02/19 10:16:44 ERROR cluster.YarnClientSchedulerBackend: Asked to remove non-existent executor 6
> > 15/02/19 10:16:54 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 18.0 (TID 5328, 192.168.1.90): FetchFailed(BlockManagerId(6, 192.168.1.92, 54289), shuffleId=6, mapId=227, reduceId=6, message=
> > org.apache.spark.shuffle.FetchFailedException: Failed to connect to /192.168.1.92:54289
> >         at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
> >         at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
> >         at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
> >         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> >
> > === yarn log ===
> > 15/02/19 10:15:05 WARN executor.Executor: Told to re-register on heartbeat
> > 15/02/19 10:16:02 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
> > 15/02/19 10:16:02 WARN server.TransportChannelHandler: Exception in connection from /192.168.1.92:45633
> > io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError: Java heap space
> >         at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:280)
> >         at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:149)
> >         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
> >
> > === yarn nodemanager log ===
> > 2015-02-19 10:16:45,146 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 20284 for container-id container_1424204221358_0012_01_000016: 28.5 GB of 32 GB physical memory used; 29.1 GB of 67.2 GB virtual memory used
> > 2015-02-19 10:16:45,163 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 20273 for container-id container_1424204221358_0012_01_000020: 28.5 GB of 32 GB physical memory used; 29.2 GB of 67.2 GB virtual memory used
> > 2015-02-19 10:16:46,621 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1424204221358_0012_01_000008 is : 143
> > 2015-02-19 10:16:46,621 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1424204221358_0012_01_000008 transitioned from RUNNING to EXITED_WITH_FAILURE
> > 2015-02-19 10:16:46,621 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Cleaning up container container_1424204221358_0012_01_000008
> >
> >
> > thanks for any help,
> > Antony.
> >
> > ps. could that be Java 8 related?
> >
> >
> > On Thursday, 19 February 2015, 11:25, Sean Owen <so...@cloudera.com> wrote:
> >
> >
> >
> > Oh, OK - you are saying you are requesting 25 executors and getting them,
> > got it. You can consider making fewer, bigger executors to pool your memory
> > rather than split it up, but at some point that becomes counter-productive.
> > 32GB is a fine executor size.
> >
> > So you have ~8GB available per task, which seems like plenty. Something
> > else is at work here. Is this error from your code's stages or from ALS?
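> >
> > If you do want to try the fewer/larger-executor route, it comes down to
> > just these two knobs (the numbers below are purely illustrative, not a
> > recommendation for your cluster):
> >
> >     import org.apache.spark.SparkConf
> >     // fewer but larger executors pooling the same per-host memory
> >     val conf = new SparkConf()
> >       .set("spark.executor.cores", "8")
> >       .set("spark.executor.memory", "56g")
> >     // roughly executor memory divided by executor cores of heap is
> >     // then available to each concurrently running task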
> >
> > On Thu, Feb 19, 2015 at 10:07 AM, Antony Mayi <antonym...@yahoo.com> wrote:
> >> based on the Spark UI I am running 25 executors for sure. why would you
> >> expect four? I submit the task with --num-executors 25 and I get 6-7
> >> executors running per host (using more, smaller executors gives me better
> >> cluster utilization when running parallel spark sessions - which is not the
> >> case for this reported issue; for now I am using the cluster exclusively).
> >>
> >> thx,
> >> Antony.
> >>
> >>
> >> On Thursday, 19 February 2015, 11:02, Sean Owen <so...@cloudera.com>
> >> wrote:
> >>
> >>
> >>
> >> This should result in 4 executors, not 25. They should be able to
> >> execute 4*4 = 16 tasks simultaneously. You have them grab 4*32 = 128GB
> >> of RAM, not 1TB.
> >>
> >> It still feels like this shouldn't be running out of memory, not by a
> >> long shot. But I'm just pointing out potential differences between what
> >> you are expecting and what you are configuring.
> >>
> >> On Thu, Feb 19, 2015 at 9:56 AM, Antony Mayi
> >> <antonym...@yahoo.com.invalid> wrote:
> >>> Hi,
> >>>
> >>> I have 4 very powerful boxes (256GB RAM, 32 cores each). I am running
> >>> Spark 1.2.0 in yarn-client mode with the following layout:
> >>>
> >>> spark.executor.cores=4
> >>> spark.executor.memory=28G
> >>> spark.yarn.executor.memoryOverhead=4096
> >>>
> >>> I am submitting a bigger ALS trainImplicit task (rank=100, iters=15) on a
> >>> dataset with ~3 billion ratings, using 25 executors. At some point some
> >>> executor crashes with:
> >>>
> >>> 15/02/19 05:41:06 WARN util.AkkaUtils: Error sending message in 1 attempts
> >>> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
> >>>        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> >>>        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> >>>        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
> >>>        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> >>>        at scala.concurrent.Await$.result(package.scala:107)
> >>>        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
> >>>        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)
> >>> 15/02/19 05:41:06 ERROR executor.Executor: Exception in task 131.0 in stage 51.0 (TID 7259)
> >>> java.lang.OutOfMemoryError: GC overhead limit exceeded
> >>>        at java.lang.reflect.Array.newInstance(Array.java:75)
> >>>        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1671)
> >>>        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
> >>>        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
> >>>        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
> >>>
> >>> So the GC overhead limit exceeded is pretty clear and would suggest running
> >>> out of memory. Since I have 1TB of RAM available, this must rather be due to
> >>> some suboptimal configuration.
> >>>
> >>> Can anyone please point me in the right direction on how to tackle this?
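> >>>
> >>> In case it helps, the call is essentially the following (a sketch only;
> >>> the ratings RDD name is illustrative):
> >>>
> >>>     import org.apache.spark.mllib.recommendation.{ALS, Rating}
> >>>     // ratings: RDD[Rating] built from the ~3 billion implicit preference records
> >>>     val model = ALS.trainImplicit(ratings, 100, 15)  // rank=100, iterations=15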
> >>>
> >>> Thanks,
> >>> Antony.
> >
> >>
> >>
> >
