Hi Neha,

Our test is reproducible. We use a log replay tool to send the same part of
log to kafka.

We will check the log file latter and try to find some more hints.

Till now, it seems that if the batch size in sync producer is small, the
problem is hard to reproduce.

Thanks.


On Tue, Mar 19, 2013 at 1:54 PM, Neha Narkhede <neha.narkh...@gmail.com>wrote:

> Do you mind trying out the DumpLogSegment tool on the log segment for the
> corrupted topic. That will validate if the log data is corrupted. Also, Is
> your test reproducible ? We ran into a similar issue in production but
> could not reproduce it.
>
> Thanks,
> Neha
>
> On Monday, March 18, 2013, Helin Xiang wrote:
>
> > thanks Jun.
> >
> > we are using java producer.
> > does the last exception
> > "java.lang.IllegalArgumentException
> >     at java.nio.Buffer.limit(Buffer.java:266)
> > "
> > also means the broker received corrupted messages ?  sorry i am not
> > familiar with java nio.
> >
> >
> >
> >
> > On Tue, Mar 19, 2013 at 12:58 PM, Jun Rao <jun...@gmail.com> wrote:
> >
> > > Hmm, both log4j messages suggest that the broker received some
> corrupted
> > > produce requests. Are you using the java producer? Also, we have seen
> > that
> > > network router problems caused corrupted requests before.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Mon, Mar 18, 2013 at 8:22 PM, Helin Xiang <xkee...@gmail.com>
> wrote:
> > >
> > > > Hi,
> > > > We were doing some performance test using kafka 0.7.2. We use only 1
> > > > broker.
> > > > On producer client, we use 8 threads to send logs, each thread use
> sync
> > > > producer and send 100 logs at a time, (each log is about 1~2K bytes
> > > long),
> > > > The total QPS is about 30K.
> > > > But the number of logs both consumer read and the broker counts is
> less
> > > > than the producer send. we believe the data lost when producer
> sending
> > > logs
> > > > to broker.
> > > >
> > > > We settle the QPS down to 10K, still lost logs.
> > > > We found some exceptions in broker logs:
> > > >
> > > > 9201051 [kafka-processor-2] ERROR kafka.server.KafkaRequestHandlers
>  -
> > > > Error processing ProduceRequest on abc:0
> > > > kafka.message.InvalidMessageException: message is invalid,
> compression
> > > > codec: NoCompressionCodec size: 1021 curr offset: 0 init offset: 0
> > > >     at
> > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > > >     at
> > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
> > > >     at
> > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > > >     at
> > > >
> > kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > > >     at
> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > > >     at
> > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet.verifyMessageSize(ByteBufferMessageSet.scala:89)
> > > >     at kafka.log.Log.append(Log.scala:218)
> > > >     at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> > > >     at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
> > > >     at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
> > > >     at
> > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
> > > >     at kafka.network.Processor.handle(SocketServer.scala:296)
> > > >     at kafka.network.Processor.read(SocketServer.scala:319)
> > > >     at kafka.network.Processor.run(SocketServer.scala:214)
> > > >     at java.lang.Thread.run(Thread.java:636)
> > > >
> > > > Or this:
> > > >
> > > > 1406871 [kafka-processor-2] ERROR kafka.network.Processor  - Closing
> > > socket
> > > > for /10.0.2.140 because of error
> > > > java.nio.BufferUnderflowException
> > > >     at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145)
> > > >     at java.nio.ByteBuffer.get(ByteBuffer.java:692)
> > > >     at kafka.utils.Utils$.readShortString(Utils.scala:123)
> > > >     at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:29)
> > > >     at
> > > >
> > > >
> > >
> >
> kafka.api.MultiProducerRequest$$anonfun$readFrom$1.apply$mcVI$sp(MultiProducerRequest.scala:28)
> > > >     at
> > > >
> > > >
> > >
> >
> scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
> > > >     at
> > > > scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(--
> > *Best Regards
> >
> > 向河林*
> >
>



-- 
Guodong Wang
王国栋

Reply via email to