Hello all, I'm trying to broadcast a ~1 GB variable to a cluster of 20 nodes but haven't been able to make it work so far.
It looks like the executors start to run out of memory during deserialization. The behavior only shows up when the number of partitions is above a few tens; the broadcast works fine with 10 or 20 partitions. I'm using the following setup to observe the problem:

    val tuples: Array[((String, String), (String, String))] // ~10M tuples
    val tuplesBc = sc.broadcast(tuples)
    val numsRdd = sc.parallelize(1 to 5000, 100)
    numsRdd.map(n => tuplesBc.value.head).count()

If I set the number of partitions for numsRdd to 20, the count goes through successfully, but at 100 I start to get errors such as:

    16/03/07 19:35:32 WARN scheduler.TaskSetManager: Lost task 77.0 in stage 1.0 (TID 1677, xxx.ec2.internal): java.lang.OutOfMemoryError: Java heap space
        at java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3472)
        at java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3278)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1789)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)

I'm using Spark 1.5.2. The cluster nodes are Amazon r3.2xlarge instances, the Spark property maximizeResourceAllocation is set to true (executor.memory = 48G according to the Spark UI environment page), we're using Kryo serialization, and YARN is the resource manager (a sketch of the corresponding SparkConf is in the P.S. below).

Any ideas as to what might be going wrong and how to debug this?

Thanks,
Arash
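P.S. In case it helps, here is a minimal sketch of the driver-side configuration implied above. The property names are standard Spark settings; the app name and the idea of setting these by hand are illustrative assumptions, since in our case the values actually come from maximizeResourceAllocation and the cluster defaults rather than from code.

    import org.apache.spark.{SparkConf, SparkContext}

    // Roughly the environment described above; the values mirror what the
    // Spark UI reports and are shown for illustration, not as a recommendation.
    val conf = new SparkConf()
      .setAppName("broadcast-oom-repro")     // hypothetical app name
      .set("spark.executor.memory", "48g")   // as shown in the Spark UI environment page
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // Kryo, as noted above
    val sc = new SparkContext(conf)

    // The repro above then runs against this context exactly as shown.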