Hi Jun,

I'm using the async Java producer. It works fine while the message count is in the hundreds of thousands, but it starts failing for anything above a million. Each message is around 2 KB.
I've tried both a single producer and multiple producers. The error rate is much lower with a single producer than with multiple producers.

Thanks,
Anand

On 29 March 2013 20:24, Jun Rao <jun...@gmail.com> wrote:

> This indicates that the messages sent to the broker are somehow corrupted.
> Are you using a Java producer? How many instances of producers do you have?
>
> Thanks,
>
> Jun
>
> On Fri, Mar 29, 2013 at 2:46 AM, anand nalya <anand.na...@gmail.com> wrote:
>
> > Hi,
> >
> > I'm running Kafka in distributed mode with 2 nodes. It works fine with slow
> > ingestion rates, but when I increase the ingestion rate, both nodes
> > start giving the following error:
> >
> > [2013-03-29 14:51:45,379] ERROR Closing socket for /192.168.145.183 because of error (kafka.network.Processor)
> > kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 414138 curr offset: 2901235 init offset: 0
> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> >     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >     at kafka.message.ByteBufferMessageSet.verifyMessageSize(ByteBufferMessageSet.scala:89)
> >     at kafka.log.Log.append(Log.scala:218)
> >     at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> >     at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> >     at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> >     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> >     at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> >     at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> >     at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> >     at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> >     at kafka.network.Processor.handle(SocketServer.scala:296)
> >     at kafka.network.Processor.read(SocketServer.scala:319)
> >     at kafka.network.Processor.run(SocketServer.scala:214)
> >     at java.lang.Thread.run(Thread.java:662)
> >
> > I'm running Kafka 0.7.2 with JDK 1.6.0.43.
> >
> > Any idea what I might be doing wrong here?
> >
> > Regards,
> > Anand