Hmm, both log4j messages suggest that the broker received some corrupted produce requests. Are you using the java producer? Also, we have seen that network router problems caused corrupted requests before.
Thanks, Jun On Mon, Mar 18, 2013 at 8:22 PM, Helin Xiang <xkee...@gmail.com> wrote: > Hi, > We were doing some performance test using kafka 0.7.2. We use only 1 > broker. > On producer client, we use 8 threads to send logs, each thread use sync > producer and send 100 logs at a time, (each log is about 1~2K bytes long), > The total QPS is about 30K. > But the number of logs both consumer read and the broker counts is less > than the producer send. we believe the data lost when producer sending logs > to broker. > > We settle the QPS down to 10K, still lost logs. > We found some exceptions in broker logs: > > 9201051 [kafka-processor-2] ERROR kafka.server.KafkaRequestHandlers - > Error processing ProduceRequest on abc:0 > kafka.message.InvalidMessageException: message is invalid, compression > codec: NoCompressionCodec size: 1021 curr offset: 0 init offset: 0 > at > > kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130) > at > > kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160) > at > > kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100) > at > kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59) > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51) > at > > kafka.message.ByteBufferMessageSet.verifyMessageSize(ByteBufferMessageSet.scala:89) > at kafka.log.Log.append(Log.scala:218) > at > > kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69) > at > > kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53) > at > > kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38) > at > > kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38) > at kafka.network.Processor.handle(SocketServer.scala:296) > at kafka.network.Processor.read(SocketServer.scala:319) > at kafka.network.Processor.run(SocketServer.scala:214) > at java.lang.Thread.run(Thread.java:636) > > Or this: > > 1406871 [kafka-processor-2] ERROR kafka.network.Processor - Closing socket > for /10.0.2.140 because of error > java.nio.BufferUnderflowException > at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145) > at java.nio.ByteBuffer.get(ByteBuffer.java:692) > at kafka.utils.Utils$.readShortString(Utils.scala:123) > at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:29) > at > > kafka.api.MultiProducerRequest$$anonfun$readFrom$1.apply$mcVI$sp(MultiProducerRequest.scala:28) > at > > scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282) > at > scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265) > at > kafka.api.MultiProducerRequest$.readFrom(MultiProducerRequest.scala:27) > at > > kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:59) > > Or this: > > 1830146 [kafka-processor-0] ERROR kafka.network.Processor - Closing socket > for /10.0.2.140 because of error > java.lang.IllegalArgumentException > at java.nio.Buffer.limit(Buffer.java:266) > at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:33) > at > > kafka.api.MultiProducerRequest$$anonfun$readFrom$1.apply$mcVI$sp(MultiProducerRequest.scala:28) > at > > scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282) > at > scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265) > at > kafka.api.MultiProducerRequest$.readFrom(MultiProducerRequest.scala:27) > at > > kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:59) > at > > kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) > at > > kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) > at kafka.network.Processor.handle(SocketServer.scala:296) > at kafka.network.Processor.read(SocketServer.scala:319) > at kafka.network.Processor.run(SocketServer.scala:214) > at java.lang.Thread.run(Thread.java:636) > > It bothers us for a few days, and at first we thought it might be some > wrong configuration settings, and we changed to the wiki's recommended > configuration, but unfortunately the exceptions still came out. > > In what situation can these exceptions be thrown out ? What can we do to > avoid these exceptions ? > > THANKS > > -- > *Best Regards > > Xiang Helin* >