Folks,
Recently, in one of our SimpleConsumer-based client applications (0.8.1.1), we spotted a very busy CPU with almost no traffic in/out between the client and the Kafka broker (1 broker + 1 zookeeper). The stack trace is attached at the end.
The busy thread was running in a while loop anchored at the readFrom function:
---scala/kafka/network/Transmission.scala:55-59-----------------------
.....
while(!complete) {
  val read = readFrom(channel)
  trace(read + " bytes read.")
  totalRead += read
}
....
----------------------------------------------------------------------
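To illustrate our reading of that loop: it only exits once complete is set inside readFrom, so if readFrom kept returning 0 bytes the loop would keep iterating at full speed without ever blocking. Below is a minimal, self-contained sketch of that reading with a stubbed readFrom (not Kafka code; the iteration cap is only there so the sketch terminates):
---sketch (not Kafka code)---------------------------------------------
// The readCompletely-style loop does not yield or block by itself;
// if readFrom keeps returning 0, it simply spins.
object SpinLoopSketch {
  var complete = false
  var totalRead = 0

  // stand-in for readFrom(channel): pretend no bytes are ever available
  def readFrom(): Int = 0

  def main(args: Array[String]): Unit = {
    var iterations = 0L
    while (!complete && iterations < 10000000L) { // capped so the sketch terminates
      totalRead += readFrom()
      iterations += 1
    }
    println(s"spun $iterations times, read $totalRead bytes, complete=$complete")
  }
}
----------------------------------------------------------------------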
The associated thread was RUNNABLE inside the readFrom function, at "read += Utils.read(channel, sizeBuffer)" (see below).
---scala/kafka/network/BoundedByteBufferReceive.scala:49-95--------
....
def readFrom(channel: ReadableByteChannel): Int = {
  expectIncomplete()
  var read = 0
  // have we read the request size yet?
  if(sizeBuffer.remaining > 0)
    read += Utils.read(channel, sizeBuffer)
  // have we allocated the request buffer yet?
  if(contentBuffer == null && !sizeBuffer.hasRemaining) {
    sizeBuffer.rewind()
    val size = sizeBuffer.getInt()
    if(size <= 0)
      throw new InvalidRequestException("%d is not a valid request size.".format(size))
    if(size > maxSize)
      throw new InvalidRequestException("Request of length %d is not valid, it is larger than the maximum size of %d bytes.".format(size, maxSize))
    contentBuffer = byteBufferAllocate(size)
  }
  // if we have a buffer read some stuff into it
  if(contentBuffer != null) {
    read = Utils.read(channel, contentBuffer)
    // did we get everything?
    if(!contentBuffer.hasRemaining) {
      contentBuffer.rewind()
      complete = true
    }
  }
  read
}
.....
----------------------------------------------------------------------------------------------
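If we're reading readFrom correctly, each response on the wire is a 4-byte size prefix followed by exactly that many payload bytes, and contentBuffer is allocated once the prefix has been fully read. A tiny sketch of that framing from the sending side, just to confirm our understanding (names and values are ours, not Kafka's):
---sketch (not Kafka code)---------------------------------------------
import java.nio.ByteBuffer

object FrameSketch {
  // Build one frame as we believe readFrom expects it on the wire:
  // a 4-byte big-endian length, then exactly payload.length bytes.
  def encode(payload: Array[Byte]): ByteBuffer = {
    val frame = ByteBuffer.allocate(4 + payload.length)
    frame.putInt(payload.length)   // what readFrom reads into sizeBuffer
    frame.put(payload)             // what readFrom reads into contentBuffer
    frame.flip()
    frame
  }

  def main(args: Array[String]): Unit = {
    val frame = encode("hello".getBytes("UTF-8"))
    println(s"size field = ${frame.getInt(0)}, total frame bytes = ${frame.remaining}")
  }
}
----------------------------------------------------------------------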
It looks like the contentBuffer size is initialized only once in the SimpleConsumer life-cycle (we keep the SimpleConsumer alive until the app is restarted).
We're wondering about the communication pattern between the client and the broker.
Is our assumption correct that the contentBuffer 'size' is only negotiated when the SimpleConsumer is created?
If the above is true, then why is the thread busy at "read += Utils.read(channel, sizeBuffer)"? (Our application was running fine for over three days, and on the third day we noticed the busy CPU and this behavior.)
If the contentBuffer size is negotiated for every message (or more frequently), then the contentBuffer variable must be set to "null" somewhere to allow it to be reconfigured (as per the existing codebase).
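To make that question concrete, the two readings we can imagine are: (a) the same receive object is reset between messages (contentBuffer set back to null, sizeBuffer cleared, complete reset), or (b) a fresh receive object is created for every response, so contentBuffer naturally starts out null each time. A purely hypothetical sketch of reading (b) (class and method names are ours, not Kafka's):
---sketch (hypothetical, not Kafka code)------------------------------
import java.nio.channels.ReadableByteChannel

// Reading (b) made concrete: the caller creates a fresh receive object per
// response, so its content buffer always starts out null and gets sized
// from that response's size prefix. The class below is only a stub.
class PerResponseReceive {
  var complete = false
  def readFrom(channel: ReadableByteChannel): Int = { complete = true; 0 } // stub
}

object ReceivePerMessage {
  def receiveOne(channel: ReadableByteChannel): PerResponseReceive = {
    val receive = new PerResponseReceive()   // fresh object, nothing to reset to null
    while (!receive.complete)
      receive.readFrom(channel)
    receive
  }
}
----------------------------------------------------------------------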
Another question is about the broker configuration setting which controls the 
contentBuffer size.
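We're not sure which broker-side setting (if any) bounds it. On the client side, the only sizes we pass are the SimpleConsumer constructor's bufferSize and the per-partition fetchSize on the fetch request, roughly as in the sketch below (host, topic, and numeric values are placeholders, not our production settings):
---sketch (client-side sizes we pass)---------------------------------
import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer

object FetchSizesSketch {
  def main(args: Array[String]): Unit = {
    // soTimeout and bufferSize are per-consumer; fetchSize is per partition, per request
    val consumer = new SimpleConsumer("broker-host", 9092,
                                      100000    /* soTimeout ms */,
                                      64 * 1024 /* bufferSize  */,
                                      "test-client")
    val request = new FetchRequestBuilder()
      .clientId("test-client")
      .addFetch("test-topic", 0, 0L /* offset */, 100000 /* fetchSize */)
      .build()
    val response = consumer.fetch(request)
    println("bytes fetched: " + response.messageSet("test-topic", 0).sizeInBytes)
    consumer.close()
  }
}
----------------------------------------------------------------------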
I'd really appreciate it if someone can help us figure this out.
Thanks,
jagbir
----------------BUSY THREAD------------------------------------------------------------
"ConsumerFetcherThread-aaaa_ubuntu-1410940574038-305417a6-0-0" prio=10 tid=0x00007f7e4c04e800 nid=0x738c runnable [0x00007f7e4a3e1000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
        - locked <0x00000000e583e670> (a sun.nio.ch.Util$2)
        - locked <0x00000000e583e660> (a java.util.Collections$UnmodifiableSet)
        - locked <0x00000000e583e1d8> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
        at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:221)
        - locked <0x00000000e59b69a8> (a java.lang.Object)
        at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
        - locked <0x00000000e59b6b30> (a sun.nio.ch.SocketAdaptor$SocketInputStream)
        at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
        - locked <0x00000000e59cc4f0> (a java.lang.Object)
        at kafka.utils.Utils$.read(Utils.scala:375)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
        at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
        - locked <0x00000000e591cb68> (a java.lang.Object)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
-------------------------------------------------8<-------------------------------------------------