Hi Kafka Users,

I have written my own implementation of the Kafka Encoder class for serializing objects to Messages. It uses Kryo, a Java serialization library that is not thread-safe. I'm using Kafka 0.7.2.
We recently ran into an issue after increasing the number of Kafka brokers for our Kafka producer from 1 to 2. Once we did this, we started seeing exceptions that seemed related to Kryo being used concurrently by multiple threads. So my question is: do I need to modify my Encoder class to be thread-safe? I dug through the Kafka documentation and couldn't find anything that said one way or the other.

Any information would be great.

Thank you!
--Liz Bennett

p.s. For what it's worth, here is a stack trace from one of the exceptions we saw:

2015-01-08 07:33:35,938 [ERROR] [ProducerHandlerWrapper.handle] Failed to write 9 batched events to Kafka.
com.esotericsoftware.kryo.KryoException: java.lang.ArrayIndexOutOfBoundsException: 40
Serialization trace:
fieldGroups (com.loggly.core.event.Event)
event (com.loggly.core.event.FailedEvent)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:538)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:520)
    at com.loggly.eventreader.kryo.KryoEventSerDes.serialize(KryoEventSerDes.java:39)
    at com.loggly.kafka.serializer.KryoFailedEventSerializer.toMessage(KryoFailedEventSerializer.java:23)
    at com.loggly.kafka.serializer.KryoFailedEventSerializer.toMessage(KryoFailedEventSerializer.java:8)
    at kafka.producer.async.DefaultEventHandler$$anonfun$2$$anonfun$apply$2.apply(DefaultEventHandler.scala:74)
    at kafka.producer.async.DefaultEventHandler$$anonfun$2$$anonfun$apply$2.apply(DefaultEventHandler.scala:74)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:30)
    at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:42)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:194)
    at scala.collection.mutable.ListBuffer.map(ListBuffer.scala:42)
    at kafka.producer.async.DefaultEventHandler$$anonfun$2.apply(DefaultEventHandler.scala:74)
    at kafka.producer.async.DefaultEventHandler$$anonfun$2.apply(DefaultEventHandler.scala:74)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:194)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:93)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:93)
    at scala.collection.Iterator$class.foreach(Iterator.scala:660)
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:43)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:93)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:194)
    at scala.collection.mutable.HashMap.map(HashMap.scala:43)
    at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:74)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44)
    at com.loggly.kafka.producer.ProducerHandlerWrapper.handle(ProducerHandlerWrapper.java:64)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71)
    at scala.collection.immutable.Stream.foreach(Stream.scala:291)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 40
    at com.esotericsoftware.kryo.util.ObjectMap.resize(ObjectMap.java:460)
    at com.esotericsoftware.kryo.util.ObjectMap.put_internal(ObjectMap.java:125)
    at com.esotericsoftware.kryo.util.ObjectMap.put(ObjectMap.java:73)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.register(DefaultClassResolver.java:49)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:56)
    at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:476)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
    at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:503)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:608)
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:91)
    at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:538)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    ... 40 more
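
For concreteness, if the answer is that toMessage can be called from more than one ProducerSendThread, one change that could be made is to give each thread its own serializer instance through a ThreadLocal. The following is only a minimal sketch under that assumption. UnsafeSerializer is a hypothetical stand-in for a Kryo instance, and PerThreadEncoder is a hypothetical stand-in for the real Encoder (which would implement the Kafka Encoder interface and return a Message rather than a byte array). Neither name comes from Kafka or Kryo.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for a non-thread-safe serializer such as a Kryo instance:
// it reuses one mutable scratch buffer, so sharing it between threads is unsafe.
final class UnsafeSerializer {
    private final StringBuilder scratch = new StringBuilder();

    byte[] serialize(String event) {
        scratch.setLength(0);
        scratch.append("event:").append(event);
        return scratch.toString().getBytes(StandardCharsets.UTF_8);
    }
}

// Hypothetical encoder. The real class would implement Kafka's Encoder and
// wrap the bytes in a Message; a byte[] keeps this sketch dependency-free.
final class PerThreadEncoder {
    // Each thread lazily creates and then reuses its own serializer,
    // so no serializer instance is ever touched by two threads.
    private static final ThreadLocal<UnsafeSerializer> SERIALIZER =
        ThreadLocal.withInitial(UnsafeSerializer::new);

    byte[] toMessage(String event) {
        return SERIALIZER.get().serialize(event);
    }
}

final class PerThreadEncoderDemo {
    // Calls one shared encoder from several threads and collects the decoded results.
    static List<String> encodeConcurrently(int threads, int perThread) {
        PerThreadEncoder encoder = new PerThreadEncoder();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<List<String>>> futures = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                final int id = t;
                Callable<List<String>> task = () -> {
                    List<String> out = new ArrayList<>();
                    for (int i = 0; i < perThread; i++) {
                        byte[] bytes = encoder.toMessage(id + "-" + i);
                        out.add(new String(bytes, StandardCharsets.UTF_8));
                    }
                    return out;
                };
                futures.add(pool.submit(task));
            }
            List<String> all = new ArrayList<>();
            for (Future<List<String>> f : futures) {
                all.addAll(f.get());
            }
            return all;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> encoded = encodeConcurrently(4, 1000);
        System.out.println(encoded.size() + " events encoded, first: " + encoded.get(0));
    }
}
```

The trade-off is one serializer per calling thread, which costs some memory but avoids locking on every message. Synchronizing toMessage would be the simpler alternative if throughput is not a concern.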