I'm sorry about the formatting issues below:-(I need to stop using hotmail as 
the hotmail is mangling the message formatting:-(I'll try re-posting from my 
gmail address.
Jagbir
> From: jsho...@hotmail.com
> To: users@kafka.apache.org
> Subject: Busy CPU while negotiating contentBuffer size at 
> BoundedByteBufferReceive.scala:54
> Date: Mon, 22 Sep 2014 12:05:47 -0400
> 
> Folks,
> Recently in one of our SimpleConsumer based client applications (0.8.1.1),we 
> spotted a very busy CPU with almost no traffic in/out from the clientand 
> Kafka broker (1broker+1zookeeper) (the stack trace is attached at the end).
> The busy thread was invoked in a while loop anchored at the readFrom function
> ---scala/kafka/network/Transmission.scala:55-59-----------------------    
> .....    while(!complete) {      val read = readFrom(channel)      trace(read 
> + " bytes read.")      totalRead += read    }    
> ....----------------------------------------------------------------------
> The readFrom funtion and the associated thread was RUNNABLE at"read += 
> Utils.read(channel, sizeBuffer)" (see below)
> ---scala/kafka/network/BoundedByteBufferReceive.scala:49-95--------  ....  
> def readFrom(channel: ReadableByteChannel): Int = {    expectIncomplete()    
> var read = 0    // have we read the request size yet?    
> if(sizeBuffer.remaining > 0)      read += Utils.read(channel, sizeBuffer)    
> // have we allocated the request buffer yet?    if(contentBuffer == null && 
> !sizeBuffer.hasRemaining) {      sizeBuffer.rewind()      val size = 
> sizeBuffer.getInt()      if(size <= 0)        throw new 
> InvalidRequestException("%d is not a valid request size.".format(size))      
> if(size > maxSize)        throw new InvalidRequestException("Request of 
> length %d is not valid, it is larger than the maximum size of %d 
> bytes.".format(size, maxSize))      contentBuffer = byteBufferAllocate(size)  
>   }    // if we have a buffer read some stuff into it    if(contentBuffer != 
> null) {      read = Utils.read(channel, contentBuffer)      // did we get 
> everything?      if(!contentBuffer.hasRemaining) {        
> contentBuffer.rewind()        complete = true      }    }    read  }  
> .....----------------------------------------------------------------------------------------------
> It looks like contentBuffer size is initialized only once in SimpleConsumer 
> life-cycle (we keepSimpleConsumer alive until the app is restarted).
> Wondering what's the communication pattern between the client and broker.
> Is our assumption that contentBuffer 'size' is only negotiated when 
> SimpleConsumeris created true?
> If above is true then why the thread is BUSY at "read += Utils.read(channel, 
> sizeBuffer)"?(Our application was running fine for over three days and on the 
> third day we noticedthe busy CPU and this behavior.)
> If contentBuffer size is negotiated every message (or more frequently) then 
> contentBuffervariable must be set to "null" somewhere to allow it to 
> reconfigured (as per existing codebase).
> Another question is about the broker configuration setting which controls the 
> contentBuffer size.
> I'll really appreciate if someone can help us in figuring it out.
> Thanks,jagbir 
> ----------------BUSY 
> THREAD------------------------------------------------------------"ConsumerFetcherThread-aaaa_ubuntu-1410940574038-305417a6-0-0"
>  prio=10tid=0x00007f7e4c04e800 nid=0x738c runnable [0x00007f7e4a3e1000] 
> java.lang.Thread.State: RUNNABLE at 
> sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at 
> sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) at 
> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79) at 
> sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87) - locked 
> <0x00000000e583e670> (a sun.nio.ch.Util$2) - locked <0x00000000e583e660> (a 
> java.util.Collections$UnmodifiableSet) - locked <0x00000000e583e1d8> (a 
> sun.nio.ch.EPollSelectorImpl) at 
> sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98) at 
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:221) - 
> locked <0x00000000e59b69a8> (a java.lang.Object) at 
> sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) - locked 
> <0x00000000e59b6b30> (a sun.nio.ch.SocketAdaptor$SocketInputStream) at 
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) - 
> locked <0x00000000e59cc4f0> (a java.lang.Object) at 
> kafka.utils.Utils$.read(Utils.scala:375) at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>  at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at 
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>  at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) at 
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73) at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>  - locked <0x00000000e591cb68> (a java.lang.Object) at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>  at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>  at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>  at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>  at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at 
> kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107) at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) 
> at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)-------------------------------------------------8<-------------------------------------------------
>                                         
                                          

Reply via email to