Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-19 Thread Marcin Kuthan
I'm glad that I could help :) 19 sie 2015 8:52 AM "Shenghua(Daniel) Wan" napisał(a): > +1 > > I wish I have read this blog earlier. I am using Java and have just > implemented a singleton producer per executor/JVM during the day. > Yes, I did see that NonSerializableException when I was debugging

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Shenghua(Daniel) Wan
+1 I wish I have read this blog earlier. I am using Java and have just implemented a singleton producer per executor/JVM during the day. Yes, I did see that NonSerializableException when I was debugging the code ... Thanks for sharing. On Tue, Aug 18, 2015 at 10:59 PM, Tathagata Das wrote: > I

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Tathagata Das
Its a cool blog post! Tweeted it! Broadcasting the configuration necessary for lazily instantiating the producer is a good idea. Nitpick: The first code example has an extra `}` ;) On Tue, Aug 18, 2015 at 10:49 PM, Marcin Kuthan wrote: > As long as Kafka producent is thread-safe you don't need

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Marcin Kuthan
As long as Kafka producent is thread-safe you don't need any pool at all. Just share single producer on every executor. Please look at my blog post for more details. http://allegro.tech/spark-kafka-integration.html 19 sie 2015 2:00 AM "Shenghua(Daniel) Wan" napisał(a): > All of you are right. > >

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Shenghua(Daniel) Wan
All of you are right. I was trying to create too many producers. My idea was to create a pool(for now the pool contains only one producer) shared by all the executors. After I realized it was related to the serializable issues (though I did not find clear clues in the source code to indicate the b

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Tathagata Das
Why are you even trying to broadcast a producer? A broadcast variable is some immutable piece of serializable DATA that can be used for processing on the executors. A Kafka producer is neither DATA nor immutable, and definitely not serializable. The right way to do this is to create the producer in

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Cody Koeninger
I wouldn't expect a kafka producer to be serializable at all... among other things, it has a background thread On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan wrote: > Hi, > Did anyone see java.util.ConcurrentModificationException when using > broadcast variables? > I encountered this exce

broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Shenghua(Daniel) Wan
Hi, Did anyone see java.util.ConcurrentModificationException when using broadcast variables? I encountered this exception when wrapping a Kafka producer like this in the spark streaming driver. Here is what I did. KafkaProducer producer = new KafkaProducer(properties); final Broadcast bCastProduce