Hi,
I have two mirror makers, A and B, both subscribing to the same whitelist.
During a topic rebalance, mirror maker A encountered a ZkNoNodeException
and then stopped all of its connections, but mirror maker B didn't pick up
the topics that A had been consuming, which left some of the topics
unassigned. I think this is because A never released ownership of those
topics. My question is: why didn't A release ownership upon hitting the
error?
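For what it's worth, here is a minimal sketch of how the lingering ownership
can be verified in ZK. It uses the same org.I0Itec.zkclient.ZkClient library
that appears in the trace below; the ZK address is a placeholder, "test" is
the consumer group from the trace, and the object name is my own:

import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer
import scala.collection.JavaConverters._

object CheckOwnership extends App {
  // ZK address is a placeholder; "test" is the consumer group from the trace.
  // Raw-bytes serializer, since Kafka writes plain UTF-8 strings to these znodes.
  val zk = new ZkClient("localhost:2181", 30000, 30000, new BytesPushThroughSerializer)
  try {
    val ownersPath = "/consumers/test/owners"
    if (!zk.exists(ownersPath)) {
      println(s"$ownersPath does not exist")
    } else {
      for (topic <- zk.getChildren(ownersPath).asScala;
           partition <- zk.getChildren(s"$ownersPath/$topic").asScala) {
        // Each partition znode stores the id of the consumer thread that owns it.
        val owner = new String(zk.readData[Array[Byte]](s"$ownersPath/$topic/$partition"), "UTF-8")
        println(s"$topic-$partition is still owned by $owner")
      }
    }
  } finally zk.close()
}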
Here is the stack trace:
[2015-06-08 15:55:46,379] INFO [test_some-ip-1433797157389-247c1fc4], exception during rebalance (kafka.consumer.ZookeeperConsumerConnector)
org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/test/ids/test_some-ip-1433797157389-247c1fc4
        at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
        at kafka.utils.ZkUtils$.readData(ZkUtils.scala:473)
        at kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:61)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:657)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:629)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:620)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:620)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:620)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:619)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:572)
Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/test/ids/test_some-ip-1433797157389-247c1fc4
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
        at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1155)
        at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1184)
        at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
        at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
        at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
        ... 13 more
Here is the last part of the log:
[2015-06-08 15:55:49,848] INFO [ConsumerFetcherManager-1433797157399] Stopping leader finder thread (kafka.consumer.ConsumerFetcherManager)
[2015-06-08 15:55:49,848] INFO [ConsumerFetcherManager-1433797157399] Stopping all fetchers (kafka.consumer.ConsumerFetcherManager)
[2015-06-08 15:55:49,848] INFO [ConsumerFetcherManager-1433797157399] All connections stopped (kafka.consumer.ConsumerFetcherManager)
[2015-06-08 15:55:49,848] INFO [test_some-ip-1433797157389-247c1fc4], Cleared all relevant queues for this fetcher (kafka.consumer.ZookeeperConsumerConnector)
[2015-06-08 15:55:49,849] INFO [test_some-ip-1433797157389-247c1fc4], Cleared the data chunks in all the consumer message iterators (kafka.consumer.ZookeeperConsumerConnector)
[2015-06-08 15:55:49,849] INFO [test_some-ip-1433797157389-247c1fc4], Invoking rebalance listener before relasing partition ownerships. (kafka.consumer.ZookeeperConsumerConnector)
As seen in the log, mirror maker A didn't release ownership, and it didn't
attempt to trigger another round of rebalancing either. I also checked ZK:
the node that was reported missing actually existed, and it was created at
the same time the error was thrown.
I am using the latest trunk code.
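For context, here is a tiny self-contained sketch of the retry behavior I
expected based on the syncedRebalance frames in the trace: a failed attempt
gets logged, ownership gets released so another consumer can claim the
partitions, and the loop retries up to rebalance.max.retries before giving
up. All names in the sketch are my own stand-ins, not the actual
ZookeeperConsumerConnector code:

object ExpectedRebalanceLoop extends App {
  val rebalanceMaxRetries = 4  // mirrors the rebalance.max.retries consumer config default

  // Stand-in for one rebalance attempt; the first two simulate the ZkNoNodeException.
  def rebalanceAttempt(attempt: Int): Boolean = {
    if (attempt < 2) throw new RuntimeException("ZkNoNodeException stand-in")
    true
  }

  // Stand-in for deleting the /consumers/<group>/owners/... znodes so that
  // another consumer (mirror maker B in my case) could claim the partitions.
  def releasePartitionOwnership(): Unit =
    println("releasing partition ownership")

  var done = false
  var attempt = 0
  while (!done && attempt < rebalanceMaxRetries) {
    try {
      done = rebalanceAttempt(attempt)
    } catch {
      case e: Throwable => println(s"exception during rebalance: ${e.getMessage}")
    }
    if (!done) releasePartitionOwnership()  // the step that never seems to happen on A
    attempt += 1
  }
  if (!done) throw new RuntimeException("rebalance failed after max retries")
}

If that picture is roughly right, then after the ZkNoNodeException A should
have released ownership and retried, which is exactly what I am not seeing.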