Maysam Yabandeh created KAFKA-3493: -------------------------------------- Summary: Replica fetcher load is not balanced over fetcher threads Key: KAFKA-3493 URL: https://issues.apache.org/jira/browse/KAFKA-3493 Project: Kafka Issue Type: Improvement Affects Versions: 0.9.0.1 Reporter: Maysam Yabandeh Fix For: 0.10.0.1
The replicas are not evenly distributed among the fetcher threads. This has caused some fetcher threads get overloaded and hence their requests timeout frequently. This is especially a big issue when a new node is added to the cluster and the fetch traffic is large. Here is an example run in a test cluster with 10 brokers and 6 fetcher threads (per source broker). A single topic consisting of 500+ partitions was assigned to have a replica for each parition on the newly added broker. {code}[kafka-jetstream.canary]myabandeh@sjc8c-rl17-23b:~$ for i in `seq 0 5`; do grep ReplicaFetcherThread-$i- /var/log/kafka/server.log | grep "reset its fetch offset from 0" | wc -l; done 85 83 85 83 85 85 [kafka-jetstream.canary]myabandeh@sjc8c-rl17-23b:~$ for i in `seq 0 5`; do grep ReplicaFetcherThread-$i-22 /var/log/kafka/server.log | grep "reset its fetch offset from 0" | wc -l; done 15 1 13 1 14 1 {code} The problem is that AbstractFetcherManager::getFetcherId method does not take the broker id into account: {code} private def getFetcherId(topic: String, partitionId: Int) : Int = { Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers } {code} Hence although the replicas are evenly distributed among the fetcher ids across all source brokers, this is not necessarily the case for each broker separately. I think a random function would do a much better job in distributing the load over the fetcher threads from each source broker. Thoughts? -- This message was sent by Atlassian JIRA (v6.3.4#6332)