[
https://issues.apache.org/jira/browse/KAFKA-956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13752116#comment-13752116
]
Neha Narkhede commented on KAFKA-956:
-------------------------------------
The root cause of this issue is that a broker that is not actually ready to
serve requests ends up serving them and misleading clients. When a broker
starts up, it expects to receive an UpdateMetadata request from the
controller. Until this happens, the broker should explicitly return a
BrokerNotReady error code. Upon receiving this error code, the client should
try to send the request to another broker. In the specific example of
rebalance, the consumer will get the BrokerNotReady error code and will try
fetching metadata from all the brokers at least once before giving up. A
similar problem exists on the producer side. If you do a rolling bounce of a
Kafka cluster while several thousand producer clients are connected to it,
and auto creation of topics is turned on, it creates a storm of topic
metadata requests that turn into create-topic requests to the brokers. The
brokers spend a lot of time trying to create topics since they do not yet
know that the topics already exist.
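A minimal sketch of the proposed gating, in Scala since that is what the
broker is written in. The error code value and the method names here are
hypothetical illustrations, not actual Kafka 0.8 APIs: the broker refuses to
answer metadata requests until the controller's UpdateMetadata has arrived.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object BrokerReadinessGate {
  // Hypothetical error code; the 0.8 wire protocol does not define one
  val BrokerNotReadyCode: Short = 13
  val NoError: Short = 0

  // Flipped exactly once, when the controller's UpdateMetadata arrives
  private val metadataReceived = new AtomicBoolean(false)

  def onUpdateMetadata(): Unit = metadataReceived.set(true)

  // Request handlers consult the gate before serving metadata; a client
  // seeing BrokerNotReady should retry the request against another broker
  def handleMetadataRequest(): Short =
    if (metadataReceived.get) NoError else BrokerNotReadyCode
}
```

With this gate in place, a freshly bounced broker answers BrokerNotReady
instead of serving an empty metadata view, and clients fall through to the
other brokers in their list.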
You could argue that a broker that is not ready should not accept connections,
and probably should not even start the socket server, until it is ready to
serve requests. But since the broker currently uses the same socket server to
communicate with the controller, this is not an easy fix to put in 0.8.
> High-level consumer fails to check topic metadata response for errors
> ---------------------------------------------------------------------
>
> Key: KAFKA-956
> URL: https://issues.apache.org/jira/browse/KAFKA-956
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.8
> Reporter: Sam Meder
> Assignee: Neha Narkhede
> Priority: Blocker
> Fix For: 0.8
>
> Attachments: consumer_metadata_fetch.patch
>
>
> In our environment we noticed that consumers would sometimes hang when
> started too close to starting the Kafka server. I tracked this down and it
> seems to be related to some code in rebalance
> (ZookeeperConsumerConnector.scala). In particular the following code seems
> problematic:
> val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet,
>                                                     brokers,
>                                                     config.clientId,
>                                                     config.socketTimeoutMs,
>                                                     correlationId.getAndIncrement).topicsMetadata
> val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
> topicsMetadata.foreach(m => {
>   val topic = m.topic
>   val partitions = m.partitionsMetadata.map(m1 => m1.partitionId)
>   partitionsPerTopicMap.put(topic, partitions)
> })
> The response is never checked for errors, so it may not actually contain any
> partition info! Rebalance goes its merry way, but doesn't know about any
> partitions so never assigns them...
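A minimal sketch of the missing check, using simplified stand-ins for the
Kafka 0.8 TopicMetadata and PartitionMetadata classes (the real classes carry
more fields): the rebalance should fail loudly on a non-zero error code
instead of silently building an empty partition map.

```scala
import scala.collection.mutable

// Simplified stand-ins for the Kafka 0.8 metadata classes
case class PartitionMetadata(partitionId: Int)
case class TopicMetadata(topic: String, errorCode: Short,
                         partitionsMetadata: Seq[PartitionMetadata])

def partitionsPerTopic(topicsMetadata: Seq[TopicMetadata]): mutable.HashMap[String, Seq[Int]] = {
  val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
  topicsMetadata.foreach { m =>
    // Fail the rebalance instead of proceeding with no partition info
    if (m.errorCode != 0)
      throw new RuntimeException(
        s"Topic metadata fetch for ${m.topic} failed with error code ${m.errorCode}")
    partitionsPerTopicMap.put(m.topic, m.partitionsMetadata.map(_.partitionId))
  }
  partitionsPerTopicMap
}
```

Throwing here lets the rebalance retry logic kick in rather than leaving the
consumer hung with zero assigned partitions.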
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira