All of you are right. I was trying to create too many producers. My idea was to create a pool (for now the pool contains only one producer) shared by all the executors. After I realized the problem was related to serialization (though I did not find clear clues in the source code indicating that the Broadcast type parameter must implement Serializable), I followed the Spark Cassandra connector design and created a singleton of Kafka producer pools. I have not seen any exceptions since.
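A minimal sketch of the singleton idea, not the actual code from my job. StubProducer is a hypothetical stand-in for KafkaProducer<String, String> so that the example runs without Kafka on the classpath; ProducerHolder, writePartition and the "events" topic are made-up names as well. The point is only that the producer is created lazily inside each JVM and looked up there, so it is never serialized or broadcast.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

class ProducerSingletonSketch {

    /** Hypothetical stand-in for KafkaProducer<String, String>; records what it was asked to send. */
    static final class StubProducer {
        static final AtomicInteger CREATED = new AtomicInteger();
        final List<String> sent = new CopyOnWriteArrayList<>();

        StubProducer() {
            CREATED.incrementAndGet(); // count constructions to show there is only one per JVM
        }

        void send(String topic, String value) {
            sent.add(topic + ":" + value);
        }
    }

    /** Initialization-on-demand holder: the JVM builds INSTANCE once, on first access, thread-safely. */
    static final class ProducerHolder {
        private ProducerHolder() {}

        private static final class Lazy {
            // In a real job this is where the Kafka producer would be constructed (assumption).
            static final StubProducer INSTANCE = new StubProducer();
        }

        static StubProducer get() {
            return Lazy.INSTANCE;
        }
    }

    /** Mimics the body of a per-partition output function: look the producer up locally, never capture it. */
    static void writePartition(List<String> records) {
        StubProducer producer = ProducerHolder.get();
        for (String record : records) {
            producer.send("events", record);
        }
    }

    public static void main(String[] args) {
        writePartition(Arrays.asList("a", "b"));
        writePartition(Arrays.asList("c"));
        System.out.println(StubProducer.CREATED.get());   // prints 1
        System.out.println(ProducerHolder.get().sent);    // prints [events:a, events:b, events:c]
    }
}
```

Because the closure only calls ProducerHolder.get(), nothing producer-related has to cross the wire; each executor JVM ends up with its own single instance.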
Thanks for all your comments.

On Tue, Aug 18, 2015 at 4:28 PM, Tathagata Das <t...@databricks.com> wrote:

> Why are you even trying to broadcast a producer? A broadcast variable is
> some immutable piece of serializable DATA that can be used for processing
> on the executors. A Kafka producer is neither DATA nor immutable, and
> definitely not serializable.
> The right way to do this is to create the producer in the executors.
> Please see the discussion in the programming guide
>
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
>
> On Tue, Aug 18, 2015 at 3:08 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> I wouldn't expect a kafka producer to be serializable at all... among
>> other things, it has a background thread
>>
>> On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan <
>> wansheng...@gmail.com> wrote:
>>
>>> Hi,
>>> Did anyone see java.util.ConcurrentModificationException when using
>>> broadcast variables?
>>> I encountered this exception when wrapping a Kafka producer like this in
>>> the spark streaming driver.
>>>
>>> Here is what I did.
>>> KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
>>> final Broadcast<KafkaDataProducer> bCastProducer
>>>     = streamingContext.sparkContext().broadcast(producer);
>>>
>>> Then within an closure called by a foreachRDD, I was trying to get the
>>> wrapped producer, i.e.
>>> KafkaProducer<String, String> p = bCastProducer.value();
>>>
>>> after rebuilding and rerunning, I got the stack trace like this
>>>
>>> Exception in thread "main" com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
>>> Serialization trace:
>>> classes (sun.misc.Launcher$AppClassLoader)
>>> classloader (java.security.ProtectionDomain)
>>> context (java.security.AccessControlContext)
>>> acc (org.apache.spark.util.MutableURLClassLoader)
>>> contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
>>> ioThread (org.apache.kafka.clients.producer.KafkaProducer)
>>> producer ("my driver")
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>>> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
>>> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>>> at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
>>> at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
>>> at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
>>> at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
>>> at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>> at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>> at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289)
>>> at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:648)
>>> at "my driver"
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
>>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>> Caused by: java.util.ConcurrentModificationException
>>> at java.util.Vector$Itr.checkForComodification(Vector.java:1156)
>>> at java.util.Vector$Itr.next(Vector.java:1133)
>>> at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:67)
>>> at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> ... 41 more
>>>
>>> Thanks.
>>>
>>> --
>>>
>>> Regards,
>>> Shenghua (Daniel) Wan
>>>
>>
>>
>

--
Regards,
Shenghua (Daniel) Wan