Hi Peter,

As for setting the parallelism, I would recommend setting it as early as possible. Ideally, that would mean specifying the number of partitions when loading the initial data (rather than repartitioning later on).
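
As a rough sketch of choosing the partition count at load time rather than repartitioning afterwards: this assumes PySpark and a text-based input read through `SparkContext.textFile`, which accepts a `minPartitions` hint. The helper names `suggest_num_partitions` and `load_with_partitions`, and the 64 MB per-partition target, are hypothetical choices for illustration only, not Spark defaults or recommendations.

```python
def suggest_num_partitions(input_bytes,
                           target_partition_bytes=64 * 1024 * 1024,
                           min_partitions=1):
    """Hypothetical helper: pick a partition count from the input size.

    The 64 MB target is an arbitrary assumption for this sketch; tune it
    for the memory available to each task.
    """
    if input_bytes < 0 or target_partition_bytes <= 0:
        raise ValueError("sizes must be non-negative / positive")
    # Integer ceiling division, so a partial chunk still gets a partition.
    needed = (input_bytes + target_partition_bytes - 1) // target_partition_bytes
    return max(min_partitions, needed)


def load_with_partitions(sc, path, num_partitions):
    """Load a text file with the partition count requested up front.

    `sc` is expected to be a SparkContext. `minPartitions` is a lower-bound
    hint to Spark, so the resulting RDD may have more partitions than asked.
    """
    return sc.textFile(path, minPartitions=num_partitions)
```

For the 2 GB input described in the quoted message, `suggest_num_partitions(2 * 1024**3)` gives 32 under the assumed 64 MB target. The result would then be passed as `num_partitions` when the data is first read, so that every later column transformation inherits that partitioning.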

In general, working with Vector columns should be better since the Vector can be stored as a native array, rather than a bunch of objects.

I suspect the OOM is from Parquet's very large default buffer sizes. This is a problem with ML model import/export as well. I have a JIRA for that: https://issues.apache.org/jira/browse/SPARK-7148
I'm not yet sure if there's a good way to set the buffer size automatically, though.

Joseph

On Fri, Apr 24, 2015 at 8:20 AM, Peter Rudenko <petro.rude...@gmail.com> wrote:

> Hi, I have the following problem. I have a dataset with 30 columns (15 numeric,
> 15 categorical) and I'm using ml transformers/estimators to transform each
> column (StringIndexer for categorical & MeanImputor for numeric). This
> creates 30 more columns in the dataframe. After that, I use VectorAssembler to
> combine the 30 transformed columns into 1 vector.
> Then, when I call df.select("vector", "Label").saveAsParquetFile, it fails with
> an OOM error.
>
> 15/04/24 16:33:05 ERROR Executor: Exception in task 2.0 in stage 52.0 (TID 2238)
> 15/04/24 16:33:05 DEBUG LocalActor: [actor] received message StatusUpdate(2238,FAILED,java.nio.HeapByteBuffer[pos=0 lim=4167 cap=4167]) from Actor[akka://sparkDriver/deadLetters]
> 15/04/24 16:33:05 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-1,5,main]
> java.lang.OutOfMemoryError: Java heap space
>     at java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3468)
>     at java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3275)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1792)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>     at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
>     at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
>     at scala.collection.mutable.HashTable$class.init(HashTable.scala:105)
>     at scala.collection.mutable.HashMap.init(HashMap.scala:39)
>     at scala.collection.mutable.HashMap.readObject(HashMap.scala:142)
>     at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> 15/04/24 16:33:05 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_52, runningTasks: 3
> 15/04/24 16:33:05 DEBUG Utils: Shutdown hook called
> 15/04/24 16:33:05 DEBUG DiskBlockManager: Shutdown hook called
> 15/04/24 16:33:05 DEBUG TaskSetManager: Moving to NODE_LOCAL after waiting for 3000ms
> 15/04/24 16:33:05 DEBUG TaskSetManager: Moving to ANY after waiting for 0ms
> 15/04/24 16:33:05 INFO TaskSetManager: Starting task 4.0 in stage 52.0 (TID 2240, localhost, PROCESS_LOCAL, 1979 bytes)
> 15/04/24 16:33:05 DEBUG LocalActor: [actor] handled message (12.488047 ms) StatusUpdate(2238,FAILED,java.nio.HeapByteBuffer[pos=4167 lim=4167 cap=4167]) from Actor[akka://sparkDriver/deadLetters]
> 15/04/24 16:33:05 INFO Executor: Running task 4.0 in stage 52.0 (TID 2240)
> 15/04/24 16:33:05 DEBUG LocalActor: [actor] received message StatusUpdate(2240,RUNNING,java.nio.HeapByteBuffer[pos=0 lim=0 cap=0]) from Actor[akka://sparkDriver/deadLetters]
> 15/04/24 16:33:05 DEBUG Executor: Task 2240's epoch is 13
> 15/04/24 16:33:05 DEBUG BlockManager: Getting local block broadcast_53
> ...
> 15/04/24 16:33:05 DEBUG BlockManager: Level for block broadcast_53 is StorageLevel(true, true, false, true, 1)
> 15/04/24 16:33:05 DEBUG BlockManager: Getting block broadcast_53 from memory
> 15/04/24 16:33:05 ERROR TaskSetManager: Task 2 in stage 52.0 failed 1 times; aborting job
> 15/04/24 16:33:05 DEBUG LocalActor: [actor] handled message (7.195529 ms) StatusUpdate(2240,RUNNING,java.nio.HeapByteBuffer[pos=0 lim=0 cap=0]) from Actor[akka://sparkDriver/deadLetters]
> 15/04/24 16:33:05 INFO TaskSchedulerImpl: Cancelling stage 52
>
> If I manually repartition the data after the last step, I get a GC overhead error:
>
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> 15/04/24 18:04:55 ERROR Executor: Exception in task 1.0 in stage 52.0 (TID 2237)
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>     at scala.collection.mutable.HashMap.createNewEntry(HashMap.scala:131)
>     at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
>     at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
>     at scala.collection.mutable.HashTable$class.init(HashTable.scala:105)
>     at scala.collection.mutable.HashMap.init(HashMap.scala:39)
>     at scala.collection.mutable.HashMap.readObject(HashMap.scala:142)
>     at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>
> 15/04/24 18:04:55 INFO ActorSystemImpl: starting new LARS thread
> 15/04/24 18:04:55 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-scheduler-1] shutting down ActorSystem [sparkDriver]
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>     at akka.dispatch.AbstractNodeQueue.<init>(AbstractNodeQueue.java:22)
>     at akka.actor.LightArrayRevolverScheduler$TaskQueue.<init>(Scheduler.scala:443)
>     at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:409)
>     at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>     at java.lang.Thread.run(Thread.java:745)
>
> This was run with 4 GB of RAM on a 2 GB file in a local context with 4 threads (the label
> and vector columns serialized to Parquet come to about 500 MB).
> I’ve tried to increase the default parallelism, but my transformations are
> linear: take a column and produce another column. What’s the best practice
> for handling partitions in dataframes with a lot of columns? Should I
> repartition manually after adding columns? Which is better and faster: applying
> 30 transformers for each numeric column, or combining these columns into 1
> vector column and applying 1 transformer?
>
> Thanks,
> Peter Rudenko