I'm sorry about the formatting issues below:-(I need to stop using hotmail as
the hotmail is mangling the message formatting:-(I'll try re-posting from my
gmail address.
Jagbir
> From: jsho...@hotmail.com
> To: users@kafka.apache.org
> Subject: Busy CPU while negotiating contentBuffer size at
> BoundedByteBufferReceive.scala:54
> Date: Mon, 22 Sep 2014 12:05:47 -0400
>
> Folks,
> Recently in one of our SimpleConsumer based client applications (0.8.1.1),we
> spotted a very busy CPU with almost no traffic in/out from the clientand
> Kafka broker (1broker+1zookeeper) (the stack trace is attached at the end).
> The busy thread was invoked in a while loop anchored at the readFrom function
> ---scala/kafka/network/Transmission.scala:55-59-----------------------
> ..... while(!complete) { val read = readFrom(channel) trace(read
> + " bytes read.") totalRead += read }
> ....----------------------------------------------------------------------
> The readFrom funtion and the associated thread was RUNNABLE at"read +=
> Utils.read(channel, sizeBuffer)" (see below)
> ---scala/kafka/network/BoundedByteBufferReceive.scala:49-95-------- ....
> def readFrom(channel: ReadableByteChannel): Int = { expectIncomplete()
> var read = 0 // have we read the request size yet?
> if(sizeBuffer.remaining > 0) read += Utils.read(channel, sizeBuffer)
> // have we allocated the request buffer yet? if(contentBuffer == null &&
> !sizeBuffer.hasRemaining) { sizeBuffer.rewind() val size =
> sizeBuffer.getInt() if(size <= 0) throw new
> InvalidRequestException("%d is not a valid request size.".format(size))
> if(size > maxSize) throw new InvalidRequestException("Request of
> length %d is not valid, it is larger than the maximum size of %d
> bytes.".format(size, maxSize)) contentBuffer = byteBufferAllocate(size)
> } // if we have a buffer read some stuff into it if(contentBuffer !=
> null) { read = Utils.read(channel, contentBuffer) // did we get
> everything? if(!contentBuffer.hasRemaining) {
> contentBuffer.rewind() complete = true } } read }
> .....----------------------------------------------------------------------------------------------
> It looks like contentBuffer size is initialized only once in SimpleConsumer
> life-cycle (we keepSimpleConsumer alive until the app is restarted).
> Wondering what's the communication pattern between the client and broker.
> Is our assumption that contentBuffer 'size' is only negotiated when
> SimpleConsumeris created true?
> If above is true then why the thread is BUSY at "read += Utils.read(channel,
> sizeBuffer)"?(Our application was running fine for over three days and on the
> third day we noticedthe busy CPU and this behavior.)
> If contentBuffer size is negotiated every message (or more frequently) then
> contentBuffervariable must be set to "null" somewhere to allow it to
> reconfigured (as per existing codebase).
> Another question is about the broker configuration setting which controls the
> contentBuffer size.
> I'll really appreciate if someone can help us in figuring it out.
> Thanks,jagbir
> ----------------BUSY
> THREAD------------------------------------------------------------"ConsumerFetcherThread-aaaa_ubuntu-1410940574038-305417a6-0-0"
> prio=10tid=0x00007f7e4c04e800 nid=0x738c runnable [0x00007f7e4a3e1000]
> java.lang.Thread.State: RUNNABLE at
> sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at
> sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) at
> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79) at
> sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87) - locked
> <0x00000000e583e670> (a sun.nio.ch.Util$2) - locked <0x00000000e583e660> (a
> java.util.Collections$UnmodifiableSet) - locked <0x00000000e583e1d8> (a
> sun.nio.ch.EPollSelectorImpl) at
> sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98) at
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:221) -
> locked <0x00000000e59b69a8> (a java.lang.Object) at
> sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) - locked
> <0x00000000e59b6b30> (a sun.nio.ch.SocketAdaptor$SocketInputStream) at
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) -
> locked <0x00000000e59cc4f0> (a java.lang.Object) at
> kafka.utils.Utils$.read(Utils.scala:375) at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) at
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73) at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> - locked <0x00000000e591cb68> (a java.lang.Object) at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at
> kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107) at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> at
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)-------------------------------------------------8<-------------------------------------------------
>