You probably also want to check the request-time JMX metrics (in particular
the FetchRequest total time) on the broker to see if the broker is healthy.
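For reference, a minimal Java sketch of reading one such timer attribute over JMX. The exact Kafka MBean name varies by version (verify it with jconsole first), so the bean and attribute names in the comments are assumptions, not authoritative; the demo below reads a standard platform bean instead.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class JmxCheck {
    // Reads one numeric attribute from an MBean. The broker's request-time
    // bean name is version dependent -- check it with jconsole first; the
    // names used here are placeholders for illustration only.
    public static Number readAttribute(MBeanServer server, String name,
                                       String attribute) throws Exception {
        return (Number) server.getAttribute(new ObjectName(name), attribute);
    }

    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Demonstrated against a standard platform bean; in practice you would
        // connect to the broker's remote JMX port and read the Fetch timer.
        Number cpus = readAttribute(server, "java.lang:type=OperatingSystem",
                                    "AvailableProcessors");
        System.out.println("AvailableProcessors = " + cpus);
    }
}
```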

Thanks,

Jun

On Fri, Sep 26, 2014 at 3:02 PM, Jagbir Hooda <jho...@gmail.com> wrote:

> Hi Jun,
>
> Thanks for the info, we'll keep you posted on the above topic.
> Surprisingly, in the broker controller logs we also see somewhat similar
> exceptions at the same line, BoundedByteBufferReceive.scala:54. (Not
> sure if there is any correlation here.)
>
> ----------------8<----------------------
> [2014-09-26 01:07:34,727] ERROR [ReplicaFetcherThread-0-2], Error in
> fetch Name: FetchRequest; Version: 0; CorrelationId: 354; ClientId:
> ReplicaFetcherThread-0-2; ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1
> bytes; RequestInfo:
> [Topic0001,0] -> PartitionFetchInfo(0,1048576)
> ....
> [Topic0001,7] -> PartitionFetchInfo(0,1048576)
> ....
> [Topic1000,0] -> PartitionFetchInfo(0,1048576)
> ...
> [Topic1000,7] -> PartitionFetchInfo(0,1048576)
> (kafka.server.ReplicaFetcherThread)
> java.net.SocketTimeoutException
>         at
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
>         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
>         at
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
>         at kafka.utils.Utils$.read(Utils.scala:375)
>         at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>         at
> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>         at
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>         at
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
>         at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>         at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>         at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
>         at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> -----------------8<----------------
>
> On Wed, Sep 24, 2014 at 9:39 PM, Jun Rao <jun...@gmail.com> wrote:
> > You can enable some trace/debug level logging to see if the thread is
> > indeed hanging in BoundedByteBufferReceive.
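For what it's worth, a log4j fragment that would turn that logging on. The logger names below are assumptions based on the package names in the stack trace, so verify them against your own log4j setup:

```properties
# Hypothetical log4j.properties fragment (adjust to your appenders/layout).
# kafka.network covers Transmission / BoundedByteBufferReceive, whose
# trace() call logs "<n> bytes read." on every loop iteration.
log4j.logger.kafka.network=TRACE
log4j.logger.kafka.consumer=DEBUG
```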
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Sep 24, 2014 at 8:30 AM, Jagbir Hooda <jho...@gmail.com> wrote:
> >
> >> Hi Jun,
> >>
> >> Thanks for looking into it. We use the following steps to start the
> >> consumer.
> >>
> >> 1) Create consumer connector
> >>    ConsumerConnector consumerConnector =
> >>        kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
> >>
> >> 2) Create message streams
> >>    List<KafkaStream<byte[], byte[]>> streamList =
> >>        consumerConnector.createMessageStreamsByFilter(new Whitelist("topic1,topic2,etc."));
> >>
> >> 3) Get Stream iterator
> >>     KafkaStream<byte[], byte[]> stream = streamList.get(0);
> >>     ConsumerIterator<byte[], byte[]> it = stream.iterator();
> >>
> >> 4) Get messages
> >>     while(it.hasNext()) {
> >>           MessageAndMetadata<byte[], byte[]> messageAndMeta = it.peek();
> >>           // consume message
> >>           it.next();
> >>     }
> >>
> >> I'm still at loss to understand the correlation between the consumer
> >> code and the BoundedByteBufferReceive
> >> being RUNNABLE at "read += Utils.read(channel, sizeBuffer)". We took
> >> multiple thread dumps at different times (when CPU was busy) and the
> >> thread was always RUNNABLE at the same location. Intuitively, reading
> >> the sizeBuffer should have been much less CPU-bound than reading the
> >> contentBuffer.
> >>
> >> Thanks,
> >> Jagbir
> >>
> >> On Mon, Sep 22, 2014 at 9:26 PM, Jun Rao <jun...@gmail.com> wrote:
> >> > We allocate a new BoundedByteBufferReceive for every fetch request. Are
> >> > you using SimpleConsumer directly? It seems it's started by the high level
> >> > consumer through the ConsumerFetcherThread.
> >> >
> >> > Thanks,
> >> >
> >> > Jun
> >> >
> >> > On Mon, Sep 22, 2014 at 11:41 AM, Jagbir Hooda <jho...@gmail.com>
> wrote:
> >> >
> >> >> Note:  Re-posting the older message from another account due to
> >> >> formatting issues.
> >> >>
> >> >>
> >> >> Folks,
> >> >>
> >> >> Recently, in one of our SimpleConsumer-based client applications
> >> >> (0.8.1.1), we spotted a very busy CPU with almost no traffic in/out
> >> >> between the client and the Kafka broker (1 broker + 1 zookeeper); the
> >> >> stack trace is attached at the end.
> >> >>
> >> >> The busy thread was spinning in a while loop anchored at the readFrom
> >> >> function
> >> >>
> >> >> ---scala/kafka/network/Transmission.scala:55-59-----
> >> >>     .....
> >> >>     while(!complete) {
> >> >>       val read = readFrom(channel)
> >> >>       trace(read + " bytes read.")
> >> >>       totalRead += read
> >> >>     }
> >> >>     ....
> >> >>
> >> >> ----------------------------------------------------------------------
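As an aside on why a loop like the one above can pin a CPU: a channel read can legally return 0 bytes without the request being complete, in which case the retry loop spins with the thread RUNNABLE. A contrived Java sketch (my own stub, not Kafka code) of a zero-yielding channel spinning through such a loop, capped so it terminates:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

public class SpinDemo {
    /** A stub channel that is open but never delivers bytes (and never EOFs). */
    public static class StarvedChannel implements ReadableByteChannel {
        public int read(ByteBuffer dst) { return 0; } // 0 bytes, not -1 (EOF)
        public boolean isOpen() { return true; }
        public void close() {}
    }

    /** Mimics the readCompletely retry loop, capped so the demo terminates. */
    public static int spinIterations(ReadableByteChannel ch, int cap) throws IOException {
        ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
        int iterations = 0;
        boolean complete = false;
        while (!complete && iterations < cap) {
            ch.read(sizeBuffer);                   // returns 0 every time
            complete = !sizeBuffer.hasRemaining(); // never becomes true here
            iterations++;
        }
        return iterations; // hits the cap; an uncapped loop just burns CPU
    }

    public static void main(String[] args) throws IOException {
        System.out.println(spinIterations(new StarvedChannel(), 1000)); // prints 1000
    }
}
```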
> >> >>
> >> >> The readFrom function (and the associated thread) was RUNNABLE at
> >> >> "read += Utils.read(channel, sizeBuffer)" (see below)
> >> >>
> >> >> ---scala/kafka/network/BoundedByteBufferReceive.scala:49-95--------
> >> >>   ....
> >> >>   def readFrom(channel: ReadableByteChannel): Int = {
> >> >>     expectIncomplete()
> >> >>     var read = 0
> >> >>     // have we read the request size yet?
> >> >>     if(sizeBuffer.remaining > 0)
> >> >>       read += Utils.read(channel, sizeBuffer)
> >> >>     // have we allocated the request buffer yet?
> >> >>     if(contentBuffer == null && !sizeBuffer.hasRemaining) {
> >> >>       sizeBuffer.rewind()
> >> >>       val size = sizeBuffer.getInt()
> >> >>       if(size <= 0)
> >> >>         throw new InvalidRequestException("%d is not a valid request
> >> >> size.".format(size))
> >> >>       if(size > maxSize)
> >> >>         throw new InvalidRequestException("Request of length %d is
> not
> >> >> valid, it is larger than the maximum size of %d bytes.".format(size,
> >> >> maxSize))
> >> >>       contentBuffer = byteBufferAllocate(size)
> >> >>     }
> >> >>     // if we have a buffer read some stuff into it
> >> >>     if(contentBuffer != null) {
> >> >>       read = Utils.read(channel, contentBuffer)
> >> >>       // did we get everything?
> >> >>       if(!contentBuffer.hasRemaining) {
> >> >>         contentBuffer.rewind()
> >> >>         complete = true
> >> >>       }
> >> >>     }
> >> >>     read
> >> >>   }
> >> >>   .....
> >> >>
> >> >>
> >>
> >> >> ------------------------------------------------------------------------------
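On the "size negotiation" question below: as the code above shows, nothing is negotiated up front. Each response carries its own 4-byte length prefix, and a fresh receive object (with a freshly allocated contentBuffer) handles each response. A minimal Java sketch of that size-prefix framing, written as an illustration only (not Kafka's actual wire code):

```java
import java.nio.ByteBuffer;

public class FramingDemo {
    /** Prepends a 4-byte big-endian length header, as the Kafka protocol does. */
    public static ByteBuffer frame(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(payload.length);
        buf.put(payload);
        buf.flip();
        return buf;
    }

    /** Reads the header, then sizes the content buffer from it per message. */
    public static byte[] unframe(ByteBuffer buf) {
        int size = buf.getInt(); // read per message, not negotiated once
        if (size <= 0)
            throw new IllegalArgumentException(size + " is not a valid request size.");
        byte[] content = new byte[size];
        buf.get(content);
        return content;
    }

    public static void main(String[] args) {
        byte[] round = unframe(frame("hello".getBytes()));
        System.out.println(new String(round)); // prints "hello"
    }
}
```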
> >> >>
> >> >> It looks like the contentBuffer size is initialized only once in the
> >> >> SimpleConsumer life-cycle (we keep the SimpleConsumer alive until the
> >> >> app is restarted).
> >> >>
> >> >> We're wondering what the communication pattern is between the client
> >> >> and the broker.
> >> >>
> >> >> Is our assumption correct that the contentBuffer 'size' is only
> >> >> negotiated when the SimpleConsumer is created?
> >> >>
> >> >> If the above is true, then why is the thread busy at "read +=
> >> >> Utils.read(channel, sizeBuffer)"? (Our application had been running
> >> >> fine for over three days; on the third day we noticed the busy CPU
> >> >> and this behavior.)
> >> >>
> >> >> If the contentBuffer size is negotiated for every message (or more
> >> >> frequently), then the contentBuffer variable must be set to null
> >> >> somewhere to allow it to be reconfigured (as per the existing codebase).
> >> >>
> >> >> Another question is about the broker configuration setting which
> >> >> controls the contentBuffer size.
> >> >>
> >> >> I'd really appreciate it if someone could help us figure this out.
> >> >>
> >> >> Thanks,
> >> >> jaguar
> >> >>
> >> >> ----------------BUSY THREAD--------------------------------
> >> >> "ConsumerFetcherThread-aaaa_ubuntu-1410940574038-305417a6-0-0"
> prio=10
> >> >> tid=0x00007f7e4c04e800 nid=0x738c runnable [0x00007f7e4a3e1000]
> >> >> java.lang.Thread.State: RUNNABLE
> >> >>  at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> >> >>  at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> >> >>  at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
> >> >>  at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
> >> >>  - locked <0x00000000e583e670> (a sun.nio.ch.Util$2)
> >> >>  - locked <0x00000000e583e660> (a
> java.util.Collections$UnmodifiableSet)
> >> >>  - locked <0x00000000e583e1d8> (a sun.nio.ch.EPollSelectorImpl)
> >> >>  at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
> >> >>  at
> >> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:221)
> >> >>  - locked <0x00000000e59b69a8> (a java.lang.Object)
> >> >>  at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
> >> >>  - locked <0x00000000e59b6b30> (a
> >> >> sun.nio.ch.SocketAdaptor$SocketInputStream)
> >> >>  at
> >> >>
> >>
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
> >> >>  - locked <0x00000000e59cc4f0> (a java.lang.Object)
> >> >>  at kafka.utils.Utils$.read(Utils.scala:375)
> >> >>  at
> >> >>
> >>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> >> >>  at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> >> >>  at
> >> >>
> >>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> >> >>  at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> >> >>  at
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
> >> >>  at
> >> >>
> >>
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> >> >>  - locked <0x00000000e591cb68> (a java.lang.Object)
> >> >>  at
> >> >>
> >>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> >> >>  at
> >> >>
> >>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> >> >>  at
> >> >>
> >>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> >> >>  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >> >>  at
> >> >>
> >>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
> >> >>  at
> >> >>
> >>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> >> >>  at
> >> >>
> >>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> >> >>  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >> >>  at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> >> >>  at
> >> >>
> >>
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> >> >>  at
> >> >>
> >>
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> >> >>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >> >>
> >> >>
> >>
> ------------------------------------8<----------------------------------------
> >> >>
> >>
>
