Good Afternoon,

I am currently trying to do a rolling upgrade from Kafka 0.8.2.1 to 0.9.0.1,
and I am running into a problem when starting 0.9.0.1 with
inter.broker.protocol.version=0.8.2.1 set in server.properties.

Here is my current Kafka topic setup, data retention and hardware used:

3 Zookeeper nodes
5 Broker nodes
Topics have at least 2 replicas
Topics have no more than 200 partitions
4,564 partitions across 61 topics
14 day retention
Each Kafka node has between 2.1T - 2.9T of data
Hardware is C4.2xlarge AWS instances
 - 8 Core (Intel(R) Xeon(R) CPU E5-2666 v3 @ 2.90GHz)
 - 14G RAM
 - 4TB EBS volume (10k IOPS; never maxed out unless I raise num.io.threads)

Here is my running broker configuration for 0.9.0.1:
<snip>
[2016-05-11 11:43:58,172] INFO KafkaConfig values:
advertised.host.name = server.domain
metric.reporters = []
quota.producer.default = 9223372036854775807
offsets.topic.num.partitions = 150
log.flush.interval.messages = 9223372036854775807
auto.create.topics.enable = false
controller.socket.timeout.ms = 30000
log.flush.interval.ms = 1000
principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
replica.socket.receive.buffer.bytes = 65536
min.insync.replicas = 1
replica.fetch.wait.max.ms = 500
num.recovery.threads.per.data.dir = 1
ssl.keystore.type = JKS
default.replication.factor = 3
ssl.truststore.password = null
log.preallocate = false
sasl.kerberos.principal.to.local.rules = [DEFAULT]
fetch.purgatory.purge.interval.requests = 1000
ssl.endpoint.identification.algorithm = null
replica.socket.timeout.ms = 30000
message.max.bytes = 10485760
num.io.threads = 8
offsets.commit.required.acks = -1
log.flush.offset.checkpoint.interval.ms = 60000
delete.topic.enable = true
quota.window.size.seconds = 1
ssl.truststore.type = JKS
offsets.commit.timeout.ms = 5000
quota.window.num = 11
zookeeper.connect = zkserver:2181/kafka
authorizer.class.name =
num.replica.fetchers = 8
log.retention.ms = null
log.roll.jitter.hours = 0
log.cleaner.enable = false
offsets.load.buffer.size = 5242880
log.cleaner.delete.retention.ms = 86400000
ssl.client.auth = none
controlled.shutdown.max.retries = 3
queued.max.requests = 500
offsets.topic.replication.factor = 3
log.cleaner.threads = 1
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
socket.request.max.bytes = 104857600
ssl.trustmanager.algorithm = PKIX
zookeeper.session.timeout.ms = 6000
log.retention.bytes = -1
sasl.kerberos.min.time.before.relogin = 60000
zookeeper.set.acl = false
connections.max.idle.ms = 600000
offsets.retention.minutes = 1440
replica.fetch.backoff.ms = 1000
inter.broker.protocol.version = 0.8.2.1
log.retention.hours = 168
num.partitions = 16
broker.id.generation.enable = false
listeners = null
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
log.roll.ms = null
log.flush.scheduler.interval.ms = 9223372036854775807
ssl.cipher.suites = null
log.index.size.max.bytes = 10485760
ssl.keymanager.algorithm = SunX509
security.inter.broker.protocol = PLAINTEXT
replica.fetch.max.bytes = 104857600
advertised.port = null
log.cleaner.dedupe.buffer.size = 134217728
replica.high.watermark.checkpoint.interval.ms = 5000
log.cleaner.io.buffer.size = 524288
sasl.kerberos.ticket.renew.window.factor = 0.8
zookeeper.connection.timeout.ms = 6000
controlled.shutdown.retry.backoff.ms = 5000
log.roll.hours = 168
log.cleanup.policy = delete
host.name =
log.roll.jitter.ms = null
max.connections.per.ip = 2147483647
offsets.topic.segment.bytes = 104857600
background.threads = 10
quota.consumer.default = 9223372036854775807
request.timeout.ms = 30000
log.index.interval.bytes = 4096
log.dir = /tmp/kafka-logs
log.segment.bytes = 268435456
log.cleaner.backoff.ms = 15000
offset.metadata.max.bytes = 4096
ssl.truststore.location = null
group.max.session.timeout.ms = 30000
ssl.keystore.password = null
zookeeper.sync.time.ms = 2000
port = 9092
log.retention.minutes = null
log.segment.delete.delay.ms = 60000
log.dirs = /mnt/kafka/data
controlled.shutdown.enable = true
compression.type = producer
max.connections.per.ip.overrides =
sasl.kerberos.kinit.cmd = /usr/bin/kinit
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
auto.leader.rebalance.enable = true
leader.imbalance.check.interval.seconds = 300
log.cleaner.min.cleanable.ratio = 0.5
replica.lag.time.max.ms = 10000
num.network.threads = 8
ssl.key.password = null
reserved.broker.max.id = 1000
metrics.num.samples = 2
socket.send.buffer.bytes = 2097152
ssl.protocol = TLS
socket.receive.buffer.bytes = 2097152
ssl.keystore.location = null
replica.fetch.min.bytes = 1
unclean.leader.election.enable = false
group.min.session.timeout.ms = 6000
log.cleaner.io.buffer.load.factor = 0.9
offsets.retention.check.interval.ms = 600000
producer.purgatory.purge.interval.requests = 1000
metrics.sample.window.ms = 30000
broker.id = 2
offsets.topic.compression.codec = 0
log.retention.check.interval.ms = 300000
advertised.listeners = null
leader.imbalance.per.broker.percentage = 10
</snip>

The above configuration matches the rest of the 0.8.2.1 cluster, apart from
the new 0.9.0.1-only configuration options.

I tested the upgrade in our development environment without any issues, but
that environment does not carry the production load.

Here is what I did (a rough shell sketch of these steps follows the config
snippet below).

1) Stopped Kafka cleanly (controlled shutdown).
2) Purged the 0.8.2.1 Kafka package.
3) Installed the new 0.9.0.1 Kafka package.
4) Used configuration management to lay down the same configuration as
0.8.2.1, apart from the additions below:

<snip>
broker.id.generation.enable = false
inter.broker.protocol.version = 0.8.2.1
</snip>
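
In shell terms, the per-broker procedure was roughly the following (the
package and service names here are approximations of our setup, so treat this
as a sketch rather than the exact commands):

<snip>
# controlled shutdown of the broker (upstart job in our case)
sudo service kafka stop
# swap the package; "kafka" is a stand-in for our internal package name
sudo apt-get purge kafka
sudo apt-get install kafka        # the 0.9.0.1 build
# configuration management then re-renders server.properties with the two
# additions shown above, and the broker comes back up
sudo service kafka start
</snip>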

I started Kafka, and after a minute or so I got the following errors on the
0.9.0.1 node:
<snip>
[2016-05-10 16:13:58,459] WARN [ReplicaFetcherThread-7-5], Error in fetch
kafka.server.ReplicaFetcherThread$FetchRequest@505df75a. Possible cause:
java.io.IOException: Connection to 5 was disconnected before the response
was read (kafka.server.ReplicaFetcherThread)
[2016-05-10 16:13:58,462] WARN [ReplicaFetcherThread-7-3], Error in fetch
kafka.server.ReplicaFetcherThread$FetchRequest@538a0473. Possible cause:
java.lang.OutOfMemoryError (kafka.server.ReplicaFetcherThread)
</snip>

Looks like this may be related to the following bugs:
https://issues.apache.org/jira/browse/KAFKA-1980
https://issues.apache.org/jira/browse/KAFKA-664
https://issues.apache.org/jira/browse/KAFKA-3071

I tried increasing the JVM heap from -Xms2G -Xmx4G to -Xms4G -Xmx8G.
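
For reference, the heap is passed to the broker through KAFKA_HEAP_OPTS (how
that variable gets set is environment-specific; ours comes from the startup
script), so the change was roughly:

<snip>
# before
export KAFKA_HEAP_OPTS="-Xms2G -Xmx4G"
# attempted
export KAFKA_HEAP_OPTS="-Xms4G -Xmx8G"
</snip>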

This did not help the situation: not only did the broker still hit the same
errors as above, but the Linux OOM killer killed Kafka and upstart restarted
it. Vicious circle.
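
The kills themselves are easy to confirm in the kernel log, e.g.:

<snip>
# kernel OOM kills show up in dmesg (and in /var/log/syslog on Ubuntu)
dmesg | grep -iE "killed process|out of memory"
</snip>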

I changed the heap back and the OOM killer stopped firing on the instance.

Since it was a ReplicaFetcherThread that was hitting the OutOfMemoryError, I
tried reducing the number of replica fetcher threads (num.replica.fetchers),
but that did not help the situation.

I also tried reducing num.network.threads and num.io.threads, and that did
not work either.
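
For reference, these are the settings I was dialing down (the values shown
are the current ones from the config dump above; the exact lower values
varied between attempts):

<snip>
# server.properties
num.replica.fetchers=8
num.network.threads=8
num.io.threads=8
</snip>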

I also tried all of the above with a blank dataset, so the broker would fetch
its data from the other replicas. This did not make a difference either; I
still got the same errors.
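
By a blank dataset I mean stopping the broker and clearing its data directory
so everything is re-fetched from the other replicas, roughly:

<snip>
sudo service kafka stop
# log.dirs is /mnt/kafka/data per the configuration above
sudo rm -rf /mnt/kafka/data/*
sudo service kafka start
</snip>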

It is worth mentioning that the errors started after only 555M had been
replicated from the other replicas. As a test, I ramped num.io.threads,
num.network.threads and num.replica.fetchers up to 32 and cleared out the
disk again. This time I was able to replicate 57G before the memory errors
started coming in again; during that run the disk was between 88% and 100%
utilized.
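
Concretely, the settings for that last test were:

<snip>
num.io.threads=32
num.network.threads=32
num.replica.fetchers=32
</snip>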

I am currently stuck on this issue, and the cluster is down a broker, from 5
to 4.

I was thinking about bringing up a new broker with a new id and manually
moving topics to it, but I am fairly sure it will fail in the same manner.
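
For reference, the manual move would presumably be done with the partition
reassignment tool; a rough sketch (the broker ids and topic name below are
placeholders):

<snip>
# topics-to-move.json -- topics to migrate onto the new broker, e.g.:
#   {"version":1,"topics":[{"topic":"my-topic"}]}

# generate a candidate assignment that includes the new broker id
kafka-reassign-partitions.sh --zookeeper zkserver:2181/kafka \
  --topics-to-move-json-file topics-to-move.json \
  --broker-list "1,2,3,4,6" --generate

# then execute and verify the generated plan
kafka-reassign-partitions.sh --zookeeper zkserver:2181/kafka \
  --reassignment-json-file reassignment.json --execute
kafka-reassign-partitions.sh --zookeeper zkserver:2181/kafka \
  --reassignment-json-file reassignment.json --verify
</snip>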

Any suggestions for getting past this problem?

Thanks
