Hi, I have the following problem. I have a dataset with 30 columns (15 numeric, 15 categorical) and I'm using ML transformers/estimators to transform each column (StringIndexer for the categorical columns and MeanImputor for the numeric ones). This adds 30 more columns to the dataframe. I then use VectorAssembler to combine the 30 transformed columns into a single vector column. When I afterwards do df.select("vector", "Label").saveAsParquetFile(...) it fails with an OOM error:

15/04/24 16:33:05 ERROR Executor: Exception in task 2.0 in stage 52.0 (TID 2238)
15/04/24 16:33:05 DEBUG LocalActor: [actor] received message StatusUpdate(2238,FAILED,java.nio.HeapByteBuffer[pos=0 lim=4167 cap=4167]) from Actor[akka://sparkDriver/deadLetters]
15/04/24 16:33:05 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-1,5,main]
java.lang.OutOfMemoryError: Java heap space
    at java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3468)
    at java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3275)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1792)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
    at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
    at scala.collection.mutable.HashTable$class.init(HashTable.scala:105)
    at scala.collection.mutable.HashMap.init(HashMap.scala:39)
    at scala.collection.mutable.HashMap.readObject(HashMap.scala:142)
    at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
15/04/24 16:33:05 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_52, runningTasks: 3
15/04/24 16:33:05 DEBUG Utils: Shutdown hook called
15/04/24 16:33:05 DEBUG DiskBlockManager: Shutdown hook called
15/04/24 16:33:05 DEBUG TaskSetManager: Moving to NODE_LOCAL after waiting for 3000ms
15/04/24 16:33:05 DEBUG TaskSetManager: Moving to ANY after waiting for 0ms
15/04/24 16:33:05 INFO TaskSetManager: Starting task 4.0 in stage 52.0 (TID 2240, localhost, PROCESS_LOCAL, 1979 bytes)
15/04/24 16:33:05 DEBUG LocalActor: [actor] handled message (12.488047 ms) StatusUpdate(2238,FAILED,java.nio.HeapByteBuffer[pos=4167 lim=4167 cap=4167]) from Actor[akka://sparkDriver/deadLetters]
15/04/24 16:33:05 INFO Executor: Running task 4.0 in stage 52.0 (TID 2240)
15/04/24 16:33:05 DEBUG LocalActor: [actor] received message StatusUpdate(2240,RUNNING,java.nio.HeapByteBuffer[pos=0 lim=0 cap=0]) from Actor[akka://sparkDriver/deadLetters]
15/04/24 16:33:05 DEBUG Executor: Task 2240's epoch is 13
15/04/24 16:33:05 DEBUG BlockManager: Getting local block broadcast_53 ...
15/04/24 16:33:05 DEBUG BlockManager: Level for block broadcast_53 is StorageLevel(true, true, false, true, 1)
15/04/24 16:33:05 DEBUG BlockManager: Getting block broadcast_53 from memory
15/04/24 16:33:05 ERROR TaskSetManager: Task 2 in stage 52.0 failed 1 times; aborting job
15/04/24 16:33:05 DEBUG LocalActor: [actor] handled message (7.195529 ms) StatusUpdate(2240,RUNNING,java.nio.HeapByteBuffer[pos=0 lim=0 cap=0]) from Actor[akka://sparkDriver/deadLetters]
15/04/24 16:33:05 INFO TaskSchedulerImpl: Cancelling stage 52
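For context, the pipeline looks roughly like this. It's a simplified sketch: MeanImputor is our own transformer that fills missing numeric values with the column mean (I'm assuming the usual setInputCol/setOutputCol params here), and the column names are made up:

    import org.apache.spark.ml.{Pipeline, PipelineStage}
    import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}

    val categoricalCols = Array("cat1", "cat2" /* ... 15 in total */)
    val numericCols     = Array("num1", "num2" /* ... 15 in total */)

    // one StringIndexer per categorical column -> adds 15 "*_idx" columns
    val indexers = categoricalCols.map { c =>
      new StringIndexer().setInputCol(c).setOutputCol(c + "_idx")
    }

    // one MeanImputor per numeric column -> adds 15 "*_imp" columns
    val imputers = numericCols.map { c =>
      new MeanImputor().setInputCol(c).setOutputCol(c + "_imp")
    }

    // combine the 30 derived columns into a single vector column
    val assembler = new VectorAssembler()
      .setInputCols(categoricalCols.map(_ + "_idx") ++ numericCols.map(_ + "_imp"))
      .setOutputCol("vector")

    val stages: Array[PipelineStage] = (indexers ++ imputers) :+ assembler
    val pipeline = new Pipeline().setStages(stages)

    val transformed = pipeline.fit(df).transform(df)
    transformed.select("vector", "Label").saveAsParquetFile("features.parquet")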

If I manually repartition the data after the last step, I get a GC overhead error instead:

java.lang.OutOfMemoryError: GC overhead limit exceeded
15/04/24 18:04:55 ERROR Executor: Exception in task 1.0 in stage 52.0 (TID 2237)
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at scala.collection.mutable.HashMap.createNewEntry(HashMap.scala:131)
    at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
    at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
    at scala.collection.mutable.HashTable$class.init(HashTable.scala:105)
    at scala.collection.mutable.HashMap.init(HashMap.scala:39)
    at scala.collection.mutable.HashMap.readObject(HashMap.scala:142)
    at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
15/04/24 18:04:55 INFO ActorSystemImpl: starting new LARS thread
15/04/24 18:04:55 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-scheduler-1] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at akka.dispatch.AbstractNodeQueue.<init>(AbstractNodeQueue.java:22)
    at akka.actor.LightArrayRevolverScheduler$TaskQueue.<init>(Scheduler.scala:443)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:409)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
    at java.lang.Thread.run(Thread.java:745)
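The manual repartition itself is nothing special, roughly this (the partition count is just something I experimented with, not a considered choice):

    // repartition only the two columns we actually need before writing them out
    transformed.select("vector", "Label")
      .repartition(100)
      .saveAsParquetFile("features.parquet")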

This is running with 4 GB of RAM on a 2 GB file, in a local context with 4 threads (the label and vector columns serialized to Parquet come to about 500 MB). I've tried increasing the default parallelism (config snippet below), but my transformations are linear: take a column and produce another column. What's the best practice for handling partitions in dataframes with a lot of columns? Should I repartition manually after adding columns? And which is better and faster: applying 30 transformers, one per column, or combining these columns into one vector column and applying a single transformer?
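For reference, increasing the default parallelism was just a config change along these lines (the numbers are only what I experimented with):

    import org.apache.spark.{SparkConf, SparkContext}

    // local context with 4 threads, bumping the default number of partitions
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("feature-pipeline")
      .set("spark.default.parallelism", "16")
    val sc = new SparkContext(conf)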

Thanks,
Peter Rudenko