The exception seems to be thrown here
https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java#L236

Is this not expected to hit often?

On Mon, Jan 25, 2016 at 9:22 PM, Rajiv Kurian <ra...@signalfx.com> wrote:

> Wanted to add that we are not using auto commit since we use custom
> partition assignments. In fact we never call  consumer.commitAsync() or
> consumer.commitSync() calls. My assumption is that since we store our own
> offsets these calls are not necessary. Hopefully this is not responsible
> for the poor performance.
>
> On Mon, Jan 25, 2016 at 9:20 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>
>> We are using the new kafka consumer with the following config (as logged
>> by kafka)
>>
>> metric.reporters = []
>>
>>         metadata.max.age.ms = 300000
>>
>>         value.deserializer = class
>> org.apache.kafka.common.serialization.ByteArrayDeserializer
>>
>>         group.id = myGroup.id
>>
>>         partition.assignment.strategy = [org.apache.kafka.clients.
>> consumer.RangeAssignor]
>>
>>         reconnect.backoff.ms = 50
>>
>>         sasl.kerberos.ticket.renew.window.factor = 0.8
>>
>>         max.partition.fetch.bytes = 2097152
>>
>>         bootstrap.servers = [myBrokerList]
>>
>>         retry.backoff.ms = 100
>>
>>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>
>>         sasl.kerberos.service.name = null
>>
>>         sasl.kerberos.ticket.renew.jitter = 0.05
>>
>>         ssl.keystore.type = JKS
>>
>>         ssl.trustmanager.algorithm = PKIX
>>
>>         enable.auto.commit = false
>>
>>         ssl.key.password = null
>>
>>         fetch.max.wait.ms = 1000
>>
>>         sasl.kerberos.min.time.before.relogin = 60000
>>
>>         connections.max.idle.ms = 540000
>>
>>         ssl.truststore.password = null
>>
>>         session.timeout.ms = 30000
>>
>>         metrics.num.samples = 2
>>
>>         client.id =
>>
>>         ssl.endpoint.identification.algorithm = null
>>
>>         key.deserializer = class sf.kafka.VoidDeserializer
>>
>>         ssl.protocol = TLS
>>
>>         check.crcs = true
>>
>>         request.timeout.ms = 40000
>>
>>         ssl.provider = null
>>
>>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>
>>         ssl.keystore.location = null
>>
>>         heartbeat.interval.ms = 3000
>>
>>         auto.commit.interval.ms = 5000
>>
>>         receive.buffer.bytes = 32768
>>
>>         ssl.cipher.suites = null
>>
>>         ssl.truststore.type = JKS
>>
>>         security.protocol = PLAINTEXT
>>
>>         ssl.truststore.location = null
>>
>>         ssl.keystore.password = null
>>
>>         ssl.keymanager.algorithm = SunX509
>>
>>         metrics.sample.window.ms = 30000
>>
>>         fetch.min.bytes = 512
>>
>>         send.buffer.bytes = 131072
>>
>>         auto.offset.reset = earliest
>>
>>
>> We use the consumer.assign() feature to assign a list of partitions and
>> call poll in a loop.  We have the following setup:
>>
>> 1. The messages have no key and we use the byte array deserializer to get
>> byte arrays from the config.
>>
>> 2. The messages themselves are on an average about 75 bytes. We get this
>> number by diving the Kafka broker bytes-in metric by the messages-in metric.
>>
>> 3. Each consumer is assigned about 64 partitions of the same topic spread
>> across three brokers.
>>
>> 4. We get very few messages per second maybe around 1-2 messages across
>> all partitions on a client right now.
>>
>> 5. We have no compression on the topic.
>>
>> Our run loop looks something like this
>>
>> while (isRunning()) {
>>
>> ConsumerRecords<Void, byte[]> records = null;
>>
>>         try {
>>
>>             // Here timeout is about 10 seconds, so it is pretty big.
>>
>>             records = consumer.poll(timeout);
>>
>>         } catch (Exception e) {
>>
>>             logger.error("Exception polling Kafka ", e);
>>
>>             records = null;
>>
>>         }
>>
>>         if (records != null) {
>>
>>             for (ConsumerRecord<Void, byte[]> record : records) {
>>
>>                // The handler puts the byte array on a very fast ring
>> buffer so it barely takes any time.
>>
>>                 handler.handleMessage(ByteBuffer.wrap(record.value()));
>>
>>             }
>>
>>         }
>>
>> }
>>
>>
>>
>> With this setup our performance has taken a horrendous hit as soon as we
>> started this one thread that just polls kafka in a loop.
>>
>> I profiled the application using Java Mission Control and have a few
>> insights.
>>
>> 1. There doesn't seem to be a single hotspot. The consumer just ends up
>> using a lot of CPU for handing such a low number of messages. Our process
>> was using 16% CPU before we added a single consumer and it went to 25% and
>> above after. That's an increase of over 50% from a single consumer getting
>> a single digit number of small messages per second. Here is an attachment
>> of the cpu usage breakdown in the consumer (the namespace is different
>> because we shade the kafka jar before using it) -
>> http://imgur.com/tHjdVnM  We've used bigger timeouts (100 seconds odd)
>> and that doesn't seem to make much of a difference either.
>>
>> 2. It also seems like Kafka throws a ton of EOFExceptions. I am not sure
>> whether this is expected but this seems like it would completely kill
>> performance. Here is the exception tab of Java mission control.
>> http://imgur.com/X3KSn37 That is 1.8 mn exceptions over a period of 3
>> minutes which is about 10 thousand exceptions per second! The exception
>> stack trace shows that it originates from the poll call. I don't understand
>> how it can throw so many exceptions given I call poll it with a timeout of
>> 10 seconds and get messages at about 1 per second.
>>
>> 3. The single thread seems to allocate a lot too. The single thread is
>> responsible for 17.87% of our entire JVM allocation rate. Most of what it
>> allocates seems to be those same EOFExceptions. Here is a chart showing the
>> single thread's allocation proportion: http://imgur.com/GNUJQsz Here is
>> a chart that shows a breakdown of the allocations:
>> http://imgur.com/YjCXljE About 20% of the allocations are for the
>> EOFExceptions. This seems kind of crazy especially given that this happens
>> about 10 thousand times a second. The rest of the allocations seem to be
>> spread all over but again seem excessive given how we are getting very few
>> messages.
>>
>> As a comparison, we also run a wrapper over the old SimpleConsumer that
>> gets a lot more data (10 -15 thousand 70 byte messages/sec on a different
>> topic) and it is able to handle that load without much trouble. At this
>> moment we are completely puzzled by this performance. Most of it does seem
>> to be due to the crazy volumes of exceptions. Note: Our messages seem to
>> all be making through. The exceptions are caught by Kafka's stack and never
>> bubble though to us.
>>
>> Are we doing anything wrong with how we are using the new consumer
>> (longer timeouts of a 100 second odd don't seem to help)?
>>
>> Thanks in advance,
>>
>> Rajiv
>>
>>
>>
>

Reply via email to