Hi, I'm running kafka in distributed mode with 2 nodes. It works fine with slow ingestion rates but when I increase the ingestion rate, both the nodes starts giving the following error:
[2013-03-29 14:51:45,379] ERROR Closing socket for /192.168.145.183 because of error (kafka.network.Processor) kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 414138 curr offset: 2901235 init offset: 0 at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130) at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160) at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100) at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59) at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51) at kafka.message.ByteBufferMessageSet.verifyMessageSize(ByteBufferMessageSet.scala:89) at kafka.log.Log.append(Log.scala:218) at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69) at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62) at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34) at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34) at scala.collection.TraversableLike$class.map(TraversableLike.scala:206) at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34) at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62) at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) at kafka.network.Processor.handle(SocketServer.scala:296) at kafka.network.Processor.read(SocketServer.scala:319) at kafka.network.Processor.run(SocketServer.scala:214) at java.lang.Thread.run(Thread.java:662) I'm running kafak 0.7.2 with jdk 1.6.0.43. Any idea what I might be doing wrong here? Regards, Anand