Bhavesh,

In 0.9 the consumer will not talk to ZK and will be single-threaded, which will make it easier to provide monitoring mechanisms.

Guozhang

On Thu, Nov 20, 2014 at 8:15 PM, Jun Rao <jun...@gmail.com> wrote:

> Can you just monitor the consumer byte/message/fetch rate?
>
> Thanks,
>
> Jun
>
> On Thu, Nov 20, 2014 at 5:31 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
>
> > HI Jun,
> >
> > Do you want me to request Jira ticket for feature a notification for new
> > consumer API and old consumer feature that consumer stream is dying. So
> > application can try to restart it programmatically. I understand this is
> > due to network or zk cluster instability.
> >
> > Let me know if you have alternative proposal for this for new and old
> > high-level consumer API.
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Tue, Nov 18, 2014 at 9:53 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> >
> > > Hi Jun,
> > >
> > > ZK cluster are up and running. What is best way to programmatically
> > > recover and I would try to exponential recovery process which I am
> > > willing to implement. So do you think monitoring "ZkClient-EventThread
> > > <http://zkclient-eventthread-65-dare-msgq00.sv.walmartlabs.com:9091/>*"
> > > thread status will be enough to indicate source thread is dead and
> > > therefore start exponential reconnect process ?
> > >
> > > Can you guys at least for new consumer api (0.9.0) provide a call back
> > > method or notification the consumer is died and reason for it ?
> > >
> > > Thanks,
> > > Bhavesh
> > >
> > > On Tue, Nov 18, 2014 at 9:34 PM, Jun Rao <jun...@gmail.com> wrote:
> > >
> > >> Is your ZK service alive at that point? If not, you just need to
> > >> monitor the ZK server properly.
> > >>
> > >> Thanks,
> > >>
> > >> Jun
> > >>
> > >> On Mon, Nov 17, 2014 at 2:30 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> > >>
> > >> > Hi Kafka Team,
> > >> >
> > >> > I get following exception due to ZK/Network issues intermittently.
> > >> > How do I recover from consumer thread dying *programmatically* and
> > >> > restart source because we have alerts that due to this error we have
> > >> > partition OWNERSHIP is *none* ? Please let me know how to restart
> > >> > source and detect consumer thread died and need to be restarted ?
> > >> >
> > >> > 17 Nov 2014 04:29:41,180 ERROR [ ZkClient-EventThread-65-dare-msgq00.sv.walmartlabs.com:9091, dare-msgq01.sv.walmartlabs.com:9091, dare-msgq02.sv.walmartlabs.com:9091 ] (org.I0Itec.zkclient.ZkEventThread.run:77) - Error handling event ZkEvent[New session event sent to kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener@552c5ea8 ]
> > >> > kafka.common.ConsumerRebalanceFailedException: mupd_statsd_logmon_metric_events18_sdc-stg-logsearch-app1-1416023311208-14501d68 can't rebalance after 8 retries
> > >> >     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:626)
> > >> >     at kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:481)
> > >> >     at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
> > >> >     at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> > >> >
> > >> > ZK Connection Issues:
> > >> >
> > >> > java.net.SocketException: Transport endpoint is not connected
> > >> >     at sun.nio.ch.SocketChannelImpl.shutdown(Native Method)
> > >> >     at sun.nio.ch.SocketChannelImpl.shutdownInput(SocketChannelImpl.java:633)
> > >> >     at sun.nio.ch.SocketAdaptor.shutdownInput(SocketAdaptor.java:360)
> > >> >     at org.apache.zookeeper.ClientCnxn$SendThread.cleanup(ClientCnxn.java:1205)
> > >> >     at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1170)
> > >> >
> > >> >     at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
> > >> >     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
> > >> >     at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
> > >> >     at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
> > >> >     at kafka.utils.ZkUtils$.readData(ZkUtils.scala:449)
> > >> >     at kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:61)
> > >> >     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:630)
> > >> >     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:601)
> > >> >     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> > >> >     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:595)
> > >> >     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:592)
> > >> >     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:592)
> > >> >     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > >> >     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:591)
> > >> >     at kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:481)
> > >> >     at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
> > >> >     at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> > >> > Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/mupd_statsd_logmon_metric_events18/ids/mupd_statsd_logmon_metric_events18_sdc-stg-logsearch-app1-1416023311208-14501d68
> > >> >     at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
> > >> >     at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
> > >> >     at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:921)
> > >> >     at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:950)
> > >> >     at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
> > >> >     at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
> > >> >     at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
> > >> >     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)

--
-- Guozhang
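
A possible shape for the exponential recovery process discussed in this thread, as a minimal sketch and not something Kafka provides. StallDetector follows the suggestion to watch the consumer message rate and flags a consumer whose message count has stopped advancing; BackoffRestarter retries a restart with doubling delays. Both class names, the restart callback, and all timeout values are hypothetical. Wiring them to a real consumer (for example, shutting down and recreating the connector inside the callback) is left to the application.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper: reports a stall when the message count stops advancing. */
final class StallDetector {
    private final long stallTimeoutMs;
    private long lastCount = -1;
    private long lastProgressMs;

    StallDetector(long stallTimeoutMs, long nowMs) {
        this.stallTimeoutMs = stallTimeoutMs;
        this.lastProgressMs = nowMs;
    }

    /** Feed the total messages consumed so far; true once no progress for stallTimeoutMs. */
    boolean isStalled(long messageCount, long nowMs) {
        if (messageCount != lastCount) {
            lastCount = messageCount;   // progress seen, reset the clock
            lastProgressMs = nowMs;
            return false;
        }
        return nowMs - lastProgressMs >= stallTimeoutMs;
    }
}

/** Hypothetical helper: retries a restart callback with exponential backoff. */
final class BackoffRestarter {
    private final long baseDelayMs;
    private final long maxDelayMs;

    BackoffRestarter(long baseDelayMs, long maxDelayMs) {
        if (baseDelayMs <= 0 || maxDelayMs < baseDelayMs) {
            throw new IllegalArgumentException("need 0 < baseDelayMs <= maxDelayMs");
        }
        this.baseDelayMs = baseDelayMs;
        this.maxDelayMs = maxDelayMs;
    }

    /** Delay before retry number attempt (0-based): base * 2^attempt, capped at maxDelayMs. */
    long delayMs(int attempt) {
        long delay = baseDelayMs;
        for (int i = 0; i < attempt && delay < maxDelayMs; i++) {
            // Doubling past maxDelayMs / 2 would exceed the cap, so clamp instead.
            delay = (delay > maxDelayMs / 2) ? maxDelayMs : delay * 2;
        }
        return delay;
    }

    /**
     * Calls restart until it returns true or maxAttempts is used up.
     * An exception from restart counts as a failed attempt.
     * Returns false if every attempt failed or the thread was interrupted.
     */
    boolean restartWithBackoff(Callable<Boolean> restart, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            boolean started;
            try {
                started = restart.call();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            } catch (Exception e) {
                started = false;
            }
            if (started) {
                return true;
            }
            if (attempt + 1 < maxAttempts) {
                try {
                    Thread.sleep(delayMs(attempt));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }
}
```

An idle topic also produces no messages, so a count-based check can report false stalls; the timeout should be chosen with the expected traffic in mind.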