Hi,
I have a 170 GB tab-delimited data set which I am converting into
RDD[LabeledPoint] format. I then take a 60% sample of this data set to use for
training a GBT model.
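For reference, my loading code looks roughly like this (the HDFS path and the
column layout, with the label in the first field, are simplified placeholders):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    val raw = sc.textFile("hdfs:///path/to/dataset.tsv")  // placeholder path
    val labeled = raw.map { line =>
      val fields = line.split('\t')
      // assumed layout: first column is the label, the rest are features
      LabeledPoint(fields.head.toDouble, Vectors.dense(fields.tail.map(_.toDouble)))
    }
    // 60% sample without replacement for training; the seed is arbitrary
    val sampled = labeled.sample(withReplacement = false, fraction = 0.6, seed = 42L)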
I initially got a "Size exceeds Integer.MAX_VALUE" error, which I fixed by
repartitioning the data set into 1000 partitions.
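Continuing the sketch above, that fix is just:

    // 1000 partitions keeps each partition's serialized block
    // well under the 2 GB (Integer.MAX_VALUE bytes) block limit
    val trainingData = sampled.repartition(1000)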
Now, the GBT code caches the data set, if it is not already cached, with the
operation input.persist(StorageLevel.MEMORY_AND_DISK) (see
https://github.com/apache/spark/blob/branch-1.2/mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala).
To pre-empt this caching so I can control it better, I am caching the RDD
(after the repartition) myself with:

    trainingData.persist(StorageLevel.MEMORY_AND_DISK_SER_2)
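After that persist, the training call itself looks roughly like this (the
boosting parameters here are placeholders, not my actual settings):

    import org.apache.spark.mllib.tree.GradientBoostedTrees
    import org.apache.spark.mllib.tree.configuration.BoostingStrategy

    // trainingData is already persisted (MEMORY_AND_DISK_SER_2), so the
    // storage-level check inside GradientBoostedTrees skips its own persist
    val boostingStrategy = BoostingStrategy.defaultParams("Classification")
    val model = GradientBoostedTrees.train(trainingData, boostingStrategy)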
But now I get the error pasted below on one executor, and the application
fails after a retry. I am not sure how to fix this. Could someone help?
One possible reason could be that I submit my job with "--driver-memory 11G
--executor-memory 11G", but I am allotted only 5.7 GB per executor. I am not
sure whether this could actually have an effect.
My runtime environment: 120 executors with 5.7 GB each; the driver has 5.3 GB.
My Spark config:

    .set("spark.default.parallelism", "300")
    .set("spark.akka.frameSize", "256")
    .set("spark.akka.timeout", "1000")
    .set("spark.core.connection.ack.wait.timeout", "200")
    .set("spark.akka.threads", "10")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryoserializer.buffer.mb", "256")
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at java.util.Arrays.copyOf(Arrays.java:2271)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
    at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
    at com.esotericsoftware.kryo.io.Output.writeLong(Output.java:477)
    at com.esotericsoftware.kryo.io.Output.writeDouble(Output.java:596)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.write(DefaultArraySerializers.java:212)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.write(DefaultArraySerializers.java:200)
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)
    at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
    at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1175)
    at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1184)
    at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:103)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:789)
    at org.apache.spark.storage.BlockManager.putArray(BlockManager.scala:669)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:167)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
Thank you!
Vinay