Hi Kafka users,

*tl;dr questions:*

1. Is it normal or expected for the coordinator load state to last for 6 hours? Is this load time affected by log retention settings, message production rate, or other parameters?
2. Do non-pykafka clients handle COORDINATOR_LOAD_IN_PROGRESS by consuming only from the non-erroring partitions? Pykafka's insistence that all partitions return successful OffsetFetchResponses can be a source of consumption downtime.
3. Why does Kafka sometimes select a non-synced replica as the preferred replica during coordinator loads? How can I reassign partition leaders to replicas in the ISR?
4. Are all of these questions moot because I should just be using a newer version of Kafka than 0.8.2.1?

I'm using Kafka 0.8.2.1, running a topic with 200 partitions and RF=3, with log retention set to about 1GB.

An unknown event caused the cluster to enter the "coordinator load" or "group load" state. A few signals made this apparent: the pykafka-based consumers began to fail during OffsetFetchRequests with error code 14 COORDINATOR_LOAD_IN_PROGRESS for some subset of partitions
<https://github.com/Parsely/pykafka/blob/858554029830e15cfa6d15df002d1772672d8ee0/pykafka/simpleconsumer.py#L643>.
These errors were triggered when consuming with a consumer group that had existed since before the coordinator load.

In broker logs, messages like this appeared:

    [2018-05...] ERROR Controller 17 epoch 20 initiated state change for partition [my.cool.topic,144] from OnlinePartition to OnlinePartition failed (state.change.logger)
    kafka.common.StateChangeFailedException: encountered error while electing leader for partition [my.cool.topic,144] due to: Preferred replica 11 for partition [my.cool.topic,144] is either not alive or not in the isr. Current leader and ISR: [{"leader":12,"leader_epoch":7,"isr":[12,13]}]

For some reason, Kafka decided that replica 11 was the "preferred" replica even though it was not in the ISR. To my knowledge, consumption *could* have continued uninterrupted from either replica 12 or 13 while 11 resynchronized - it's not clear why Kafka chose a non-synced replica as the preferred leader.

The above-described behavior lasted for about 6 hours, during which the pykafka fetch_offsets error made message consumption impossible. While the coordinator load was still in progress, other consumer groups were able to consume the topic without error. In fact, the eventual fix was to restart the broken consumers with a new consumer_group name.

*Questions*

1. Is it normal or expected for the coordinator load state to last for 6 hours? Is this load time affected by log retention settings, message production rate, or other parameters?
2. Do non-pykafka clients handle COORDINATOR_LOAD_IN_PROGRESS by consuming only from the non-erroring partitions? Pykafka's insistence that all partitions return successful OffsetFetchResponses can be a source of consumption downtime. (A rough sketch of the behavior I have in mind is in the P.S. below.)
3. Why does Kafka sometimes select a non-synced replica as the preferred replica during coordinator loads? How can I reassign partition leaders to replicas in the ISR? (The approach I'm considering is sketched in the P.P.S. below.)
4. Are all of these questions moot because I should just be using a newer version of Kafka?

Thanks for your help, and please let me know if I can provide additional information or answer additional questions.
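P.S. To make question 2 concrete, here is roughly the client behavior I have in mind: keep the partitions whose OffsetFetchResponse succeeded and retry only the ones that returned error 14, instead of failing the whole fetch. This is just a sketch, not pykafka's actual API - fetch_committed_offsets() here is a hypothetical callable standing in for whatever sends the OffsetFetchRequest and returns per-partition (offset, error_code) pairs.

    import time

    COORDINATOR_LOAD_IN_PROGRESS = 14  # Kafka protocol error code

    def fetch_offsets_with_partial_retry(fetch_committed_offsets, partitions,
                                         retry_interval_s=5.0, max_attempts=60):
        """Fetch committed offsets per partition, tolerating error 14 on a subset.

        fetch_committed_offsets(partitions) is a hypothetical helper that returns
        {partition_id: (offset, error_code)} parsed from an OffsetFetchResponse.
        """
        resolved = {}
        pending = set(partitions)
        for _ in range(max_attempts):
            if not pending:
                break
            responses = fetch_committed_offsets(sorted(pending))
            for partition_id, (offset, error_code) in responses.items():
                if error_code == 0:
                    # This partition's offsets are loaded; it can be consumed now.
                    resolved[partition_id] = offset
                    pending.discard(partition_id)
                elif error_code == COORDINATOR_LOAD_IN_PROGRESS:
                    # The coordinator is still loading offsets for this partition;
                    # leave it in the pending set and retry later.
                    continue
                else:
                    raise RuntimeError("offset fetch failed for partition %d "
                                       "with error %d" % (partition_id, error_code))
            if pending:
                time.sleep(retry_interval_s)
        # Partitions still pending after max_attempts are returned so the caller
        # can decide whether to keep waiting or give up on just those partitions.
        return resolved, pending

The point is only that a single partition stuck in COORDINATOR_LOAD_IN_PROGRESS wouldn't block consumption from the partitions whose offsets have already loaded.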
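P.P.S. For question 3, the fix I'm considering (please correct me if this is the wrong tool) is to reorder the replica assignment so that an ISR member comes first - my understanding is that the first replica in the assignment list is the "preferred" one - and then trigger a preferred replica election. A rough sketch using the stock tools, with placeholder file names and a placeholder ZooKeeper connect string:

    # Contents of reassign.json: same replica set, but with in-sync broker 12
    # listed first so it becomes the preferred replica for partition 144.
    #   {"version":1,"partitions":[{"topic":"my.cool.topic","partition":144,"replicas":[12,13,11]}]}
    bin/kafka-reassign-partitions.sh --zookeeper <zk>/kafka5 \
        --reassignment-json-file reassign.json --execute

    # Contents of election.json:
    #   {"partitions":[{"topic":"my.cool.topic","partition":144}]}
    bin/kafka-preferred-replica-election.sh --zookeeper <zk>/kafka5 \
        --path-to-json-file election.json

Since the replica set itself doesn't change, my assumption is that the reassignment wouldn't move any data and would only change which replica is considered preferred before the election step. Is that the recommended way to do this, or is there a better one?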
Broker config options:

broker.id=10
port=9092
zookeeper.connect=****/kafka5
log.dirs=*****
delete.topic.enable=true
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
replica.lag.time.max.ms=10000
replica.lag.max.messages=4000
controller.socket.timeout.ms=30000
message.max.bytes=1000000
auto.create.topics.enable=false
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.retention.hours=96
log.roll.hours=168
log.retention.check.interval.ms=300000
log.segment.bytes=1073741824
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000
num.io.threads=8
socket.request.max.bytes=104857600
num.replica.fetchers=4
controller.message.queue.size=10
num.partitions=8
log.flush.interval.ms=60000
log.flush.interval.messages=60000
log.flush.scheduler.interval.ms=2000
num.network.threads=8
socket.receive.buffer.bytes=1048576
socket.send.buffer.bytes=1048576
queued.max.requests=500
fetch.purgatory.purge.interval.requests=100
producer.purgatory.purge.interval.requests=100
controlled.shutdown.enable=true

--
Emmett Butler | Senior Software Engineer
<http://www.parsely.com/?utm_source=Signature&utm_medium=emmett-butler&utm_campaign=Signature>