This error indicates that the messages are corrupted by the time they reach the broker. Are you using the Java producer? How many producer instances are you running?
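For context, here is a minimal sketch of the kind of validity check that fails in this situation. It assumes the broker rejects a message when the CRC32 stored alongside it no longer matches the CRC32 computed over the payload bytes. The class and method names (`CrcCheckSketch`, `checksum`, `isValid`) are hypothetical and are not Kafka's API; only `java.util.zip.CRC32` is a real library class.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical illustration only; not Kafka's actual message class.
class CrcCheckSketch {

    // Compute a CRC32 over the payload bytes.
    static long checksum(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        return crc.getValue();
    }

    // A message counts as valid only if the stored checksum
    // matches the checksum recomputed from the payload.
    static boolean isValid(byte[] payload, long storedChecksum) {
        return checksum(payload) == storedChecksum;
    }

    public static void main(String[] args) {
        byte[] payload = "example message".getBytes(StandardCharsets.UTF_8);
        long stored = checksum(payload); // checksum taken when the message is built

        System.out.println("intact: " + isValid(payload, stored));

        // Simulate corruption in transit by flipping a single bit.
        payload[0] ^= 0x01;
        System.out.println("after bit flip: " + isValid(payload, stored));
    }
}
```

If a check like this fails on the broker, the bytes changed somewhere between the point where the producer computed the checksum and the point where the broker read the message, which is why the producer setup is the first thing to look at.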

Thanks,

Jun

On Fri, Mar 29, 2013 at 2:46 AM, anand nalya <anand.na...@gmail.com> wrote:

> Hi,
>
> I'm running Kafka in distributed mode with 2 nodes. It works fine at slow
> ingestion rates, but when I increase the ingestion rate, both nodes
> start giving the following error:
>
> [2013-03-29 14:51:45,379] ERROR Closing socket for /192.168.145.183 because of error (kafka.network.Processor)
> kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 414138 curr offset: 2901235 init offset: 0
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>     at kafka.message.ByteBufferMessageSet.verifyMessageSize(ByteBufferMessageSet.scala:89)
>     at kafka.log.Log.append(Log.scala:218)
>     at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>     at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>     at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
>     at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>     at kafka.network.Processor.handle(SocketServer.scala:296)
>     at kafka.network.Processor.read(SocketServer.scala:319)
>     at kafka.network.Processor.run(SocketServer.scala:214)
>     at java.lang.Thread.run(Thread.java:662)
>
> I'm running Kafka 0.7.2 with JDK 1.6.0.43.
>
> Any idea what I might be doing wrong here?
>
> Regards,
> Anand