Hey Rajiv,

Thanks for the detailed report. Can you go ahead and create a JIRA? I do see the exceptions locally, but not nearly at the rate that you're reporting. That might be a factor of the number of partitions, so I'll do some investigation.
-Jason

On Wed, Jan 27, 2016 at 8:40 AM, Rajiv Kurian <ra...@signalfx.com> wrote:

Hi Guozhang,

The GitHub link I pasted was from the 0.9.0 branch. The same line seems to be throwing exceptions in my code built off the Maven 0.9.0.0 package. Are you saying that something else has changed higher up the call stack that will probably not trigger so many exceptions?

Thanks,
Rajiv

On Tue, Jan 26, 2016 at 10:44 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Rajiv,

Could you try to build the new consumer from the 0.9.0 branch and see if the issue can be reproduced?

Guozhang

On Mon, Jan 25, 2016 at 9:46 PM, Rajiv Kurian <ra...@signalfx.com> wrote:

The exception seems to be thrown here:
https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java#L236

Is this not expected to hit often?

On Mon, Jan 25, 2016 at 9:22 PM, Rajiv Kurian <ra...@signalfx.com> wrote:

Wanted to add that we are not using auto commit since we use custom partition assignments. In fact we never call consumer.commitAsync() or consumer.commitSync(). My assumption is that since we store our own offsets these calls are not necessary. Hopefully this is not responsible for the poor performance.
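For reference, the manual-assignment pattern described above (offsets kept in an external store, no commit calls at all) looks roughly like the sketch below. This is only an illustration against the 0.9.0.0 Java client; the topic name "myTopic", the partition list, and the OffsetStore interface are placeholders, not part of the original setup.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class ManualOffsetConsumer {

        // Hypothetical external offset store; the thread only says offsets are stored by the application.
        interface OffsetStore {
            long lastProcessedOffset(TopicPartition tp);
            void save(TopicPartition tp, long offset);
        }

        static void run(Properties config, List<Integer> partitions, OffsetStore store) {
            KafkaConsumer<Void, byte[]> consumer = new KafkaConsumer<>(config);

            List<TopicPartition> assignment = new ArrayList<>();
            for (int p : partitions) {
                assignment.add(new TopicPartition("myTopic", p));  // placeholder topic name
            }

            // Manual assignment: no group rebalancing, and no commitSync()/commitAsync() anywhere.
            consumer.assign(assignment);

            // Resume each partition from the externally stored position instead of a committed offset.
            for (TopicPartition tp : assignment) {
                consumer.seek(tp, store.lastProcessedOffset(tp) + 1);
            }

            while (true) {
                ConsumerRecords<Void, byte[]> records = consumer.poll(10000);
                for (ConsumerRecord<Void, byte[]> record : records) {
                    // handler.handleMessage(...) would go here.
                    store.save(new TopicPartition(record.topic(), record.partition()), record.offset());
                }
            }
        }
    }

With this pattern the committed-offset machinery is simply unused, so skipping commitSync()/commitAsync() should not by itself be a correctness or performance problem.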
On Mon, Jan 25, 2016 at 9:20 PM, Rajiv Kurian <ra...@signalfx.com> wrote:

We are using the new Kafka consumer with the following config (as logged by Kafka):

    metric.reporters = []
    metadata.max.age.ms = 300000
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    group.id = myGroup.id
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 2097152
    bootstrap.servers = [myBrokerList]
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.keystore.type = JKS
    ssl.trustmanager.algorithm = PKIX
    enable.auto.commit = false
    ssl.key.password = null
    fetch.max.wait.ms = 1000
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.truststore.password = null
    session.timeout.ms = 30000
    metrics.num.samples = 2
    client.id =
    ssl.endpoint.identification.algorithm = null
    key.deserializer = class sf.kafka.VoidDeserializer
    ssl.protocol = TLS
    check.crcs = true
    request.timeout.ms = 40000
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 5000
    receive.buffer.bytes = 32768
    ssl.cipher.suites = null
    ssl.truststore.type = JKS
    security.protocol = PLAINTEXT
    ssl.truststore.location = null
    ssl.keystore.password = null
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    fetch.min.bytes = 512
    send.buffer.bytes = 131072
    auto.offset.reset = earliest

We use the consumer.assign() feature to assign a list of partitions and call poll in a loop. We have the following setup:

1. The messages have no key and we use the byte array deserializer from the config to get byte arrays.

2. The messages themselves are on average about 75 bytes. We get this number by dividing the Kafka broker bytes-in metric by the messages-in metric.

3. Each consumer is assigned about 64 partitions of the same topic spread across three brokers.

4. We get very few messages per second, maybe around 1-2 messages across all partitions on a client right now.

5. We have no compression on the topic.
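For context, the settings in the logged configuration that differ from the client defaults roughly correspond to a consumer built like the sketch below. The broker list, group id, and deserializer class names are taken verbatim from the log; everything else about this factory (class and method names) is illustrative, not the original code.

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ConsumerFactory {

        static KafkaConsumer<Void, byte[]> create() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "myBrokerList");   // placeholder, as in the logged config
            props.put("group.id", "myGroup.id");              // placeholder, as in the logged config
            props.put("key.deserializer", "sf.kafka.VoidDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("enable.auto.commit", "false");         // offsets are managed by the application
            props.put("auto.offset.reset", "earliest");
            props.put("fetch.min.bytes", "512");              // broker replies once 512 bytes are ready...
            props.put("fetch.max.wait.ms", "1000");           // ...or after 1 second, whichever comes first
            props.put("max.partition.fetch.bytes", "2097152");
            return new KafkaConsumer<>(props);
        }
    }
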
Our run loop looks something like this:

    while (isRunning()) {
        ConsumerRecords<Void, byte[]> records = null;
        try {
            // Here timeout is about 10 seconds, so it is pretty big.
            records = consumer.poll(timeout);
        } catch (Exception e) {
            logger.error("Exception polling Kafka ", e);
            records = null;
        }
        if (records != null) {
            for (ConsumerRecord<Void, byte[]> record : records) {
                // The handler puts the byte array on a very fast ring buffer,
                // so it barely takes any time.
                handler.handleMessage(ByteBuffer.wrap(record.value()));
            }
        }
    }

With this setup our performance has taken a horrendous hit as soon as we started this one thread that just polls Kafka in a loop.

I profiled the application using Java Mission Control and have a few insights.

1. There doesn't seem to be a single hotspot. The consumer just ends up using a lot of CPU for handling such a low number of messages. Our process was using 16% CPU before we added a single consumer, and it went to 25% and above after. That's an increase of over 50% from a single consumer getting a single-digit number of small messages per second. Here is an attachment of the CPU usage breakdown in the consumer (the namespace is different because we shade the Kafka jar before using it): http://imgur.com/tHjdVnM We've used bigger timeouts (100 seconds odd) and that doesn't seem to make much of a difference either.

2. It also seems like Kafka throws a ton of EOFExceptions. I am not sure whether this is expected, but it seems like it would completely kill performance. Here is the exception tab of Java Mission Control: http://imgur.com/X3KSn37 That is 1.8 million exceptions over a period of 3 minutes, which is about 10 thousand exceptions per second! The exception stack trace shows that it originates from the poll call. I don't understand how it can throw so many exceptions given that I call poll with a timeout of 10 seconds and get messages at about 1 per second.

3. The single thread seems to allocate a lot too. It is responsible for 17.87% of our entire JVM allocation rate, and most of what it allocates seems to be those same EOFExceptions. Here is a chart showing the single thread's allocation proportion: http://imgur.com/GNUJQsz Here is a chart that shows a breakdown of the allocations: http://imgur.com/YjCXljE About 20% of the allocations are for the EOFExceptions. This seems kind of crazy, especially given that it happens about 10 thousand times a second. The rest of the allocations are spread all over but again seem excessive given how few messages we are getting.

As a comparison, we also run a wrapper over the old SimpleConsumer that gets a lot more data (10-15 thousand 70-byte messages/sec on a different topic) and it handles that load without much trouble. At this moment we are completely puzzled by this performance. Most of it does seem to be due to the crazy volume of exceptions. Note: our messages do all seem to be making it through; the exceptions are caught inside Kafka's stack and never bubble through to us.

Are we doing anything wrong with how we are using the new consumer (longer timeouts of 100 seconds odd don't seem to help)?

Thanks in advance,

Rajiv
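One possible way to quantify where the time goes, sketched here under the assumption of the same 0.9.0.0 client (none of the names below come from the original application): time each poll() call, count the records returned, and periodically dump the consumer's built-in metrics so the fetch request rate can be compared with the message rate reported above.

    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;

    public class PollLoopStats {

        static void runInstrumented(KafkaConsumer<Void, byte[]> consumer, long timeoutMs) {
            long polls = 0;
            long recordsSeen = 0;
            long nanosInPoll = 0;
            long lastReport = System.nanoTime();

            while (true) {
                long start = System.nanoTime();
                ConsumerRecords<Void, byte[]> records = consumer.poll(timeoutMs);
                nanosInPoll += System.nanoTime() - start;
                polls++;
                recordsSeen += records.count();

                if (System.nanoTime() - lastReport > 60_000_000_000L) {  // report once a minute
                    System.out.printf("polls=%d records=%d avgPollMs=%.2f%n",
                            polls, recordsSeen, nanosInPoll / 1e6 / Math.max(1, polls));
                    // The consumer exposes its own metrics; the fetch-related ones show how many
                    // fetch requests are issued relative to the handful of messages actually consumed.
                    for (Map.Entry<MetricName, ? extends Metric> e : consumer.metrics().entrySet()) {
                        System.out.println(e.getKey().group() + "/" + e.getKey().name()
                                + " = " + e.getValue().value());
                    }
                    lastReport = System.nanoTime();
                }
            }
        }
    }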