Hi Mathias,
Looking at that thread dump, I think the likely culprit is this one:
TRACE 303545: (thread=200049)
sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:Unknown line)
sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:221)
sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
kafka.utils.Utils$.read(Utils.scala:380)
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
kafka.network.Receive$class.readCompletely(Transmission.scala:56)
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:112)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111)
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:97)
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
I see many such threads, all going through the SimpleConsumer and ending up
polling. Looking at the code, I can see why that code path might generate a
busy CPU loop. If my guess is right, the issue is in how data is read off the
channel in a blocking manner: the read loop keeps polling even when the
channel returns zero bytes. I think this patch might help overcome that
problem:
diff --git a/core/src/main/scala/kafka/network/Transmission.scala b/core/src/main/scala/kafka/network/Transmission.scala
index 2827103..0bab9ed 100644
--- a/core/src/main/scala/kafka/network/Transmission.scala
+++ b/core/src/main/scala/kafka/network/Transmission.scala
@@ -54,8 +54,15 @@ trait Receive extends Transmission {
     var totalRead = 0
     while(!complete) {
       val read = readFrom(channel)
-      trace(read + " bytes read.")
-      totalRead += read
+      if (read > 0) {
+        trace(read + " bytes read.")
+        totalRead += read
+      } else if (read == 0) {
+        // it's possible that nothing was read from the backing channel (see the javadoc of ReadableByteChannel#read),
+        // so we wait for a while before polling again, so that we don't end up with a busy CPU loop
+        // TODO: for now, this 30 milliseconds is an arbitrary value
+        Thread.sleep(30)
+      }
     }
     totalRead
   }
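For reference, here is a minimal standalone sketch of the same idea (not the
actual Receive trait; the method name, the buffer handling and the 30 ms
back-off are just stand-ins) showing how a read loop can avoid spinning when
ReadableByteChannel#read returns 0:

import java.nio.ByteBuffer
import java.nio.channels.ReadableByteChannel

object ReadLoopSketch {
  // Read until the buffer is full, sleeping briefly whenever the channel
  // returns 0 bytes (which its contract allows) so we don't spin on the CPU.
  def readFully(channel: ReadableByteChannel, buffer: ByteBuffer): Int = {
    var totalRead = 0
    while (buffer.hasRemaining) {
      val read = channel.read(buffer)
      if (read > 0) {
        totalRead += read
      } else if (read == 0) {
        Thread.sleep(30) // arbitrary back-off, mirrors the patch above
      } else {
        throw new java.io.EOFException("channel closed before the buffer was filled")
      }
    }
    totalRead
  }
}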
Would you be able to apply this against the latest 0.8.2 branch of Kafka,
build the broker, try it out, and see if it improves the situation?
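In case it helps, applying and building would look roughly like this, assuming
the standard Gradle build on that branch (the patch file name is just a
placeholder):

git checkout 0.8.2
git apply transmission-read-backoff.patch
gradle                   # bootstrap the Gradle wrapper (first time only)
./gradlew releaseTarGz   # build a release tarball with the patch applied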
-Jaikiran
On Monday 26 January 2015 11:35 PM, Mathias Söderberg wrote:
Hi Neha,
I sent an e-mail earlier today, but noticed now that it didn't
actually go through.
Anyhow, I've attached two files, one with output from a 10 minute run
and one with output from a 30 minute run. Realized that maybe I
should've done one or two runs with 0.8.1.1 as well, but nevertheless.
I upgraded our staging cluster to 0.8.2.0-rc2, and I'm seeing the same
CPU usage as with the beta version (basically pegging all cores). If I
manage to find the time I'll do another run with hprof on the rc2
version later today.
Best regards,
Mathias
On Tue Dec 09 2014 at 10:08:21 PM Neha Narkhede <n...@confluent.io> wrote:
The following should be sufficient:
java -agentlib:hprof=cpu=samples,depth=100,interval=20,lineno=y,thread=y,file=kafka.hprof <classname>
You would need to start the Kafka server with the settings above for some time
until you observe the problem.
On Tue, Dec 9, 2014 at 3:47 AM, Mathias Söderberg <mathias.soederb...@gmail.com> wrote:
> Hi Neha,
>
> Yeah sure. I'm not familiar with hprof, so any particular options I should
> include or just run with defaults?
>
> Best regards,
> Mathias
>
> On Mon Dec 08 2014 at 7:41:32 PM Neha Narkhede <n...@confluent.io> wrote:
>
> > Thanks for reporting the issue. Would you mind running hprof and sending
> > the output?
> >
> > On Mon, Dec 8, 2014 at 1:25 AM, Mathias Söderberg <mathias.soederb...@gmail.com> wrote:
> >
> > > Good day,
> > >
> > > I upgraded a Kafka cluster from v0.8.1.1 to v0.8.2-beta and noticed that
> > > the CPU usage on the broker machines went up by roughly 40%, from ~60% to
> > > ~100% and am wondering if anyone else has experienced something similar?
> > > The load average also went up by 2x-3x.
> > >
> > > We're running on EC2 and the cluster currently consists of four m1.xlarge,
> > > with roughly 1100 topics / 4000 partitions. Using Java 7 (1.7.0_65 to be
> > > exact) and Scala 2.9.2. Configurations can be found over here:
> > > https://gist.github.com/mthssdrbrg/7df34a795e07eef10262.
> > >
> > > I'm assuming that this is not expected behaviour for 0.8.2-beta?
> > >
> > > Best regards,
> > > Mathias
> > >
> >
> >
> >
> > --
> > Thanks,
> > Neha
> >
>
--
Thanks,
Neha