Hi, I have the following problem. I have a dataset with 30 columns (15 numeric,
15 categorical) and I use ml transformers/estimators to transform each
column (StringIndexer for the categorical ones and a MeanImputor for the numeric
ones). This adds 30 more columns to the DataFrame. Then I use VectorAssembler
to combine the 30 transformed columns into a single vector column.
When I then do df.select("vector", "Label").saveAsParquetFile(...) it fails
with an OOM error.
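Roughly, the pipeline looks like this (a simplified sketch: df is the input DataFrame, the column names and output path are made up, and MeanImputor is our own per-column estimator, so its API is only sketched):

  import org.apache.spark.ml.{Pipeline, PipelineStage}
  import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}

  val categoricalCols = Seq("cat1", "cat2" /* ... 15 categorical columns */)
  val numericCols     = Seq("num1", "num2" /* ... 15 numeric columns */)

  // One StringIndexer per categorical column, one MeanImputor per numeric column
  // -> 30 extra columns in the DataFrame
  val indexers = categoricalCols.map(c =>
    new StringIndexer().setInputCol(c).setOutputCol(c + "_idx"))
  val imputers = numericCols.map(c =>
    new MeanImputor().setInputCol(c).setOutputCol(c + "_imp"))

  // Assemble the 30 transformed columns into a single vector column
  val assembler = new VectorAssembler()
    .setInputCols((categoricalCols.map(_ + "_idx") ++ numericCols.map(_ + "_imp")).toArray)
    .setOutputCol("vector")

  val stages: Array[PipelineStage] = (indexers ++ imputers :+ assembler).toArray
  val transformed = new Pipeline().setStages(stages).fit(df).transform(df)

  // This is the step that blows up:
  transformed.select("vector", "Label").saveAsParquetFile("/tmp/features.parquet")

The failure looks like this: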
15/04/24 16:33:05 ERROR Executor: Exception in task 2.0 in stage 52.0 (TID 2238)
15/04/24 16:33:05 DEBUG LocalActor: [actor] received message StatusUpdate(2238,FAILED,java.nio.HeapByteBuffer[pos=0 lim=4167 cap=4167]) from Actor[akka://sparkDriver/deadLetters]
15/04/24 16:33:05 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-1,5,main]
java.lang.OutOfMemoryError: Java heap space
    at java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3468)
    at java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3275)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1792)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
    at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
    at scala.collection.mutable.HashTable$class.init(HashTable.scala:105)
    at scala.collection.mutable.HashMap.init(HashMap.scala:39)
    at scala.collection.mutable.HashMap.readObject(HashMap.scala:142)
    at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
15/04/24 16:33:05 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_52, runningTasks: 3
15/04/24 16:33:05 DEBUG Utils: Shutdown hook called
15/04/24 16:33:05 DEBUG DiskBlockManager: Shutdown hook called
15/04/24 16:33:05 DEBUG TaskSetManager: Moving to NODE_LOCAL after waiting for 3000ms
15/04/24 16:33:05 DEBUG TaskSetManager: Moving to ANY after waiting for 0ms
15/04/24 16:33:05 INFO TaskSetManager: Starting task 4.0 in stage 52.0 (TID 2240, localhost, PROCESS_LOCAL, 1979 bytes)
15/04/24 16:33:05 DEBUG LocalActor: [actor] handled message (12.488047 ms) StatusUpdate(2238,FAILED,java.nio.HeapByteBuffer[pos=4167 lim=4167 cap=4167]) from Actor[akka://sparkDriver/deadLetters]
15/04/24 16:33:05 INFO Executor: Running task 4.0 in stage 52.0 (TID 2240)
15/04/24 16:33:05 DEBUG LocalActor: [actor] received message StatusUpdate(2240,RUNNING,java.nio.HeapByteBuffer[pos=0 lim=0 cap=0]) from Actor[akka://sparkDriver/deadLetters]
15/04/24 16:33:05 DEBUG Executor: Task 2240's epoch is 13
15/04/24 16:33:05 DEBUG BlockManager: Getting local block broadcast_53 ...
15/04/24 16:33:05 DEBUG BlockManager: Level for block broadcast_53 is StorageLevel(true, true, false, true, 1)
15/04/24 16:33:05 DEBUG BlockManager: Getting block broadcast_53 from memory
15/04/24 16:33:05 ERROR TaskSetManager: Task 2 in stage 52.0 failed 1 times; aborting job
15/04/24 16:33:05 DEBUG LocalActor: [actor] handled message (7.195529 ms) StatusUpdate(2240,RUNNING,java.nio.HeapByteBuffer[pos=0 lim=0 cap=0]) from Actor[akka://sparkDriver/deadLetters]
15/04/24 16:33:05 INFO TaskSchedulerImpl: Cancelling stage 52
If, after the last step, I manually repartition the data (e.g. df.repartition(n)), I get a GC overhead limit exceeded error instead:
java.lang.OutOfMemoryError: GC overhead limit exceeded
15/04/24 18:04:55 ERROR Executor: Exception in task 1.0 in stage 52.0 (TID 2237)
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at scala.collection.mutable.HashMap.createNewEntry(HashMap.scala:131)
    at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
    at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
    at scala.collection.mutable.HashTable$class.init(HashTable.scala:105)
    at scala.collection.mutable.HashMap.init(HashMap.scala:39)
    at scala.collection.mutable.HashMap.readObject(HashMap.scala:142)
    at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
15/04/24 18:04:55 INFO ActorSystemImpl: starting new LARS thread
15/04/24 18:04:55 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-scheduler-1] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at akka.dispatch.AbstractNodeQueue.<init>(AbstractNodeQueue.java:22)
    at akka.actor.LightArrayRevolverScheduler$TaskQueue.<init>(Scheduler.scala:443)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:409)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
    at java.lang.Thread.run(Thread.java:745)
This runs with 4 GB of RAM on a 2 GB input file, in a local context with 4 threads
(the label and vector columns serialized to Parquet come to about 500 MB).
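For reference, the job runs in a single local JVM, roughly like this (a sketch; the 4 GB heap is set at launch time, e.g. via --driver-memory or -Xmx):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SQLContext

  // Local mode: the driver and the 4 executor threads share one JVM heap (~4 GB here)
  val conf = new SparkConf().setMaster("local[4]").setAppName("feature-pipeline")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)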
I've tried increasing the default parallelism, but my transformations are
linear: take a column and produce another column. What's the best
practice for handling partitions in DataFrames with lots of columns?
Should I repartition manually after adding columns? And what's better and
faster: applying a separate transformer to each column (30 transformers in
total), or assembling the numeric columns into one vector column and applying
a single transformer to it, as sketched below?
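I.e. something like this instead (just a sketch; VectorMeanImputor is a hypothetical estimator that imputes over a whole vector column, replacing the 15 per-column MeanImputor stages):

  import org.apache.spark.ml.feature.VectorAssembler

  // Assemble the 15 raw numeric columns into one vector column first...
  val numericAssembler = new VectorAssembler()
    .setInputCols(numericCols.toArray)
    .setOutputCol("numericVec")
  val assembled = numericAssembler.transform(df)

  // ...then impute all of them with a single vector-level estimator
  // (hypothetical class, would have to be written)
  val imputed = new VectorMeanImputor()
    .setInputCol("numericVec")
    .setOutputCol("numericVecImputed")
    .fit(assembled)
    .transform(assembled)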
Thanks,
Peter Rudenko