Hi Jun, we do not use any compression in our test.
We deploy producer and broker in the same machine. The problem still exists. We use sync producer, and send one message at a time(no batch now). We find that when the qps reaches more than 40k, the exception appears. So I don't think it's the underlying system error. Any suggestions if we want to do some debug on kafka serialization/deserialization? Thanks. On Wed, Mar 20, 2013 at 12:10 AM, Jun Rao <jun...@gmail.com> wrote: > It basically means that the broker is expecting to read certain number of > bytes in a buffer received from socket, but there are fewer bytes than > expected in the buffer. Possible causes are (1) a bug in Kafka request > serialization/deserialization logic; (2) corruption in the underlying > system such as network. > > BTW, did you enable compression in your producer? > > Thanks, > > Jun > > On Mon, Mar 18, 2013 at 10:12 PM, Helin Xiang <xkee...@gmail.com> wrote: > > > thanks Jun. > > > > we are using java producer. > > does the last exception > > "java.lang.IllegalArgumentException > > at java.nio.Buffer.limit(Buffer.java:266) > > " > > also means the broker received corrupted messages ? sorry i am not > > familiar with java nio. > > > > > > > > > > On Tue, Mar 19, 2013 at 12:58 PM, Jun Rao <jun...@gmail.com> wrote: > > > > > Hmm, both log4j messages suggest that the broker received some > corrupted > > > produce requests. Are you using the java producer? Also, we have seen > > that > > > network router problems caused corrupted requests before. > > > > > > Thanks, > > > > > > Jun > > > > > > On Mon, Mar 18, 2013 at 8:22 PM, Helin Xiang <xkee...@gmail.com> > wrote: > > > > > > > Hi, > > > > We were doing some performance test using kafka 0.7.2. We use only 1 > > > > broker. > > > > On producer client, we use 8 threads to send logs, each thread use > sync > > > > producer and send 100 logs at a time, (each log is about 1~2K bytes > > > long), > > > > The total QPS is about 30K. > > > > But the number of logs both consumer read and the broker counts is > less > > > > than the producer send. we believe the data lost when producer > sending > > > logs > > > > to broker. > > > > > > > > We settle the QPS down to 10K, still lost logs. > > > > We found some exceptions in broker logs: > > > > > > > > 9201051 [kafka-processor-2] ERROR kafka.server.KafkaRequestHandlers > - > > > > Error processing ProduceRequest on abc:0 > > > > kafka.message.InvalidMessageException: message is invalid, > compression > > > > codec: NoCompressionCodec size: 1021 curr offset: 0 init offset: 0 > > > > at > > > > > > > > > > > > > > kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130) > > > > at > > > > > > > > > > > > > > kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160) > > > > at > > > > > > > > > > > > > > kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100) > > > > at > > > > > > kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59) > > > > at > kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51) > > > > at > > > > > > > > > > > > > > kafka.message.ByteBufferMessageSet.verifyMessageSize(ByteBufferMessageSet.scala:89) > > > > at kafka.log.Log.append(Log.scala:218) > > > > at > > > > > > > > > > > > > > kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69) > > > > at > > > > > > > > > > > > > > kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53) > > > > at > > > > > > > > > > > > > > kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38) > > > > at > > > > > > > > > > > > > > kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38) > > > > at kafka.network.Processor.handle(SocketServer.scala:296) > > > > at kafka.network.Processor.read(SocketServer.scala:319) > > > > at kafka.network.Processor.run(SocketServer.scala:214) > > > > at java.lang.Thread.run(Thread.java:636) > > > > > > > > Or this: > > > > > > > > 1406871 [kafka-processor-2] ERROR kafka.network.Processor - Closing > > > socket > > > > for /10.0.2.140 because of error > > > > java.nio.BufferUnderflowException > > > > at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145) > > > > at java.nio.ByteBuffer.get(ByteBuffer.java:692) > > > > at kafka.utils.Utils$.readShortString(Utils.scala:123) > > > > at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:29) > > > > at > > > > > > > > > > > > > > kafka.api.MultiProducerRequest$$anonfun$readFrom$1.apply$mcVI$sp(MultiProducerRequest.scala:28) > > > > at > > > > > > > > > > > > > > scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282) > > > > at > > > > > > scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265) > > > > at > > > > > kafka.api.MultiProducerRequest$.readFrom(MultiProducerRequest.scala:27) > > > > at > > > > > > > > > > > > > > kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:59) > > > > > > > > Or this: > > > > > > > > 1830146 [kafka-processor-0] ERROR kafka.network.Processor - Closing > > > socket > > > > for /10.0.2.140 because of error > > > > java.lang.IllegalArgumentException > > > > at java.nio.Buffer.limit(Buffer.java:266) > > > > at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:33) > > > > at > > > > > > > > > > > > > > kafka.api.MultiProducerRequest$$anonfun$readFrom$1.apply$mcVI$sp(MultiProducerRequest.scala:28) > > > > at > > > > > > > > > > > > > > scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282) > > > > at > > > > > > scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265) > > > > at > > > > > kafka.api.MultiProducerRequest$.readFrom(MultiProducerRequest.scala:27) > > > > at > > > > > > > > > > > > > > kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:59) > > > > at > > > > > > > > > > > > > > kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) > > > > at > > > > > > > > > > > > > > kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41) > > > > at kafka.network.Processor.handle(SocketServer.scala:296) > > > > at kafka.network.Processor.read(SocketServer.scala:319) > > > > at kafka.network.Processor.run(SocketServer.scala:214) > > > > at java.lang.Thread.run(Thread.java:636) > > > > > > > > It bothers us for a few days, and at first we thought it might be > some > > > > wrong configuration settings, and we changed to the wiki's > recommended > > > > configuration, but unfortunately the exceptions still came out. > > > > > > > > In what situation can these exceptions be thrown out ? What can we > do > > to > > > > avoid these exceptions ? > > > > > > > > THANKS > > > > > > > > -- > > > > *Best Regards > > > > > > > > Xiang Helin* > > > > > > > > > > > > > > > -- > > *Best Regards > > > > 向河林* > > > -- Guodong Wang 王国栋