[ 
https://issues.apache.org/jira/browse/KAFKA-3159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajiv Kurian updated KAFKA-3159:
--------------------------------
    Description: 
We are using the new kafka consumer with the following config (as logged by 
kafka)

metric.reporters = []

        metadata.max.age.ms = 300000

        value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer

        group.id = myGroup.id

        partition.assignment.strategy = 
[org.apache.kafka.clients.consumer.RangeAssignor]

        reconnect.backoff.ms = 50

        sasl.kerberos.ticket.renew.window.factor = 0.8

        max.partition.fetch.bytes = 2097152

        bootstrap.servers = [myBrokerList]

        retry.backoff.ms = 100

        sasl.kerberos.kinit.cmd = /usr/bin/kinit

        sasl.kerberos.service.name = null

        sasl.kerberos.ticket.renew.jitter = 0.05

        ssl.keystore.type = JKS

        ssl.trustmanager.algorithm = PKIX

        enable.auto.commit = false

        ssl.key.password = null

        fetch.max.wait.ms = 1000

        sasl.kerberos.min.time.before.relogin = 60000

        connections.max.idle.ms = 540000

        ssl.truststore.password = null

        session.timeout.ms = 30000

        metrics.num.samples = 2

        client.id = 

        ssl.endpoint.identification.algorithm = null

        key.deserializer = class sf.kafka.VoidDeserializer

        ssl.protocol = TLS

        check.crcs = true

        request.timeout.ms = 40000

        ssl.provider = null

        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]

        ssl.keystore.location = null

        heartbeat.interval.ms = 3000

        auto.commit.interval.ms = 5000

        receive.buffer.bytes = 32768

        ssl.cipher.suites = null

        ssl.truststore.type = JKS

        security.protocol = PLAINTEXT

        ssl.truststore.location = null

        ssl.keystore.password = null

        ssl.keymanager.algorithm = SunX509

        metrics.sample.window.ms = 30000

        fetch.min.bytes = 512

        send.buffer.bytes = 131072

        auto.offset.reset = earliest


We use the consumer.assign() feature to assign a list of partitions and call 
poll in a loop.  We have the following setup:

1. The messages have no key and we use the byte array deserializer to get byte 
arrays from the config.

2. The messages themselves are on an average about 75 bytes. We get this number 
by dividing the Kafka broker bytes-in metric by the messages-in metric.

3. Each consumer is assigned about 64 partitions of the same topic spread 
across three brokers.

4. We get very few messages per second maybe around 1-2 messages across all 
partitions on a client right now.

5. We have no compression on the topic.

Our run loop looks something like this

while (isRunning()) {

ConsumerRecords<Void, byte[]> records = null;
        try {
            // Here timeout is about 10 seconds, so it is pretty big.
            records = consumer.poll(timeout);
        } catch (Exception e) {
           // This never hits for us
            logger.error("Exception polling Kafka ", e);
            records = null;
        }

        if (records != null) {
            for (ConsumerRecord<Void, byte[]> record : records) {
               // The handler puts the byte array on a very fast ring buffer so 
it barely takes any time.
                handler.handleMessage(ByteBuffer.wrap(record.value()));
            }
        }
}


With this setup our performance has taken a horrendous hit as soon as we 
started this one thread that just polls Kafka in a loop.

I profiled the application using Java Mission Control and have a few insights.

1. There doesn't seem to be a single hotspot. The consumer just ends up using a 
lot of CPU for handing such a low number of messages. Our process was using 16% 
CPU before we added a single consumer and it went to 25% and above after. 
That's an increase of over 50% from a single consumer getting a single digit 
number of small messages per second. Here is an attachment of the cpu usage 
breakdown in the consumer (the namespace is different because we shade the 
kafka jar before using it) - http://imgur.com/BxWs9Q0 So 20.54% of our entire 
process CPU is used on polling these 64 partitions (across 3 brokers) with 
single digit number of 70-80 byte odd messages.  We've used bigger timeouts 
(100 seconds odd) and that doesn't seem to make much of a difference either.

2. It also seems like Kafka throws a ton of EOFExceptions. I am not sure 
whether this is expected but this seems like it would completely kill 
performance. Here is the exception tab of Java mission control. 
http://imgur.com/X3KSn37 That is 1.8 mn exceptions over a period of 3 minutes 
which is about 10 thousand exceptions per second! The exception stack trace 
shows that it originates from the poll call. I don't understand how it can 
throw so many exceptions given I call poll it with a timeout of 10 seconds and 
get a single digit number of messages per second. The exception seems to be 
thrown from here: 
https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java#L236

3. The single thread seems to allocate a lot too. The single thread is 
responsible for 17.87% of our entire JVM allocation rate. During other runs it 
has gone up to 20% of our entire JVM allocation rate. Most of what it allocates 
seems to be those same EOFExceptions. Here is a chart showing the single 
thread's allocation proportion: http://imgur.com/GNUJQsz Here is a chart that 
shows a breakdown of the allocations: http://imgur.com/YjCXljE About 20% of the 
allocations are for the EOFExceptions. But given that the 20% of the 
allocations (exceptions) is around 10k/second, the thread itself is allocating 
about 50k objects/second which seems excessive given how we are getting very 
few messages.

As a comparison, we also run a wrapper over the old SimpleConsumer that gets a 
lot more data (30 thousand 70 byte messages/sec on a different topic) and it is 
able to handle that load without much trouble. At this moment we are completely 
puzzled by this performance. At least some part of that seems to be due to the 
crazy volumes of exceptions but the CPU profiling breakdown seems to suggest 
that there are plenty of other causes including the initFetches call and the 
ConsumerNetworkClient.poll call. Note: Our messages seem to all be making 
through. We haven't measured the end to end latency. The exceptions are caught 
by Kafka's stack and never bubble up to us.

  was:
We are using the new kafka consumer with the following config (as logged by 
kafka)

metric.reporters = []

        metadata.max.age.ms = 300000

        value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer

        group.id = myGroup.id

        partition.assignment.strategy = 
[org.apache.kafka.clients.consumer.RangeAssignor]

        reconnect.backoff.ms = 50

        sasl.kerberos.ticket.renew.window.factor = 0.8

        max.partition.fetch.bytes = 2097152

        bootstrap.servers = [myBrokerList]

        retry.backoff.ms = 100

        sasl.kerberos.kinit.cmd = /usr/bin/kinit

        sasl.kerberos.service.name = null

        sasl.kerberos.ticket.renew.jitter = 0.05

        ssl.keystore.type = JKS

        ssl.trustmanager.algorithm = PKIX

        enable.auto.commit = false

        ssl.key.password = null

        fetch.max.wait.ms = 1000

        sasl.kerberos.min.time.before.relogin = 60000

        connections.max.idle.ms = 540000

        ssl.truststore.password = null

        session.timeout.ms = 30000

        metrics.num.samples = 2

        client.id = 

        ssl.endpoint.identification.algorithm = null

        key.deserializer = class sf.kafka.VoidDeserializer

        ssl.protocol = TLS

        check.crcs = true

        request.timeout.ms = 40000

        ssl.provider = null

        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]

        ssl.keystore.location = null

        heartbeat.interval.ms = 3000

        auto.commit.interval.ms = 5000

        receive.buffer.bytes = 32768

        ssl.cipher.suites = null

        ssl.truststore.type = JKS

        security.protocol = PLAINTEXT

        ssl.truststore.location = null

        ssl.keystore.password = null

        ssl.keymanager.algorithm = SunX509

        metrics.sample.window.ms = 30000

        fetch.min.bytes = 512

        send.buffer.bytes = 131072

        auto.offset.reset = earliest


We use the consumer.assign() feature to assign a list of partitions and call 
poll in a loop.  We have the following setup:

1. The messages have no key and we use the byte array deserializer to get byte 
arrays from the config.

2. The messages themselves are on an average about 75 bytes. We get this number 
by dividing the Kafka broker bytes-in metric by the messages-in metric.

3. Each consumer is assigned about 64 partitions of the same topic spread 
across three brokers.

4. We get very few messages per second maybe around 1-2 messages across all 
partitions on a client right now.

5. We have no compression on the topic.

Our run loop looks something like this

while (isRunning()) {

ConsumerRecords<Void, byte[]> records = null;
        try {
            // Here timeout is about 10 seconds, so it is pretty big.
            records = consumer.poll(timeout);
        } catch (Exception e) {
           // This never hits for us
            logger.error("Exception polling Kafka ", e);
            records = null;
        }

        if (records != null) {
            for (ConsumerRecord<Void, byte[]> record : records) {
               // The handler puts the byte array on a very fast ring buffer so 
it barely takes any time.
                handler.handleMessage(ByteBuffer.wrap(record.value()));
            }
        }
}


With this setup our performance has taken a horrendous hit as soon as we 
started this one thread that just polls Kafka in a loop.

I profiled the application using Java Mission Control and have a few insights.

1. There doesn't seem to be a single hotspot. The consumer just ends up using a 
lot of CPU for handing such a low number of messages. Our process was using 16% 
CPU before we added a single consumer and it went to 25% and above after. 
That's an increase of over 50% from a single consumer getting a single digit 
number of small messages per second. Here is an attachment of the cpu usage 
breakdown in the consumer (the namespace is different because we shade the 
kafka jar before using it) - http://imgur.com/BxWs9Q0 So 20.54% of our entire 
process CPU is used on polling these 64 partitions (across 3 brokers) with 
single digit number of 70-80 byte odd messages.  We've used bigger timeouts 
(100 seconds odd) and that doesn't seem to make much of a difference either.

2. It also seems like Kafka throws a ton of EOFExceptions. I am not sure 
whether this is expected but this seems like it would completely kill 
performance. Here is the exception tab of Java mission control. 
http://imgur.com/X3KSn37 That is 1.8 mn exceptions over a period of 3 minutes 
which is about 10 thousand exceptions per second! The exception stack trace 
shows that it originates from the poll call. I don't understand how it can 
throw so many exceptions given I call poll it with a timeout of 10 seconds and 
get a single digit number of messages per second. The exception seems to be 
thrown from here: 
https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java#L236

3. The single thread seems to allocate a lot too. The single thread is 
responsible for 17.87% of our entire JVM allocation rate. During other runs it 
has gone up to 20% of our entire JVM allocation rate. Most of what it allocates 
seems to be those same EOFExceptions. Here is a chart showing the single 
thread's allocation proportion: http://imgur.com/GNUJQsz Here is a chart that 
shows a breakdown of the allocations: http://imgur.com/YjCXljE About 20% of the 
allocations are for the EOFExceptions. This seems kind of crazy especially 
given that this happens about 10 thousand times a second. But given that the 
20% of the allocations (exceptions) is around 10k/second, the thread itself is 
allocation about 50k objects/second which seems excessive given how we are 
getting very few messages.

As a comparison, we also run a wrapper over the old SimpleConsumer that gets a 
lot more data (30 thousand 70 byte messages/sec on a different topic) and it is 
able to handle that load without much trouble. At this moment we are completely 
puzzled by this performance. At least some part of that seems to be due to the 
crazy volumes of exceptions. Note: Our messages seem to all be making through. 
We haven't measured the end to end latency. The exceptions are caught by 
Kafka's stack and never bubble up to us.


> Kafka consumer 0.9.0.0  client poll is very CPU intensive under certain 
> conditions
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-3159
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3159
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.9.0.0
>         Environment: Linux, Oracle JVM 8.
>            Reporter: Rajiv Kurian
>            Assignee: Jason Gustafson
>
> We are using the new kafka consumer with the following config (as logged by 
> kafka)
> metric.reporters = []
>         metadata.max.age.ms = 300000
>         value.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>         group.id = myGroup.id
>         partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RangeAssignor]
>         reconnect.backoff.ms = 50
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>         max.partition.fetch.bytes = 2097152
>         bootstrap.servers = [myBrokerList]
>         retry.backoff.ms = 100
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>         sasl.kerberos.service.name = null
>         sasl.kerberos.ticket.renew.jitter = 0.05
>         ssl.keystore.type = JKS
>         ssl.trustmanager.algorithm = PKIX
>         enable.auto.commit = false
>         ssl.key.password = null
>         fetch.max.wait.ms = 1000
>         sasl.kerberos.min.time.before.relogin = 60000
>         connections.max.idle.ms = 540000
>         ssl.truststore.password = null
>         session.timeout.ms = 30000
>         metrics.num.samples = 2
>         client.id = 
>         ssl.endpoint.identification.algorithm = null
>         key.deserializer = class sf.kafka.VoidDeserializer
>         ssl.protocol = TLS
>         check.crcs = true
>         request.timeout.ms = 40000
>         ssl.provider = null
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>         ssl.keystore.location = null
>         heartbeat.interval.ms = 3000
>         auto.commit.interval.ms = 5000
>         receive.buffer.bytes = 32768
>         ssl.cipher.suites = null
>         ssl.truststore.type = JKS
>         security.protocol = PLAINTEXT
>         ssl.truststore.location = null
>         ssl.keystore.password = null
>         ssl.keymanager.algorithm = SunX509
>         metrics.sample.window.ms = 30000
>         fetch.min.bytes = 512
>         send.buffer.bytes = 131072
>         auto.offset.reset = earliest
> We use the consumer.assign() feature to assign a list of partitions and call 
> poll in a loop.  We have the following setup:
> 1. The messages have no key and we use the byte array deserializer to get 
> byte arrays from the config.
> 2. The messages themselves are on an average about 75 bytes. We get this 
> number by dividing the Kafka broker bytes-in metric by the messages-in metric.
> 3. Each consumer is assigned about 64 partitions of the same topic spread 
> across three brokers.
> 4. We get very few messages per second maybe around 1-2 messages across all 
> partitions on a client right now.
> 5. We have no compression on the topic.
> Our run loop looks something like this
> while (isRunning()) {
> ConsumerRecords<Void, byte[]> records = null;
>         try {
>             // Here timeout is about 10 seconds, so it is pretty big.
>             records = consumer.poll(timeout);
>         } catch (Exception e) {
>            // This never hits for us
>             logger.error("Exception polling Kafka ", e);
>             records = null;
>         }
>         if (records != null) {
>             for (ConsumerRecord<Void, byte[]> record : records) {
>                // The handler puts the byte array on a very fast ring buffer 
> so it barely takes any time.
>                 handler.handleMessage(ByteBuffer.wrap(record.value()));
>             }
>         }
> }
> With this setup our performance has taken a horrendous hit as soon as we 
> started this one thread that just polls Kafka in a loop.
> I profiled the application using Java Mission Control and have a few insights.
> 1. There doesn't seem to be a single hotspot. The consumer just ends up using 
> a lot of CPU for handing such a low number of messages. Our process was using 
> 16% CPU before we added a single consumer and it went to 25% and above after. 
> That's an increase of over 50% from a single consumer getting a single digit 
> number of small messages per second. Here is an attachment of the cpu usage 
> breakdown in the consumer (the namespace is different because we shade the 
> kafka jar before using it) - http://imgur.com/BxWs9Q0 So 20.54% of our entire 
> process CPU is used on polling these 64 partitions (across 3 brokers) with 
> single digit number of 70-80 byte odd messages.  We've used bigger timeouts 
> (100 seconds odd) and that doesn't seem to make much of a difference either.
> 2. It also seems like Kafka throws a ton of EOFExceptions. I am not sure 
> whether this is expected but this seems like it would completely kill 
> performance. Here is the exception tab of Java mission control. 
> http://imgur.com/X3KSn37 That is 1.8 mn exceptions over a period of 3 minutes 
> which is about 10 thousand exceptions per second! The exception stack trace 
> shows that it originates from the poll call. I don't understand how it can 
> throw so many exceptions given I call poll it with a timeout of 10 seconds 
> and get a single digit number of messages per second. The exception seems to 
> be thrown from here: 
> https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java#L236
> 3. The single thread seems to allocate a lot too. The single thread is 
> responsible for 17.87% of our entire JVM allocation rate. During other runs 
> it has gone up to 20% of our entire JVM allocation rate. Most of what it 
> allocates seems to be those same EOFExceptions. Here is a chart showing the 
> single thread's allocation proportion: http://imgur.com/GNUJQsz Here is a 
> chart that shows a breakdown of the allocations: http://imgur.com/YjCXljE 
> About 20% of the allocations are for the EOFExceptions. But given that the 
> 20% of the allocations (exceptions) is around 10k/second, the thread itself 
> is allocating about 50k objects/second which seems excessive given how we are 
> getting very few messages.
> As a comparison, we also run a wrapper over the old SimpleConsumer that gets 
> a lot more data (30 thousand 70 byte messages/sec on a different topic) and 
> it is able to handle that load without much trouble. At this moment we are 
> completely puzzled by this performance. At least some part of that seems to 
> be due to the crazy volumes of exceptions but the CPU profiling breakdown 
> seems to suggest that there are plenty of other causes including the 
> initFetches call and the ConsumerNetworkClient.poll call. Note: Our messages 
> seem to all be making through. We haven't measured the end to end latency. 
> The exceptions are caught by Kafka's stack and never bubble up to us.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to