You can enable some trace/debug level logging to see if the thread is
indeed hanging in BoundedByteBufferReceive.

Thanks,

Jun

On Wed, Sep 24, 2014 at 8:30 AM, Jagbir Hooda <jho...@gmail.com> wrote:

> Hi Jun,
>
> Thanks for looking into it. We use the following steps to start the
> consumer.
>
> 1) Create consumer connector
>    ConsumerConnector consumerConnector =
> kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig())
>
> 2) Create message streams
>    List<KafkaStream<byte[], byte[]>> streamList =
> consumerConnector.createMessageStreamsByFilter("topic1,topic2,etc.");
>
> 3) Get Stream iterator
>     KafkaStream<byte[], byte[]> stream = streamList.get(0);
>     ConsumerIterator<byte[], byte[]> it = stream.iterator();
>
> 4) Get messages
>     while(it.hasNext()) {
>           MessageAndMetadata<byte[], byte[]> messageAndMeta = it.peek();
>           // consume message
>           it.next();
>     }
>
> I'm still at loss to understand the correlation between the consumer
> code and the BoundedByteBufferReceive
> being RUNNABLE at "read += Utils.read(channel, sizeBuffer)". We took
> multiple thread dumps at different times (when CPU was busy) and the
> thread was always RUNNABLE at the same location. Intuitively reading
> the sizeBuffer should have been much less CPU bound then reading the
> contentBuffer.
>
> Thanks,
> Jagbir
>
> On Mon, Sep 22, 2014 at 9:26 PM, Jun Rao <jun...@gmail.com> wrote:
> > We allocate a new BoundedByteBufferReceive for every fetch request. Are
> you
> > using SimpleConsumer directly? It seems it's started by the high level
> > consumer through the FetchFetcher thread.
> >
> > Thanks,
> >
> > Jun
> >
> > On Mon, Sep 22, 2014 at 11:41 AM, Jagbir Hooda <jho...@gmail.com> wrote:
> >
> >> Note:  Re-posting the older message from another account due to
> >> formatting issues.
> >>
> >>
> >> Folks,
> >>
> >> Recently in one of our SimpleConsumer based client applications
> (0.8.1.1),
> >> we spotted a very busy CPU with almost no traffic in/out from the client
> >> and Kafka broker (1broker+1zookeeper) (the stack trace is attached at
> the
> >> end).
> >>
> >> The busy thread was invoked in a while loop anchored at the readFrom
> >> function
> >>
> >> ---scala/kafka/network/Transmission.scala:55-59-----
> >>     .....
> >>     while(!complete) {
> >>       val read = readFrom(channel)
> >>       trace(read + " bytes read.")
> >>       totalRead += read
> >>     }
> >>     ....
> >> ----------------------------------------------------------------------
> >>
> >> The readFrom funtion and the associated thread was RUNNABLE at
> >> "read += Utils.read(channel, sizeBuffer)" (see below)
> >>
> >> ---scala/kafka/network/BoundedByteBufferReceive.scala:49-95--------
> >>   ....
> >>   def readFrom(channel: ReadableByteChannel): Int = {
> >>     expectIncomplete()
> >>     var read = 0
> >>     // have we read the request size yet?
> >>     if(sizeBuffer.remaining > 0)
> >>       read += Utils.read(channel, sizeBuffer)
> >>     // have we allocated the request buffer yet?
> >>     if(contentBuffer == null && !sizeBuffer.hasRemaining) {
> >>       sizeBuffer.rewind()
> >>       val size = sizeBuffer.getInt()
> >>       if(size <= 0)
> >>         throw new InvalidRequestException("%d is not a valid request
> >> size.".format(size))
> >>       if(size > maxSize)
> >>         throw new InvalidRequestException("Request of length %d is not
> >> valid, it is larger than the maximum size of %d bytes.".format(size,
> >> maxSize))
> >>       contentBuffer = byteBufferAllocate(size)
> >>     }
> >>     // if we have a buffer read some stuff into it
> >>     if(contentBuffer != null) {
> >>       read = Utils.read(channel, contentBuffer)
> >>       // did we get everything?
> >>       if(!contentBuffer.hasRemaining) {
> >>         contentBuffer.rewind()
> >>         complete = true
> >>       }
> >>     }
> >>     read
> >>   }
> >>   .....
> >>
> >>
> ------------------------------------------------------------------------------
> >>
> >> It looks like contentBuffer size is initialized only once in
> >> SimpleConsumer life-cycle (we keep
> >> SimpleConsumer alive until the app is restarted).
> >>
> >> Wondering what's the communication pattern between the client and
> broker.
> >>
> >> Is our assumption that contentBuffer 'size' is only negotiated when
> >> SimpleConsumer
> >> is created true?
> >>
> >> If above is true then why the thread is BUSY at "read +=
> >> Utils.read(channel, sizeBuffer)"?
> >> (Our application was running fine for over three days and on the third
> >> day we noticed
> >> the busy CPU and this behavior.)
> >>
> >> If contentBuffer size is negotiated every message (or more frequently)
> >> then contentBuffer
> >> variable must be set to "null" somewhere to allow it to reconfigured
> >> (as per existing codebase).
> >>
> >> Another question is about the broker configuration setting which
> >> controls the contentBuffer size.
> >>
> >> I'll really appreciate if someone can help us in figuring it out.
> >>
> >> Thanks,
> >> jaguar
> >>
> >> ----------------BUSY THREAD--------------------------------
> >> "ConsumerFetcherThread-aaaa_ubuntu-1410940574038-305417a6-0-0" prio=10
> >> tid=0x00007f7e4c04e800 nid=0x738c runnable [0x00007f7e4a3e1000]
> >> java.lang.Thread.State: RUNNABLE
> >>  at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> >>  at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> >>  at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
> >>  at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
> >>  - locked <0x00000000e583e670> (a sun.nio.ch.Util$2)
> >>  - locked <0x00000000e583e660> (a java.util.Collections$UnmodifiableSet)
> >>  - locked <0x00000000e583e1d8> (a sun.nio.ch.EPollSelectorImpl)
> >>  at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
> >>  at
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:221)
> >>  - locked <0x00000000e59b69a8> (a java.lang.Object)
> >>  at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
> >>  - locked <0x00000000e59b6b30> (a
> >> sun.nio.ch.SocketAdaptor$SocketInputStream)
> >>  at
> >>
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
> >>  - locked <0x00000000e59cc4f0> (a java.lang.Object)
> >>  at kafka.utils.Utils$.read(Utils.scala:375)
> >>  at
> >>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> >>  at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> >>  at
> >>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> >>  at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> >>  at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
> >>  at
> >>
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> >>  - locked <0x00000000e591cb68> (a java.lang.Object)
> >>  at
> >>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> >>  at
> >>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> >>  at
> >>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> >>  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >>  at
> >>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
> >>  at
> >>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> >>  at
> >>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> >>  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >>  at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> >>  at
> >>
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> >>  at
> >>
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> >>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >>
> >>
> ------------------------------------8<----------------------------------------
> >>
>

Reply via email to