[ https://issues.apache.org/jira/browse/KAFKA-1010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13742354#comment-13742354 ]
Sam Meder commented on KAFKA-1010:
----------------------------------

Weird, patch -p1 -i get_cluster_0_8.patch worked, but git apply didn't. Let me see if I can get something that git apply likes.

> Concurrency issue in getCluster() causes rebalance failure and dead consumer
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-1010
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1010
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8
>            Reporter: Sam Meder
>            Assignee: Neha Narkhede
>            Priority: Blocker
>             Fix For: 0.8
>
>         Attachments: get_cluster_0_8.patch
>
>
> We're seeing the following stack trace on the consumer when brokers are (forcefully) removed from the cluster:
>
> Thu Aug 15 05:10:06 GMT 2013 Exception in thread "main" org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/ids/4
> 	at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
> 	at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
> 	at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
> 	at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
> 	at kafka.utils.ZkUtils$.readData(ZkUtils.scala:407)
> 	at kafka.utils.ZkUtils$$anonfun$getCluster$1.apply(ZkUtils.scala:453)
> 	at kafka.utils.ZkUtils$$anonfun$getCluster$1.apply(ZkUtils.scala:452)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> 	at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:549)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> 	at scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:596)
> 	at kafka.utils.ZkUtils$.getCluster(ZkUtils.scala:452)
> 	at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:394)
> 	at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
> 	at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
> 	at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:391)
> 	at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722)
> 	at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:206)
> 	at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:77)
> 	at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:89)
>
> I'm pretty sure this is due to the following logic in getCluster():
>
>     val nodes = getChildrenParentMayNotExist(zkClient, BrokerIdsPath)
>     for (node <- nodes) {
>       val brokerZKString = readData(zkClient, BrokerIdsPath + "/" + node)._1
>       cluster.add(Broker.createBroker(node.toInt, brokerZKString))
>     }
>
> which is obviously not safe, since the nodes retrieved in the first call may have disappeared by the time we iterate to get the values.
>
> getCluster() seems to only be used in ZookeeperConsumerConnector.syncedRebalance and in ImportZkOffsets.updateZkOffsets (which doesn't actually look like it is using the values), so the simplest solution may be to just move the getCluster() call into the try block in syncedRebalance and kill the usage in the other call.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
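The bug described above is a list-then-read race: getChildren returns a snapshot of broker ids, but a broker's znode can be deleted before the per-node readData runs. Besides the fix proposed in the report (moving getCluster() into the try block), the tolerant pattern is to treat each per-node read as optional and skip nodes that have vanished. A minimal self-contained sketch of that pattern (not the attached patch; GetClusterSketch and the in-memory registry standing in for the ZooKeeper /brokers/ids subtree are hypothetical):

```scala
import scala.collection.concurrent.TrieMap

object GetClusterSketch {
  // Stand-in for the ZooKeeper /brokers/ids subtree (hypothetical).
  val registry = TrieMap("1" -> "broker-1", "2" -> "broker-2", "4" -> "broker-4")

  // Race-tolerant variant of the getCluster() loop: a node listed earlier
  // may have been deleted by now, so skip it instead of throwing.
  def getCluster(nodes: Seq[String]): Seq[String] =
    nodes.flatMap(id => registry.get(id)) // None for deleted nodes is dropped

  def main(args: Array[String]): Unit = {
    val nodes = registry.keys.toSeq.sorted // first call: list the children
    registry.remove("4")                   // broker 4 dies before we read it
    val cluster = getCluster(nodes)        // no exception, broker 4 skipped
    assert(cluster.sorted == Seq("broker-1", "broker-2"))
    println(cluster.sorted.mkString(","))
  }
}
```

With the real ZkClient, the equivalent would be catching ZkNoNodeException around each readData call; the flatMap-over-Option shape above just makes the "missing is normal" decision explicit.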