[ https://issues.apache.org/jira/browse/KAFKA-956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13694498#comment-13694498 ]
Jun Rao commented on KAFKA-956:
-------------------------------

Got it. The weird part is that if a topic exists, why would the metadata request return an empty list of partitions (with the LeaderNotAvailableCode)? One case that I can think of is that the metadata may not have been propagated to all brokers yet. Since metadata propagation is quick in general, if we retry rebalance (which will happen with your patch), chances are that we will pick up the updated metadata.

However, there is an issue. If a topic doesn't exist (and auto topic creation is turned off), throwing a KafkaException when there is no partition will prevent the consumer from starting. Instead, we should let rebalance proceed. When the topic is created in the future, a ZK watcher will be fired and a rebalance will be triggered.

I am not sure what's the best way to deal with both cases. Need to think about this a bit more.

> High-level consumer fails to check topic metadata response for errors
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-956
>                 URL: https://issues.apache.org/jira/browse/KAFKA-956
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8
>            Reporter: Sam Meder
>            Assignee: Neha Narkhede
>            Priority: Blocker
>             Fix For: 0.8
>
>         Attachments: consumer_metadata_fetch.patch
>
>
> In our environment we noticed that consumers would sometimes hang when
> started too close to starting the Kafka server. I tracked this down and it
> seems to be related to some code in rebalance
> (ZookeeperConsumerConnector.scala).
> In particular the following code seems
> problematic:
>       val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet,
>                                                           brokers,
>                                                           config.clientId,
>                                                           config.socketTimeoutMs,
>                                                           correlationId.getAndIncrement).topicsMetadata
>       val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
>       topicsMetadata.foreach(m => {
>         val topic = m.topic
>         val partitions = m.partitionsMetadata.map(m1 => m1.partitionId)
>         partitionsPerTopicMap.put(topic, partitions)
>       })
> The response is never checked for error, so may not actually contain any
> partition info! Rebalance goes its merry way, but doesn't know about any
> partitions so never assigns them...

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
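
For illustration only, here is a rough sketch of how the two cases discussed in the comment (metadata not yet propagated vs. topic not created yet) might be separated when reading the metadata response. It is not the attached patch and not Kafka's code: `TopicMeta`, `PartitionMeta`, `MetadataCheck`, `Outcome`, `Proceed` and `Retry` are hypothetical stand-ins, the numeric error codes are assumed values, and the sketch assumes the response reports a distinct error code for a topic that does not exist, which the comment above does not confirm.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the metadata response; not Kafka's actual classes.
final case class PartitionMeta(partitionId: Int)
final case class TopicMeta(topic: String, errorCode: Short, partitionsMetadata: Seq[PartitionMeta])

object MetadataCheck {
  // Illustrative error codes (assumed values for this sketch).
  val NoError: Short = 0
  val UnknownTopicOrPartition: Short = 3
  val LeaderNotAvailable: Short = 5

  sealed trait Outcome
  // Rebalance may go ahead with these partitions (possibly none for a topic).
  final case class Proceed(partitionsPerTopic: Map[String, Seq[Int]]) extends Outcome
  // Metadata for these topics looked transient or broken; rebalance should be retried.
  final case class Retry(topics: Seq[String]) extends Outcome

  def check(topicsMetadata: Seq[TopicMeta]): Outcome = {
    val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
    val retryTopics = mutable.ArrayBuffer.empty[String]
    topicsMetadata.foreach { m =>
      m.errorCode match {
        case NoError =>
          partitionsPerTopicMap.put(m.topic, m.partitionsMetadata.map(_.partitionId))
        case UnknownTopicOrPartition =>
          // Topic not created yet: record no partitions and let rebalance proceed,
          // relying on a later ZK watcher to trigger another rebalance.
          partitionsPerTopicMap.put(m.topic, Seq.empty)
        case _ =>
          // LeaderNotAvailable or any other error: treat as retryable.
          retryTopics += m.topic
      }
    }
    if (retryTopics.nonEmpty) Retry(retryTopics.toList)
    else Proceed(partitionsPerTopicMap.toMap)
  }
}
```

A caller could map `Retry` onto the existing rebalance retry loop and `Proceed` onto the normal assignment path. Whether the broker actually distinguishes the two cases this way is exactly the open question in the comment.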