Good Afternoon, I am currently trying to do a rolling upgrade from Kafka 0.8.2.1 to 0.9.0.1 and am running into a problem when starting the 0.9.0.1 broker with inter.broker.protocol.version=0.8.2.1 set in server.properties.
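For reference, I am following the documented rolling-upgrade procedure: pin the inter-broker protocol to the old version while 0.8.2.1 and 0.9.0.1 brokers coexist, then bump it once everything is upgraded. The relevant server.properties lines:

<snip>
# During the rolling upgrade, while 0.8.2.1 and 0.9.0.1 brokers coexist:
inter.broker.protocol.version=0.8.2.1

# Once every broker is running 0.9.0.1, the docs say to bump this to the
# new version and restart the brokers one by one:
# inter.broker.protocol.version=0.9.0.1
</snip>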
Here is my current Kafka topic setup, data retention and hardware:

- 3 Zookeeper nodes
- 5 broker nodes
- Topics have at least 2 replicas
- Topics have no more than 200 partitions
- 4,564 partitions across 61 topics
- 14 day retention
- Each Kafka node has between 2.1T and 2.9T of data
- Hardware is c4.2xlarge AWS instances: 8 cores (Intel(R) Xeon(R) CPU E5-2666 v3 @ 2.90GHz), 14G RAM, 4TB EBS volume (10k IOPS, which never gets maxed unless I raise num.io.threads)

Here is my running broker configuration for 0.9.0.1:

<snip>
[2016-05-11 11:43:58,172] INFO KafkaConfig values:
advertised.host.name = server.domain
metric.reporters = []
quota.producer.default = 9223372036854775807
offsets.topic.num.partitions = 150
log.flush.interval.messages = 9223372036854775807
auto.create.topics.enable = false
controller.socket.timeout.ms = 30000
log.flush.interval.ms = 1000
principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
replica.socket.receive.buffer.bytes = 65536
min.insync.replicas = 1
replica.fetch.wait.max.ms = 500
num.recovery.threads.per.data.dir = 1
ssl.keystore.type = JKS
default.replication.factor = 3
ssl.truststore.password = null
log.preallocate = false
sasl.kerberos.principal.to.local.rules = [DEFAULT]
fetch.purgatory.purge.interval.requests = 1000
ssl.endpoint.identification.algorithm = null
replica.socket.timeout.ms = 30000
message.max.bytes = 10485760
num.io.threads = 8
offsets.commit.required.acks = -1
log.flush.offset.checkpoint.interval.ms = 60000
delete.topic.enable = true
quota.window.size.seconds = 1
ssl.truststore.type = JKS
offsets.commit.timeout.ms = 5000
quota.window.num = 11
zookeeper.connect = zkserver:2181/kafka
authorizer.class.name =
num.replica.fetchers = 8
log.retention.ms = null
log.roll.jitter.hours = 0
log.cleaner.enable = false
offsets.load.buffer.size = 5242880
log.cleaner.delete.retention.ms = 86400000
ssl.client.auth = none
controlled.shutdown.max.retries = 3
queued.max.requests = 500
offsets.topic.replication.factor = 3
log.cleaner.threads = 1
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
socket.request.max.bytes = 104857600
ssl.trustmanager.algorithm = PKIX
zookeeper.session.timeout.ms = 6000
log.retention.bytes = -1
sasl.kerberos.min.time.before.relogin = 60000
zookeeper.set.acl = false
connections.max.idle.ms = 600000
offsets.retention.minutes = 1440
replica.fetch.backoff.ms = 1000
inter.broker.protocol.version = 0.8.2.1
log.retention.hours = 168
num.partitions = 16
broker.id.generation.enable = false
listeners = null
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
log.roll.ms = null
log.flush.scheduler.interval.ms = 9223372036854775807
ssl.cipher.suites = null
log.index.size.max.bytes = 10485760
ssl.keymanager.algorithm = SunX509
security.inter.broker.protocol = PLAINTEXT
replica.fetch.max.bytes = 104857600
advertised.port = null
log.cleaner.dedupe.buffer.size = 134217728
replica.high.watermark.checkpoint.interval.ms = 5000
log.cleaner.io.buffer.size = 524288
sasl.kerberos.ticket.renew.window.factor = 0.8
zookeeper.connection.timeout.ms = 6000
controlled.shutdown.retry.backoff.ms = 5000
log.roll.hours = 168
log.cleanup.policy = delete
host.name =
log.roll.jitter.ms = null
max.connections.per.ip = 2147483647
offsets.topic.segment.bytes = 104857600
background.threads = 10
quota.consumer.default = 9223372036854775807
request.timeout.ms = 30000
log.index.interval.bytes = 4096
log.dir = /tmp/kafka-logs
log.segment.bytes = 268435456
log.cleaner.backoff.ms = 15000
offset.metadata.max.bytes = 4096
ssl.truststore.location = null
group.max.session.timeout.ms = 30000
ssl.keystore.password = null
zookeeper.sync.time.ms = 2000
port = 9092
log.retention.minutes = null
log.segment.delete.delay.ms = 60000
log.dirs = /mnt/kafka/data
controlled.shutdown.enable = true
compression.type = producer
max.connections.per.ip.overrides =
sasl.kerberos.kinit.cmd = /usr/bin/kinit
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
auto.leader.rebalance.enable = true
leader.imbalance.check.interval.seconds = 300
log.cleaner.min.cleanable.ratio = 0.5
replica.lag.time.max.ms = 10000
num.network.threads = 8
ssl.key.password = null
reserved.broker.max.id = 1000
metrics.num.samples = 2
socket.send.buffer.bytes = 2097152
ssl.protocol = TLS
socket.receive.buffer.bytes = 2097152
ssl.keystore.location = null
replica.fetch.min.bytes = 1
unclean.leader.election.enable = false
group.min.session.timeout.ms = 6000
log.cleaner.io.buffer.load.factor = 0.9
offsets.retention.check.interval.ms = 600000
producer.purgatory.purge.interval.requests = 1000
metrics.sample.window.ms = 30000
broker.id = 2
offsets.topic.compression.codec = 0
log.retention.check.interval.ms = 300000
advertised.listeners = null
leader.imbalance.per.broker.percentage = 10
</snip>

The above configuration is the same as the rest of the 0.8.2.1 cluster, other than the new 0.9.0.1 configuration options. I tested the upgrade in our development environment and had no issues, but that environment does not see production load. Here is what I did (a shell sketch follows the list):

1) Stopped Kafka nicely.
2) Purged the 0.8.2.1 Kafka package.
3) Installed the new 0.9.0.1 Kafka package.
4) Used configuration management to lay down the same configuration as 0.8.2.1, other than the following additions:

<snip>
broker.id.generation.enable = false
inter.broker.protocol.version = 0.8.2.1
</snip>
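Roughly, the per-broker procedure looked like this (a sketch; the package and service names are placeholders for whatever your environment uses):

<snip>
#!/bin/bash
set -e

# 1) Stop Kafka nicely (controlled.shutdown.enable = true, so the broker
#    hands off partition leadership before exiting).
sudo service kafka stop

# 2) Purge the 0.8.2.1 package.
sudo apt-get purge -y kafka

# 3) Install the 0.9.0.1 package.
sudo apt-get install -y kafka=0.9.0.1

# 4) Configuration management lays down the same server.properties as
#    0.8.2.1, plus the two new lines:
#      broker.id.generation.enable = false
#      inter.broker.protocol.version = 0.8.2.1

sudo service kafka start
</snip>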
Started Kafka, and after a minute or so I got the following errors on the 0.9.0.1 node:

<snip>
[2016-05-10 16:13:58,459] WARN [ReplicaFetcherThread-7-5], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@505df75a. Possible cause: java.io.IOException: Connection to 5 was disconnected before the response was read (kafka.server.ReplicaFetcherThread)
[2016-05-10 16:13:58,462] WARN [ReplicaFetcherThread-7-3], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@538a0473. Possible cause: java.lang.OutOfMemoryError (kafka.server.ReplicaFetcherThread)
</snip>

It looks like this may be related to the following bugs:

https://issues.apache.org/jira/browse/KAFKA-1980
https://issues.apache.org/jira/browse/KAFKA-664
https://issues.apache.org/jira/browse/KAFKA-3071

I tried increasing the JVM heap from -Xms2G -Xmx4G to -Xms4G -Xmx8G. That did not help: not only did I still get the same errors as above, but the Linux OOM killer killed Kafka and upstart restarted it, a vicious circle. I changed the heap back and the OOM killer stopped reaping the instance. Since it was a ReplicaFetcherThread that was running out of memory, I tried reducing num.replica.fetchers, but that did not help either, and neither did reducing num.network.threads and num.io.threads. I also tried all of the above with a blank data directory so the broker would re-replicate everything from the other brokers; that made no difference, and I still got the same errors. It is worth mentioning that the errors started after 555M had been replicated from the other replicas. As a test, I then ramped num.io.threads, num.network.threads and num.replica.fetchers up to 32 and cleared out the disk again. This time I was able to replicate 57G before the memory errors started coming in again, and during that run the disk was between 88% and 100% utilized.
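If I am reading those JIRAs right, a back-of-envelope estimate may explain the OutOfMemoryError. This assumes each partition in a replica fetch can buffer up to replica.fetch.max.bytes, which I have not verified in the 0.9.0.1 source:

<snip>
# Replicas per broker, assuming roughly replication factor 3 and an
# even spread across the 5 brokers:
#   4,564 partitions * ~3 replicas / 5 brokers ~= 2,700 replicas each
# Most of those are follower replicas this broker has to fetch.
#
# Worst-case buffering if every followed partition can return up to
# replica.fetch.max.bytes (104857600 bytes = 100M) in one response:
#   2,700 partitions * 100M ~= 270G
# Even a small fraction of that in flight would blow through a 4G-8G
# heap, which would match the fetcher threads OOMing mid-replication.
</snip>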
I am currently stuck on this issue, and I am down a broker, from 5 to 4. I was thinking about bringing up a new broker with a new id and manually moving topics to it (see the P.S. below for the commands I have in mind), but I am pretty sure that will fail in the same manner. Any suggestions for getting through this problem? Thanks
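P.S. The manual move would just be the stock reassignment tool, something like the following (a sketch I have not run against this cluster; the topic file contents and the new broker id 6 are illustrative):

<snip>
# topics-to-move.json (illustrative):
#   {"version": 1, "topics": [{"topic": "some-topic"}]}

# Generate a candidate assignment that includes the new broker:
bin/kafka-reassign-partitions.sh --zookeeper zkserver:2181/kafka \
  --topics-to-move-json-file topics-to-move.json \
  --broker-list "1,2,3,4,6" --generate

# Save the proposed assignment to reassignment.json, then execute
# and verify:
bin/kafka-reassign-partitions.sh --zookeeper zkserver:2181/kafka \
  --reassignment-json-file reassignment.json --execute
bin/kafka-reassign-partitions.sh --zookeeper zkserver:2181/kafka \
  --reassignment-json-file reassignment.json --verify
</snip>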