You can enable some trace/debug level logging to see if the thread is indeed hanging in BoundedByteBufferReceive.
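For anyone following along: the `trace(read + " bytes read.")` call in Transmission.scala (quoted below in the thread) goes through log4j in 0.8.x, so the relevant output can typically be enabled with a fragment along these lines. The logger name here is an assumption based on the `kafka.network` package that Transmission.scala and BoundedByteBufferReceive.scala live in; check it against your client's logging setup.

```
# Hypothetical log4j.properties fragment (logger name assumed from the
# kafka.network package layout); logs each "<n> bytes read." trace line.
log4j.logger.kafka.network=TRACE
```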
Thanks,

Jun

On Wed, Sep 24, 2014 at 8:30 AM, Jagbir Hooda <jho...@gmail.com> wrote:
> Hi Jun,
>
> Thanks for looking into it. We use the following steps to start the
> consumer:
>
> 1) Create the consumer connector
>    ConsumerConnector consumerConnector =
>        kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
>
> 2) Create the message streams
>    List<KafkaStream<byte[], byte[]>> streamList =
>        consumerConnector.createMessageStreamsByFilter(new Whitelist("topic1,topic2,etc."));
>
> 3) Get a stream iterator
>    KafkaStream<byte[], byte[]> stream = streamList.get(0);
>    ConsumerIterator<byte[], byte[]> it = stream.iterator();
>
> 4) Get messages
>    while (it.hasNext()) {
>        MessageAndMetadata<byte[], byte[]> messageAndMeta = it.peek();
>        // consume message
>        it.next();
>    }
>
> I'm still at a loss to understand the correlation between the consumer
> code and BoundedByteBufferReceive being RUNNABLE at
> "read += Utils.read(channel, sizeBuffer)". We took multiple thread dumps
> at different times (when the CPU was busy) and the thread was always
> RUNNABLE at the same location. Intuitively, reading the sizeBuffer should
> be much less CPU bound than reading the contentBuffer.
>
> Thanks,
> Jagbir
>
> On Mon, Sep 22, 2014 at 9:26 PM, Jun Rao <jun...@gmail.com> wrote:
> > We allocate a new BoundedByteBufferReceive for every fetch request. Are
> > you using SimpleConsumer directly? It seems it's started by the high
> > level consumer through the ConsumerFetcherThread.
> >
> > Thanks,
> >
> > Jun
> >
> > On Mon, Sep 22, 2014 at 11:41 AM, Jagbir Hooda <jho...@gmail.com> wrote:
> >
> >> Note: Re-posting the older message from another account due to
> >> formatting issues.
> >>
> >> Folks,
> >>
> >> Recently in one of our SimpleConsumer based client applications
> >> (0.8.1.1), we spotted a very busy CPU with almost no traffic in/out
> >> between the client and the Kafka broker (1 broker + 1 zookeeper); the
> >> stack trace is attached at the end.
> >>
> >> The busy thread was invoked in a while loop anchored at the readFrom
> >> function:
> >>
> >> ---scala/kafka/network/Transmission.scala:55-59-----
> >>   .....
> >>   while(!complete) {
> >>     val read = readFrom(channel)
> >>     trace(read + " bytes read.")
> >>     totalRead += read
> >>   }
> >>   ....
> >> ----------------------------------------------------------------------
> >>
> >> The readFrom function and the associated thread was RUNNABLE at
> >> "read += Utils.read(channel, sizeBuffer)" (see below):
> >>
> >> ---scala/kafka/network/BoundedByteBufferReceive.scala:49-95--------
> >>   ....
> >>   def readFrom(channel: ReadableByteChannel): Int = {
> >>     expectIncomplete()
> >>     var read = 0
> >>     // have we read the request size yet?
> >>     if(sizeBuffer.remaining > 0)
> >>       read += Utils.read(channel, sizeBuffer)
> >>     // have we allocated the request buffer yet?
> >>     if(contentBuffer == null && !sizeBuffer.hasRemaining) {
> >>       sizeBuffer.rewind()
> >>       val size = sizeBuffer.getInt()
> >>       if(size <= 0)
> >>         throw new InvalidRequestException("%d is not a valid request size.".format(size))
> >>       if(size > maxSize)
> >>         throw new InvalidRequestException("Request of length %d is not valid, it is larger than the maximum size of %d bytes.".format(size, maxSize))
> >>       contentBuffer = byteBufferAllocate(size)
> >>     }
> >>     // if we have a buffer read some stuff into it
> >>     if(contentBuffer != null) {
> >>       read = Utils.read(channel, contentBuffer)
> >>       // did we get everything?
> >>       if(!contentBuffer.hasRemaining) {
> >>         contentBuffer.rewind()
> >>         complete = true
> >>       }
> >>     }
> >>     read
> >>   }
> >>   .....
> >> ------------------------------------------------------------------------------
> >>
> >> It looks like the contentBuffer size is initialized only once in the
> >> SimpleConsumer life-cycle (we keep the SimpleConsumer alive until the
> >> app is restarted).
> >>
> >> Wondering what the communication pattern is between the client and the
> >> broker.
> >>
> >> Is our assumption true that the contentBuffer 'size' is only negotiated
> >> when the SimpleConsumer is created?
> >>
> >> If the above is true, then why is the thread busy at
> >> "read += Utils.read(channel, sizeBuffer)"? (Our application was running
> >> fine for over three days, and on the third day we noticed the busy CPU
> >> and this behavior.)
> >>
> >> If the contentBuffer size is negotiated for every message (or more
> >> frequently), then the contentBuffer variable must be set to "null"
> >> somewhere to allow it to be reconfigured (as per the existing codebase).
> >>
> >> Another question is about the broker configuration setting that
> >> controls the contentBuffer size.
> >>
> >> I'd really appreciate it if someone could help us figure it out.
> >>
> >> Thanks,
> >> jaguar
> >>
> >> ----------------BUSY THREAD--------------------------------
> >> "ConsumerFetcherThread-aaaa_ubuntu-1410940574038-305417a6-0-0" prio=10
> >> tid=0x00007f7e4c04e800 nid=0x738c runnable [0x00007f7e4a3e1000]
> >>    java.lang.Thread.State: RUNNABLE
> >>     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> >>     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> >>     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
> >>     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
> >>     - locked <0x00000000e583e670> (a sun.nio.ch.Util$2)
> >>     - locked <0x00000000e583e660> (a java.util.Collections$UnmodifiableSet)
> >>     - locked <0x00000000e583e1d8> (a sun.nio.ch.EPollSelectorImpl)
> >>     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
> >>     at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:221)
> >>     - locked <0x00000000e59b69a8> (a java.lang.Object)
> >>     at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
> >>     - locked <0x00000000e59b6b30> (a sun.nio.ch.SocketAdaptor$SocketInputStream)
> >>     at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
> >>     - locked <0x00000000e59cc4f0> (a java.lang.Object)
> >>     at kafka.utils.Utils$.read(Utils.scala:375)
> >>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> >>     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> >>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> >>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> >>     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
> >>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> >>     - locked <0x00000000e591cb68> (a java.lang.Object)
> >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> >>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
> >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> >>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >>     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> >>     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> >>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> >>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >>
> >> ------------------------------------8<----------------------------------------
> >>
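To make the framing in the quoted readFrom concrete, here is a minimal, self-contained Java sketch (not Kafka's actual classes; the name `FramedReceive` and the helper `readOrThrow` are invented for illustration) of the size-prefix-then-payload protocol. The driver loop in `main` mirrors the `while(!complete)` loop from Transmission.scala; note that a channel which kept returning 0 bytes (no data available, but no EOF either) would make that loop call readFrom over and over at 100% CPU, which matches the symptom in the thread dump above.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

// Sketch of BoundedByteBufferReceive-style framing: a 4-byte big-endian
// size, then a payload of exactly that many bytes.
public class FramedReceive {
    final ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
    ByteBuffer contentBuffer = null;
    boolean complete = false;

    // Mirrors Utils.read: -1 (EOF) mid-frame is an error; 0 is a legal
    // return meaning "no bytes available right now".
    static int readOrThrow(ReadableByteChannel ch, ByteBuffer buf) throws IOException {
        int n = ch.read(buf);
        if (n < 0) throw new EOFException("channel closed mid-frame");
        return n;
    }

    // Returns the number of bytes consumed by this call.
    int readFrom(ReadableByteChannel channel) throws IOException {
        int read = 0;
        // have we read the frame size yet?
        if (sizeBuffer.remaining() > 0)
            read += readOrThrow(channel, sizeBuffer);
        // have we allocated the payload buffer yet?
        if (contentBuffer == null && !sizeBuffer.hasRemaining()) {
            sizeBuffer.rewind();
            int size = sizeBuffer.getInt();
            if (size <= 0) throw new IOException(size + " is not a valid size.");
            contentBuffer = ByteBuffer.allocate(size);
        }
        // if we have a payload buffer, read into it
        if (contentBuffer != null) {
            read += readOrThrow(channel, contentBuffer);
            if (!contentBuffer.hasRemaining()) {
                contentBuffer.rewind();
                complete = true;
            }
        }
        return read;
    }

    public static void main(String[] args) throws IOException {
        // Frame one "hello" payload in memory: 4-byte size, then the bytes.
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        ByteBuffer framed = ByteBuffer.allocate(4 + payload.length);
        framed.putInt(payload.length).put(payload);
        ReadableByteChannel ch =
            Channels.newChannel(new ByteArrayInputStream(framed.array()));

        FramedReceive r = new FramedReceive();
        int total = 0;
        // The readCompletely-style driver loop from Transmission.scala.
        // A channel that kept returning 0 here (no data, no EOF) would
        // make this loop spin with no traffic on the wire.
        while (!r.complete)
            total += r.readFrom(ch);
        System.out.println(total); // prints 9 (4-byte size + 5-byte payload)
        System.out.println(new String(r.contentBuffer.array(), StandardCharsets.UTF_8)); // prints hello
    }
}
```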