Hi Mathias,

Looking at that thread dump, I think the potential culprit is this one:

TRACE 303545: (thread=200049)
    sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:Unknown line)
    sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
    sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
    sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
    sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:221)
    sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
    java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
    kafka.utils.Utils$.read(Utils.scala:380)
    kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
    kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
    kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
    kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
    kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113)
    kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
    kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
    kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:112)
    kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
    kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
    kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111)
    kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:97)
    kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
    kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)


I see many such threads, all going through the SimpleConsumer and ending up in that polling read. Looking at the code, I can see, at least in theory, how this code path could turn into a busy CPU loop. If my guess is right, the cause is in how data is read off a channel in a blocking manner: when a read returns 0 bytes, the loop just retries immediately instead of backing off (there's a small standalone sketch of what I mean after the patch). I think this patch might help overcome that problem:

diff --git a/core/src/main/scala/kafka/network/Transmission.scala b/core/src/main/scala/kafka/network/Transmission.scala
index 2827103..0bab9ed 100644
--- a/core/src/main/scala/kafka/network/Transmission.scala
+++ b/core/src/main/scala/kafka/network/Transmission.scala
@@ -54,8 +54,15 @@ trait Receive extends Transmission {
     var totalRead = 0
     while(!complete) {
       val read = readFrom(channel)
-      trace(read + " bytes read.")
-      totalRead += read
+      if (read > 0) {
+        trace(read + " bytes read.")
+        totalRead += read
+      } else if (read == 0) {
+        // it's possible that nothing was read (see javadoc of ReadableByteChannel#read) from the backing channel,
+        // so we wait for a while before polling again, so that we don't end up with a busy CPU loop
+        // TODO: For now, this 30 milliseconds is an arbitrary value.
+        Thread.sleep(30)
+      }
     }
     totalRead
   }
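
To make the busy-loop scenario concrete, here's a small standalone sketch (not Kafka code; the channel stub and the class name are made up for illustration) that counts how many times a read loop like the one above spins in a second when the underlying channel keeps returning 0 bytes:

import java.nio.ByteBuffer
import java.nio.channels.ReadableByteChannel

object BusyReadLoopDemo {

  // Hypothetical stand-in for a socket channel that currently has no data
  // available: read() returns 0 instead of blocking.
  private val emptyChannel = new ReadableByteChannel {
    def read(dst: ByteBuffer): Int = 0
    def isOpen(): Boolean = true
    def close(): Unit = ()
  }

  def main(args: Array[String]): Unit = {
    val buffer = ByteBuffer.allocate(1024)
    val deadline = System.currentTimeMillis() + 1000
    var iterations = 0L
    while (System.currentTimeMillis() < deadline) {
      val read = emptyChannel.read(buffer)
      if (read > 0) {
        buffer.clear() // real code would hand the data off here
      } else if (read == 0) {
        // Uncomment the back-off below and the iteration count collapses:
        // Thread.sleep(30)
      }
      iterations += 1
    }
    println("read() calls in one second without a back-off: " + iterations)
  }
}

Without a back-off the loop burns a full core doing nothing but failed reads, which would line up with the pegged cores you're seeing. The Thread.sleep(30) in the patch is only a crude way to break that spin and confirm the diagnosis; if it does turn out to help, a proper fix would probably wait on the selector (or make the back-off configurable) rather than sleep.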

Would you be able to apply this against the latest 0.8.2 branch of Kafka, build the Kafka binary, and try it out to see if it improves the situation?

-Jaikiran

On Monday 26 January 2015 11:35 PM, Mathias Söderberg wrote:
Hi Neha,

I sent an e-mail earlier today, but noticed now that it didn't actually go through.

Anyhow, I've attached two files, one with output from a 10 minute run and one with output from a 30 minute run. Realized that maybe I should've done one or two runs with 0.8.1.1 as well, but nevertheless.

I upgraded our staging cluster to 0.8.2.0-rc2, and I'm seeing the same CPU usage as with the beta version (basically pegging all cores). If I manage to find the time I'll do another run with hprof on the rc2 version later today.

Best regards,
Mathias

On Tue Dec 09 2014 at 10:08:21 PM Neha Narkhede <n...@confluent.io> wrote:

    The following should be sufficient

    java -agentlib:hprof=cpu=samples,depth=100,interval=20,lineno=y,thread=y,file=kafka.hprof <classname>

    You would need to start the Kafka server with the settings above for some time until you observe the problem.

    On Tue, Dec 9, 2014 at 3:47 AM, Mathias Söderberg <mathias.soederb...@gmail.com> wrote:

    > Hi Neha,
    >
    > Yeah sure. I'm not familiar with hprof, so any particular options I should include or just run with defaults?
    >
    > Best regards,
    > Mathias
    >
    > On Mon Dec 08 2014 at 7:41:32 PM Neha Narkhede <n...@confluent.io> wrote:
    >
    > > Thanks for reporting the issue. Would you mind running hprof and sending the output?
    > >
    > > On Mon, Dec 8, 2014 at 1:25 AM, Mathias Söderberg <mathias.soederb...@gmail.com> wrote:
    > >
    > > > Good day,
    > > >
    > > > I upgraded a Kafka cluster from v0.8.1.1 to v0.8.2-beta and noticed
    > > > that the CPU usage on the broker machines went up by roughly 40%,
    > > > from ~60% to ~100% and am wondering if anyone else has experienced
    > > > something similar? The load average also went up by 2x-3x.
    > > >
    > > > We're running on EC2 and the cluster currently consists of four
    > > > m1.xlarge, with roughly 1100 topics / 4000 partitions. Using Java 7
    > > > (1.7.0_65 to be exact) and Scala 2.9.2. Configurations can be found
    > > > over here: https://gist.github.com/mthssdrbrg/7df34a795e07eef10262.
    > > >
    > > > I'm assuming that this is not expected behaviour for 0.8.2-beta?
    > > >
    > > > Best regards,
    > > > Mathias
    > > >
    > >
    > >
    > >
    > > --
    > > Thanks,
    > > Neha
    > >
    >



    --
    Thanks,
    Neha

