Hi Kafka team,

So for now I will just monitor the "ZkClient-EventThread*" threads via ThreadInfo[] threads = ManagementFactory.getThreadMXBean().....; and, if a ZkClient-EventThread* thread dies, restart the sources. Is there an alternative approach, or a lifecycle method that an API consumer can attach to the consumer lifecycle, so that we get notified when it is dying and can take some action?
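A minimal sketch of the thread-liveness check described above, using only the JDK's java.lang.management API. It assumes the ZkClient event thread keeps the "ZkClient-EventThread" name prefix seen in the log; the class name ZkEventThreadWatchdog and the onDeath hook are hypothetical. One caveat: the ERROR line in the quoted message is emitted from org.I0Itec.zkclient.ZkEventThread.run:77, which suggests the exception was caught and logged by the event thread itself. If so, the thread may stay alive while the consumer owns no partitions, and a liveness check alone would not catch that case.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public final class ZkEventThreadWatchdog {

    // Prefix of the ZkClient event thread name as it appears in the log; the suffix varies.
    public static final String ZK_EVENT_THREAD_PREFIX = "ZkClient-EventThread";

    private ZkEventThreadWatchdog() {}

    /** Returns true if at least one live thread's name starts with namePrefix. */
    public static boolean isThreadAlive(String namePrefix) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        // Entries are null for threads that exited between the two calls.
        ThreadInfo[] threads = mx.getThreadInfo(mx.getAllThreadIds());
        for (ThreadInfo info : threads) {
            if (info != null && info.getThreadName().startsWith(namePrefix)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Checks every periodSeconds and runs onDeath (hypothetical restart hook) whenever
     * no matching thread exists. onDeath keeps firing on each check until a new
     * matching thread appears, so it should be idempotent.
     */
    public static ScheduledExecutorService watch(String namePrefix, long periodSeconds, Runnable onDeath) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            // Name chosen so the watchdog itself does not match the ZkClient prefix.
            Thread t = new Thread(r, "zk-event-thread-watchdog");
            t.setDaemon(true);
            return t;
        });
        scheduler.scheduleAtFixedRate(() -> {
            try {
                if (!isThreadAlive(namePrefix)) {
                    onDeath.run();
                }
            } catch (RuntimeException e) {
                // An uncaught exception would silently cancel all future checks.
                e.printStackTrace();
            }
        }, periodSeconds, periodSeconds, TimeUnit.SECONDS);
        return scheduler;
    }
}
```

Usage would be along the lines of ZkEventThreadWatchdog.watch(ZkEventThreadWatchdog.ZK_EVENT_THREAD_PREFIX, 30, restartHook), where restartHook is whatever restarts your source. If several consumer connectors run in one JVM, each has its own event thread, so "any matching thread is alive" can mask a single dead connector; matching on the full thread name would be needed in that case.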
Thanks,
Bhavesh

On Mon, Nov 17, 2014 at 2:30 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> Hi Kafka Team,
>
> I intermittently get the following exception due to ZK/network issues. How do I *programmatically* recover from the consumer thread dying and restart the source? We have alerts showing that, due to this error, partition OWNERSHIP is *none*. Please let me know how to detect that the consumer thread has died and needs to be restarted, and how to restart the source.
>
> 17 Nov 2014 04:29:41,180 ERROR [ZkClient-EventThread-65-dare-msgq00.sv.walmartlabs.com:9091,dare-msgq01.sv.walmartlabs.com:9091,dare-msgq02.sv.walmartlabs.com:9091] (org.I0Itec.zkclient.ZkEventThread.run:77) - Error handling event ZkEvent[New session event sent to kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener@552c5ea8]
> kafka.common.ConsumerRebalanceFailedException: mupd_statsd_logmon_metric_events18_sdc-stg-logsearch-app1-1416023311208-14501d68 can't rebalance after 8 retries
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:626)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:481)
>     at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
>     at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>
> ZK Connection Issues:
>
> java.net.SocketException: Transport endpoint is not connected
>     at sun.nio.ch.SocketChannelImpl.shutdown(Native Method)
>     at sun.nio.ch.SocketChannelImpl.shutdownInput(SocketChannelImpl.java:633)
>     at sun.nio.ch.SocketAdaptor.shutdownInput(SocketAdaptor.java:360)
>     at org.apache.zookeeper.ClientCnxn$SendThread.cleanup(ClientCnxn.java:1205)
>     at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1170)
>
>     at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
>     at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
>     at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
>     at kafka.utils.ZkUtils$.readData(ZkUtils.scala:449)
>     at kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:61)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:630)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:601)
>     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:595)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:592)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:592)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:591)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:481)
>     at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
>     at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/mupd_statsd_logmon_metric_events18/ids/mupd_statsd_logmon_metric_events18_sdc-stg-logsearch-app1-1416023311208-14501d68
>     at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
>     at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
>     at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:921)
>     at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:950)
>     at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
>     at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
>     at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
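Once a failure is detected, restarting the source could look roughly like the sketch below: shut down the old source, build a fresh one, and retry with exponential backoff, since the ZK/network errors above are intermittent. The Source interface and SourceSupervisor class are hypothetical. With the 0.8 high-level consumer, a Source would presumably wrap ConsumerConnector.shutdown() plus a new Consumer.createJavaConsumerConnector(...) call and re-created message streams, but that wiring is an assumption and is not shown.

```java
import java.util.function.Supplier;

public final class SourceSupervisor {

    /** Hypothetical lifecycle for whatever wraps the Kafka consumer connector. */
    public interface Source {
        void start() throws Exception;
        void shutdown();
    }

    private final Supplier<Source> factory;
    private final int maxAttempts;
    private final long initialBackoffMillis;
    private Source current;

    public SourceSupervisor(Supplier<Source> factory, int maxAttempts, long initialBackoffMillis) {
        this.factory = factory;
        this.maxAttempts = maxAttempts;
        this.initialBackoffMillis = initialBackoffMillis;
    }

    /**
     * Shuts down the current source (if any) and starts a fresh one, doubling the
     * wait between failed attempts. Returns true once a new source has started.
     */
    public synchronized boolean restart() throws InterruptedException {
        if (current != null) {
            // Release the old connector (and its ZK session) before creating a new one.
            current.shutdown();
            current = null;
        }
        long backoff = initialBackoffMillis;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Source candidate = factory.get();
            try {
                candidate.start();
                current = candidate;
                return true;
            } catch (Exception e) {
                // Clean up the partially started source before retrying.
                candidate.shutdown();
                if (attempt < maxAttempts) {
                    Thread.sleep(backoff);
                    backoff *= 2;
                }
            }
        }
        return false;
    }

    /** The currently running source, or null if the last restart failed. */
    public synchronized Source current() {
        return current;
    }
}
```

restart() could be invoked from whatever health check detects the failure (thread liveness, an ownership alert, or similar). Creating a brand-new connector rather than trying to revive the old one avoids depending on internal rebalance state that has already given up after its retries.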