[
https://issues.apache.org/jira/browse/KAFKA-1010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sam Meder updated KAFKA-1010:
-----------------------------
Attachment: get_cluster_0_8_git.patch
> Concurrency issue in getCluster() causes rebalance failure and dead consumer
> ----------------------------------------------------------------------------
>
> Key: KAFKA-1010
> URL: https://issues.apache.org/jira/browse/KAFKA-1010
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.8
> Reporter: Sam Meder
> Assignee: Neha Narkhede
> Priority: Blocker
> Fix For: 0.8
>
> Attachments: get_cluster_0_8_git.patch, get_cluster_0_8.patch
>
>
> We're seeing the following stack trace on the consumer when brokers are (forcefully) removed from the cluster:
> Thu Aug 15 05:10:06 GMT 2013 Exception in thread "main" org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/ids/4
> at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
> at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
> at kafka.utils.ZkUtils$.readData(ZkUtils.scala:407)
> at kafka.utils.ZkUtils$$anonfun$getCluster$1.apply(ZkUtils.scala:453)
> at kafka.utils.ZkUtils$$anonfun$getCluster$1.apply(ZkUtils.scala:452)
> at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:549)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> at scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:596)
> at kafka.utils.ZkUtils$.getCluster(ZkUtils.scala:452)
> at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:394)
> at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
> at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
> at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:391)
> at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722)
> at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:206)
> at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:77)
> at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:89)
> I'm pretty sure this is due to the following logic in getCluster():
> val nodes = getChildrenParentMayNotExist(zkClient, BrokerIdsPath)
> for (node <- nodes) {
>   val brokerZKString = readData(zkClient, BrokerIdsPath + "/" + node)._1
>   cluster.add(Broker.createBroker(node.toInt, brokerZKString))
> }
> which is obviously not safe, since the nodes retrieved in the first call may have disappeared by the time we iterate to read their values.
> getCluster() seems to only be used in ZookeeperConsumerConnector.syncedRebalance and in ImportZkOffsets.updateZkOffsets (which doesn't actually appear to use the returned value), so the simplest fix may be to move the getCluster() call into the try block in syncedRebalance and remove the usage in the other caller.
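To illustrate the race, here is a minimal, self-contained Scala sketch. The registry map and readData helper are hypothetical stand-ins for the /brokers/ids znodes and ZkUtils.readData, not the real ZK client API. It shows an alternative way to tolerate the race (skipping nodes that vanish between the listing and the read) rather than the patch's approach of moving the call into the try block:

```scala
import scala.collection.mutable

object GetClusterSketch {
  // Hypothetical stand-in for the broker registry under /brokers/ids;
  // entries can vanish at any time, like ephemeral broker nodes.
  val registry = mutable.Map("1" -> "host1:9092", "2" -> "host2:9092")

  // Stand-in for ZkUtils.readData: throws when the node is gone,
  // mirroring ZkNoNodeException.
  def readData(node: String): String =
    registry.getOrElse(node, throw new NoSuchElementException(node))

  // Race-tolerant getCluster(): skip nodes deleted after the listing
  // instead of letting the exception kill the rebalance.
  def getCluster(nodes: Seq[String]): Seq[String] =
    nodes.flatMap { node =>
      try Some(readData(node))
      catch { case _: NoSuchElementException => None }
    }
}
```

For example, GetClusterSketch.getCluster(Seq("1", "2", "4")) yields only the two live brokers even though node "4" (the one from the stack trace) disappeared after it was listed. Skipping keeps the rebalance alive, at the cost of briefly acting on a slightly stale broker set.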
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira