[ 
https://issues.apache.org/jira/browse/KAFKA-956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13696465#comment-13696465
 ] 

Jun Rao commented on KAFKA-956:
-------------------------------

One possible solution is to let the consumer read the partition data from ZK 
directly. This way, if a consumer finds out that a topic doesn't exist, a ZK 
watcher is guaranteed to be triggered when the topic is created later. The only 
problem is that if there are many topics, reading them one at a time from ZK 
can be slow. ZK 3.4.x has the multi api support and we do plan to upgrade to 
that version. Perhaps we can revisit this issue at that point?
                
> High-level consumer fails to check topic metadata response for errors
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-956
>                 URL: https://issues.apache.org/jira/browse/KAFKA-956
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8
>            Reporter: Sam Meder
>            Assignee: Neha Narkhede
>            Priority: Blocker
>             Fix For: 0.8
>
>         Attachments: consumer_metadata_fetch.patch
>
>
> In our environment we noticed that consumers would sometimes hang when 
> started too close to starting the Kafka server. I tracked this down and it 
> seems to be related to some code in rebalance 
> (ZookeeperConsumerConnector.scala). In particular the following code seems 
> problematic:
>       val topicsMetadata = 
> ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet,
>                                                           brokers,
>                                                           config.clientId,
>                                                           
> config.socketTimeoutMs,
>                                                           
> correlationId.getAndIncrement).topicsMetadata
>       val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
>       topicsMetadata.foreach(m => {
>         val topic = m.topic
>         val partitions = m.partitionsMetadata.map(m1 => m1.partitionId)
>         partitionsPerTopicMap.put(topic, partitions)
>       })
> The response is never checked for error, so may not actually contain any 
> partition info! Rebalance goes its merry way, but doesn't know about any 
> partitions so never assigns them...

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to