How many threads are you using?

Thanks,

Jun

On Wed, Mar 20, 2013 at 7:33 PM, Yang Zhou <zhou.yang.h...@gmail.com> wrote:

> Sorry, I made a mistake: we use many threads producing at the same time.
>
>
> 2013/3/20 Jun Rao <jun...@gmail.com>
>
> > How many producer instances do you have? Can you reproduce the problem
> > with a single producer?
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Mar 20, 2013 at 12:29 AM, 王国栋 <wangg...@gmail.com> wrote:
> >
> > > Hi Jun,
> > >
> > > We do not use any compression in our test.
> > >
> > > We deployed the producer and the broker on the same machine, and the
> > > problem still exists. We use the sync producer and send one message at a
> > > time (no batching now). We find that the exception appears once the QPS
> > > exceeds 40K, so I don't think it is an underlying system error.
> > >
> > > Any suggestions if we want to debug Kafka
> > > serialization/deserialization?
> > >
> > > Thanks.
> > >
> > >
> > >
> > >
> > > On Wed, Mar 20, 2013 at 12:10 AM, Jun Rao <jun...@gmail.com> wrote:
> > >
> > > > It basically means that the broker is expecting to read a certain number
> > > > of bytes in a buffer received from the socket, but there are fewer bytes
> > > > than expected in the buffer. Possible causes are (1) a bug in the Kafka
> > > > request serialization/deserialization logic; (2) corruption in the
> > > > underlying system, such as the network.
> > > >
> > > > BTW, did you enable compression in your producer?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Mon, Mar 18, 2013 at 10:12 PM, Helin Xiang <xkee...@gmail.com> wrote:
> > > >
> > > > > Thanks, Jun.
> > > > >
> > > > > We are using the Java producer.
> > > > > Does the last exception,
> > > > > "java.lang.IllegalArgumentException
> > > > >     at java.nio.Buffer.limit(Buffer.java:266)
> > > > > "
> > > > > also mean that the broker received corrupted messages? Sorry, I am not
> > > > > familiar with Java NIO.
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Tue, Mar 19, 2013 at 12:58 PM, Jun Rao <jun...@gmail.com> wrote:
> > > > >
> > > > > > Hmm, both log4j messages suggest that the broker received some
> > > > > > corrupted produce requests. Are you using the Java producer? Also, we
> > > > > > have seen network router problems cause corrupted requests before.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Mon, Mar 18, 2013 at 8:22 PM, Helin Xiang <xkee...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > > We were doing some performance tests using Kafka 0.7.2. We use only 1
> > > > > > > broker.
> > > > > > > On the producer client, we use 8 threads to send logs. Each thread uses
> > > > > > > a sync producer and sends 100 logs at a time (each log is about 1~2K
> > > > > > > bytes long). The total QPS is about 30K.
> > > > > > > But the number of logs that the consumer reads, and that the broker
> > > > > > > counts, is less than the number the producer sends. We believe the data
> > > > > > > is lost when the producer sends logs to the broker.
> > > > > > >
> > > > > > > We lowered the QPS to 10K and still lost logs.
> > > > > > > We found some exceptions in the broker logs:
> > > > > > >
> > > > > > > 9201051 [kafka-processor-2] ERROR kafka.server.KafkaRequestHandlers  - Error processing ProduceRequest on abc:0
> > > > > > > kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 1021 curr offset: 0 init offset: 0
> > > > > > >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > > > > > >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
> > > > > > >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > > > > > >     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > > > > > >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > > > > > >     at kafka.message.ByteBufferMessageSet.verifyMessageSize(ByteBufferMessageSet.scala:89)
> > > > > > >     at kafka.log.Log.append(Log.scala:218)
> > > > > > >     at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> > > > > > >     at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
> > > > > > >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
> > > > > > >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
> > > > > > >     at kafka.network.Processor.handle(SocketServer.scala:296)
> > > > > > >     at kafka.network.Processor.read(SocketServer.scala:319)
> > > > > > >     at kafka.network.Processor.run(SocketServer.scala:214)
> > > > > > >     at java.lang.Thread.run(Thread.java:636)
> > > > > > >
> > > > > > > Or this:
> > > > > > >
> > > > > > > 1406871 [kafka-processor-2] ERROR kafka.network.Processor  - Closing socket for /10.0.2.140 because of error
> > > > > > > java.nio.BufferUnderflowException
> > > > > > >     at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145)
> > > > > > >     at java.nio.ByteBuffer.get(ByteBuffer.java:692)
> > > > > > >     at kafka.utils.Utils$.readShortString(Utils.scala:123)
> > > > > > >     at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:29)
> > > > > > >     at kafka.api.MultiProducerRequest$$anonfun$readFrom$1.apply$mcVI$sp(MultiProducerRequest.scala:28)
> > > > > > >     at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
> > > > > > >     at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
> > > > > > >     at kafka.api.MultiProducerRequest$.readFrom(MultiProducerRequest.scala:27)
> > > > > > >     at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:59)
> > > > > > >
> > > > > > > Or this:
> > > > > > >
> > > > > > > 1830146 [kafka-processor-0] ERROR kafka.network.Processor  - Closing socket for /10.0.2.140 because of error
> > > > > > > java.lang.IllegalArgumentException
> > > > > > >     at java.nio.Buffer.limit(Buffer.java:266)
> > > > > > >     at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:33)
> > > > > > >     at kafka.api.MultiProducerRequest$$anonfun$readFrom$1.apply$mcVI$sp(MultiProducerRequest.scala:28)
> > > > > > >     at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
> > > > > > >     at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
> > > > > > >     at kafka.api.MultiProducerRequest$.readFrom(MultiProducerRequest.scala:27)
> > > > > > >     at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:59)
> > > > > > >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > > > >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > > > >     at kafka.network.Processor.handle(SocketServer.scala:296)
> > > > > > >     at kafka.network.Processor.read(SocketServer.scala:319)
> > > > > > >     at kafka.network.Processor.run(SocketServer.scala:214)
> > > > > > >     at java.lang.Thread.run(Thread.java:636)
> > > > > > >
> > > > > > > This has bothered us for a few days. At first we thought it might be a
> > > > > > > wrong configuration setting, so we changed to the wiki's recommended
> > > > > > > configuration, but unfortunately the exceptions still occurred.
> > > > > > >
> > > > > > > In what situations can these exceptions be thrown? What can we do to
> > > > > > > avoid them?
> > > > > > >
> > > > > > > THANKS
> > > > > > >
> > > > > > > --
> > > > > > > *Best Regards
> > > > > > >
> > > > > > > Xiang Helin*
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > *Best Regards
> > > > >
> > > > > 向河林*
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Guodong Wang
> > > 王国栋
> > >
> >
>
>
>
> --
>
> Yang Zhou(周阳)
>
> Department of Computer Science and Engineering
> Shanghai Jiao Tong University
>
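
For readers who, like the original poster, are not familiar with java.nio: the two socket-closing exceptions quoted in this thread can be reproduced without Kafka. The sketch below is not Kafka's code. The class `BufferErrorsDemo` and its helpers `readShortString`, `slicePayload`, `topicField`, and `payloadField` are hypothetical names, and the layout (a 2-byte length before the topic, a 4-byte size before the payload) is an assumption chosen only to resemble the frames in the stack traces. It shows that a length field larger than the bytes actually present makes `ByteBuffer.get` throw `BufferUnderflowException` and `Buffer.limit` throw `IllegalArgumentException`, which matches the "fewer bytes than expected" explanation given above.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; none of this is Kafka code.
class BufferErrorsDemo {

    // Reads a string preceded by a 2-byte length (assumed layout).
    static String readShortString(ByteBuffer buf) {
        short size = buf.getShort();
        byte[] bytes = new byte[size];
        // Throws BufferUnderflowException when fewer than `size` bytes remain.
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Returns a view of the payload that follows a 4-byte size (assumed layout).
    static ByteBuffer slicePayload(ByteBuffer buf) {
        int size = buf.getInt();
        ByteBuffer payload = buf.slice();
        // Throws IllegalArgumentException when `size` is negative or larger
        // than the number of bytes available in the slice.
        payload.limit(size);
        return payload;
    }

    // Builds a topic field whose declared length may disagree with the data.
    static ByteBuffer topicField(int declaredLength, String actual) {
        byte[] bytes = actual.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + bytes.length);
        buf.putShort((short) declaredLength);
        buf.put(bytes);
        buf.flip();
        return buf;
    }

    // Builds a payload field whose declared size may disagree with the data.
    static ByteBuffer payloadField(int declaredSize, byte[] actual) {
        ByteBuffer buf = ByteBuffer.allocate(4 + actual.length);
        buf.putInt(declaredSize);
        buf.put(actual);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        // Well-formed field: declared length matches the data.
        System.out.println(readShortString(topicField(3, "abc")));

        // Declared length exceeds the bytes present.
        try {
            readShortString(topicField(10, "abc"));
        } catch (BufferUnderflowException e) {
            System.out.println("underflow: " + e);
        }

        // Declared payload size exceeds the bytes present.
        try {
            slicePayload(payloadField(1021, new byte[16]));
        } catch (IllegalArgumentException e) {
            System.out.println("bad limit: " + e);
        }
    }
}
```

Neither exception says how the length field and the data came to disagree; both only report that the bytes in the buffer do not match what the header promised.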
