Hello all,

I'm trying to broadcast a variable of size ~1 GB to a cluster of 20 nodes but
haven't been able to make it work so far.

It looks like the executors start to run out of memory during
deserialization. This only happens when the number of partitions is above
a few tens; the broadcast does work for 10 or 20 partitions.

I'm using the following setup to observe the problem:

val tuples: Array[((String, String), (String, String))]      // ~ 10M tuples
val tuplesBc = sc.broadcast(tuples)
val numsRdd = sc.parallelize(1 to 5000, 100)
numsRdd.map(n => tuplesBc.value.head).count()

If I set the number of partitions for numsRdd to 20, the count goes through
successfully, but at 100 I start to get errors such as:

16/03/07 19:35:32 WARN scheduler.TaskSetManager: Lost task 77.0 in stage 1.0 (TID 1677, xxx.ec2.internal): java.lang.OutOfMemoryError: Java heap space
        at java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3472)
        at java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3278)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1789)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)


I'm using Spark 1.5.2. The cluster nodes are Amazon r3.2xlarge instances. The
Spark property maximizeResourceAllocation is set to true (executor.memory =
48G according to the Environment tab of the Spark UI). We're also using Kryo
serialization, and YARN is the resource manager.
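
For anyone who wants to try this, here is a rough self-contained sketch of the
test described above. The object name, the app name, the optional command-line
argument, and above all the synthetic tuple data are assumptions made for
illustration; the real data is not shown in this post, so its size and shape
will differ. It also assumes the master and memory settings are supplied by
spark-submit.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical wrapper object; only the four lines of the original test are
// taken from the post, everything else is illustrative scaffolding.
object BroadcastRepro {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("broadcast-repro") // hypothetical app name
      // Kryo is enabled to mirror the cluster setup described in the post.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

    // Synthetic stand-in for the real data: ~10M tuples of short strings.
    // Building this needs a driver heap of several GB.
    val tuples: Array[((String, String), (String, String))] =
      Array.tabulate(10000000)(i => ((s"a$i", s"b$i"), (s"c$i", s"d$i")))

    val tuplesBc = sc.broadcast(tuples)

    // Partition count is the variable under test: 20 vs. 100 in the post.
    val numPartitions = if (args.nonEmpty) args(0).toInt else 100
    val numsRdd = sc.parallelize(1 to 5000, numPartitions)

    // Each task touches the broadcast value, forcing it to be fetched and
    // deserialized on the executor.
    val n = numsRdd.map(_ => tuplesBc.value.head).count()
    println(s"count = $n")

    sc.stop()
  }
}
```

Passing 20 or 100 as the first argument switches between the two partition
counts compared above.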

Any ideas as to what might be going wrong and how to debug this?

Thanks,
Arash
