All of you are right.

I was trying to create too many producers. My idea was to create a pool (for
now the pool contains only one producer) shared by all the executors.
After I realized the exception was a serialization issue (though I did not
find a clear indication in the source code that the type parameter of
Broadcast must implement Serializable), I followed the Spark Cassandra
connector's design and created a singleton of Kafka producer pools. I have
not seen any exceptions since.
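
For anyone who finds this thread later, here is a minimal sketch of the
singleton idea, not my actual code. StubProducer is a hypothetical stand-in
for KafkaProducer<String, String> so that the example runs without the Kafka
client on the classpath, and ProducerHolder is an illustrative name. The
point is that each JVM (driver or executor) lazily creates its own instance
on first use, so nothing has to be serialized or broadcast.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for KafkaProducer<String, String>; it only records
// what was "sent" and counts how many instances were constructed.
final class StubProducer {
    static final AtomicInteger CREATED = new AtomicInteger();
    final List<String> sent = Collections.synchronizedList(new ArrayList<>());

    StubProducer() {
        CREATED.incrementAndGet();
    }

    void send(String topic, String value) {
        sent.add(topic + ":" + value);
    }
}

// One lazily created producer per JVM. Code running inside a task calls
// ProducerHolder.get() instead of capturing a producer built on the driver.
final class ProducerHolder {
    private ProducerHolder() {}

    // Initialization-on-demand holder idiom: the JVM initializes Lazy, and
    // therefore INSTANCE, exactly once and thread-safely on first access.
    private static final class Lazy {
        static final StubProducer INSTANCE = new StubProducer();
    }

    static StubProducer get() {
        return Lazy.INSTANCE;
    }

    public static void main(String[] args) throws InterruptedException {
        // Two threads stand in for two tasks running in the same executor.
        Runnable task = () -> ProducerHolder.get().send("topic", "hello");
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        // Both tasks shared one instance, so this prints "producers created: 1".
        System.out.println("producers created: " + StubProducer.CREATED.get());
    }
}
```

In a real job the call to get() would sit inside the function passed to
foreachPartition (within foreachRDD), so that it runs on the executor. A
pool would replace the single INSTANCE field with a collection of producers.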

Thanks for all your comments.


On Tue, Aug 18, 2015 at 4:28 PM, Tathagata Das <t...@databricks.com> wrote:

> Why are you even trying to broadcast a producer? A broadcast variable is
> some immutable piece of serializable DATA that can be used for processing
> on the executors. A Kafka producer is neither DATA nor immutable, and
> definitely not serializable.
> The right way to do this is to create the producer in the executors.
> Please see the discussion in the programming guide
>
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
>
> On Tue, Aug 18, 2015 at 3:08 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> I wouldn't expect a kafka producer to be serializable at all... among
>> other things, it has a background thread
>>
>> On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan <
>> wansheng...@gmail.com> wrote:
>>
>>> Hi,
>>> Did anyone see java.util.ConcurrentModificationException when using
>>> broadcast variables?
>>> I encountered this exception when wrapping a Kafka producer like this in
>>> the spark streaming driver.
>>>
>>> Here is what I did.
>>> KafkaProducer<String, String> producer = new KafkaProducer<String,
>>> String>(properties);
>>> final Broadcast<KafkaDataProducer> bCastProducer
>>>     = streamingContext.sparkContext().broadcast(producer);
>>>
>>> Then, within a closure called by foreachRDD, I was trying to get the
>>> wrapped producer, i.e.
>>>  KafkaProducer<String, String> p = bCastProducer.value();
>>>
>>> After rebuilding and rerunning, I got a stack trace like this:
>>>
>>> Exception in thread "main" com.esotericsoftware.kryo.KryoException:
>>> java.util.ConcurrentModificationException
>>> Serialization trace:
>>> classes (sun.misc.Launcher$AppClassLoader)
>>> classloader (java.security.ProtectionDomain)
>>> context (java.security.AccessControlContext)
>>> acc (org.apache.spark.util.MutableURLClassLoader)
>>> contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
>>> ioThread (org.apache.kafka.clients.producer.KafkaProducer)
>>> producer ("my driver")
>>> at
>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
>>> at
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at
>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>>> at
>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
>>> at
>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at
>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
>>> at
>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
>>> at
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at
>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at
>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at
>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> at
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>>> at
>>> org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
>>> at
>>> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>> at
>>> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>> at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289)
>>> at
>>> org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:648)
>>> at "my driver"
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>> Caused by: java.util.ConcurrentModificationException
>>> at java.util.Vector$Itr.checkForComodification(Vector.java:1156)
>>> at java.util.Vector$Itr.next(Vector.java:1133)
>>> at
>>> com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:67)
>>> at
>>> com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>> at
>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>> ... 41 more
>>>
>>> Thanks.
>>>
>>> --
>>>
>>> Regards,
>>> Shenghua (Daniel) Wan
>>>
>>
>>
>


-- 

Regards,
Shenghua (Daniel) Wan
