Bhavesh,

In 0.9 the new consumer will not talk to ZK and will be single-threaded,
which will make it easier to provide monitoring mechanisms.

Guozhang

On Thu, Nov 20, 2014 at 8:15 PM, Jun Rao <jun...@gmail.com> wrote:

> Can you just monitor the consumer byte/message/fetch rate?
>
> Thanks,
>
> Jun
>
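An application could run a rate check like this itself. The consuming thread increments a counter for each message, and a monitor samples that counter on a fixed interval. Once the count has not moved for several samples in a row, the monitor flags the consumer. This is a minimal sketch: `StallDetector`, `recordMessage`, and `sample` are hypothetical names, not Kafka APIs. The same check could instead be pointed at the consumer's own byte/message/fetch rate metrics.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not a Kafka API): flags a consumer that has stopped
// making progress. The consuming thread calls recordMessage() per message;
// a monitoring thread calls sample() once per fixed interval.
class StallDetector {
    private final AtomicLong consumed = new AtomicLong();
    private final int maxIdleSamples; // consecutive idle samples before alerting
    private long lastCount = 0;        // counter value at the previous sample
    private int idleSamples = 0;       // consecutive samples with no progress

    StallDetector(int maxIdleSamples) {
        if (maxIdleSamples < 1) {
            throw new IllegalArgumentException("maxIdleSamples must be >= 1");
        }
        this.maxIdleSamples = maxIdleSamples;
    }

    // Called from the consuming thread for every message it processes.
    void recordMessage() {
        consumed.incrementAndGet();
    }

    // Returns true once the counter has not moved for maxIdleSamples
    // consecutive samples; any progress resets the idle streak.
    synchronized boolean sample() {
        long current = consumed.get();
        idleSamples = (current == lastCount) ? idleSamples + 1 : 0;
        lastCount = current;
        return idleSamples >= maxIdleSamples;
    }
}
```

In practice, something like a `ScheduledExecutorService` would call `sample()` every N seconds. Note that a topic with no new traffic looks the same as a dead consumer to this check. The idle threshold therefore needs to fit the expected message rate, or the check should be combined with consumer lag.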
> On Thu, Nov 20, 2014 at 5:31 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
>
> > Hi Jun,
> >
> > Do you want me to file a Jira ticket requesting a feature, for both the
> > new consumer API and the old consumer, that notifies the application when
> > a consumer stream is dying, so that the application can try to restart it
> > programmatically? I understand this is due to network or ZK cluster
> > instability.
> >
> > Let me know if you have an alternative proposal for this for the new and
> > old high-level consumer APIs.
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Tue, Nov 18, 2014 at 9:53 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> >
> > > Hi Jun,
> > >
> > > The ZK cluster is up and running. What is the best way to recover
> > > programmatically? I would like to try an exponential-backoff recovery
> > > process, which I am willing to implement. Do you think monitoring the
> > > status of the "ZkClient-EventThread" thread will be enough to tell that
> > > the source thread is dead, and therefore to start the exponential
> > > reconnect process?
> > >
> > > Can you at least provide, for the new consumer API (0.9.0), a callback
> > > method or notification that the consumer has died, along with the reason?
> > >
> > >
> > > Thanks,
> > > Bhavesh
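The exponential reconnect idea can be written as a retry loop around a restart action that the caller supplies. This sketch assumes the action tears down the stuck consumer, creates a new one, and returns true on success. For the 0.8 high-level consumer, that would presumably mean calling `ConsumerConnector.shutdown()` and then creating a fresh connector and streams. `BackoffRestarter` is a hypothetical helper, not part of Kafka.

On the thread-status idea: `ZkEventThread.run` itself writes the ERROR line in the original log (`ZkEventThread.run:77`). This suggests the exception was caught there and the ZkClient event thread may keep running. If so, checking that thread's liveness alone might not detect this failure.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper (not a Kafka API): retries a restart action with
// exponential backoff. The action should shut down the dead consumer and
// build a new one, returning true if the new consumer came up.
class BackoffRestarter {
    private final long initialDelayMs;
    private final long maxDelayMs;
    private final int maxAttempts;

    BackoffRestarter(long initialDelayMs, long maxDelayMs, int maxAttempts) {
        this.initialDelayMs = initialDelayMs;
        this.maxDelayMs = maxDelayMs;
        this.maxAttempts = maxAttempts;
    }

    // Delay before attempt n (0-based): initialDelayMs * 2^n, capped at maxDelayMs.
    long delayFor(int attempt) {
        long delay = initialDelayMs;
        for (int i = 0; i < attempt && delay < maxDelayMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxDelayMs);
    }

    // Waits before every attempt (including the first, to let ZK settle).
    // Returns the number of attempts used, or -1 if all attempts failed
    // or the calling thread was interrupted.
    int restart(BooleanSupplier restartAction) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                Thread.sleep(delayFor(attempt));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                return -1;
            }
            if (restartAction.getAsBoolean()) {
                return attempt + 1;
            }
        }
        return -1;
    }
}
```

The cap keeps a long ZK outage from pushing the delay into hours. Returning -1 lets the caller escalate (for example, raise an alert) once every attempt has failed.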
> > >
> > >
> > >
> > > On Tue, Nov 18, 2014 at 9:34 PM, Jun Rao <jun...@gmail.com> wrote:
> > >
> > >> Is your ZK service alive at that point? If not, you just need to monitor
> > >> the ZK server properly.
> > >>
> > >> Thanks,
> > >>
> > >> Jun
> > >>
> > >> On Mon, Nov 17, 2014 at 2:30 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> > >>
> > >> > Hi Kafka Team,
> > >> >
> > >> > I intermittently get the following exception due to ZK/network issues.
> > >> > How do I recover *programmatically* from a consumer thread dying and
> > >> > restart the source? We have alerts showing that, because of this error,
> > >> > partition OWNERSHIP is *none*. Please let me know how to detect that a
> > >> > consumer thread has died and needs to be restarted, and how to restart
> > >> > the source.
> > >> >
> > >> > 17 Nov 2014 04:29:41,180 ERROR [ZkClient-EventThread-65-dare-msgq00.sv.walmartlabs.com:9091,dare-msgq01.sv.walmartlabs.com:9091,dare-msgq02.sv.walmartlabs.com:9091] (org.I0Itec.zkclient.ZkEventThread.run:77)  - Error handling event ZkEvent[New session event sent to kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener@552c5ea8]
> > >> > kafka.common.ConsumerRebalanceFailedException: mupd_statsd_logmon_metric_events18_sdc-stg-logsearch-app1-1416023311208-14501d68 can't rebalance after 8 retries
> > >> >         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:626)
> > >> >         at kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:481)
> > >> >         at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
> > >> >         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> > >> >
> > >> > ZK Connection Issues:
> > >> >
> > >> > java.net.SocketException: Transport endpoint is not connected
> > >> >         at sun.nio.ch.SocketChannelImpl.shutdown(Native Method)
> > >> >         at sun.nio.ch.SocketChannelImpl.shutdownInput(SocketChannelImpl.java:633)
> > >> >         at sun.nio.ch.SocketAdaptor.shutdownInput(SocketAdaptor.java:360)
> > >> >         at org.apache.zookeeper.ClientCnxn$SendThread.cleanup(ClientCnxn.java:1205)
> > >> >         at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1170)
> > >> >
> > >> >         at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
> > >> >         at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
> > >> >         at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
> > >> >         at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
> > >> >         at kafka.utils.ZkUtils$.readData(ZkUtils.scala:449)
> > >> >         at kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:61)
> > >> >         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:630)
> > >> >         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:601)
> > >> >         at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> > >> >         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:595)
> > >> >         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:592)
> > >> >         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:592)
> > >> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > >> >         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:591)
> > >> >         at kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:481)
> > >> >         at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
> > >> >         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> > >> > Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/mupd_statsd_logmon_metric_events18/ids/mupd_statsd_logmon_metric_events18_sdc-stg-logsearch-app1-1416023311208-14501d68
> > >> >         at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
> > >> >         at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
> > >> >         at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:921)
> > >> >         at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:950)
> > >> >         at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
> > >> >         at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
> > >> >         at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
> > >> >         at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
> > >> >
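For the "OWNERSHIP is *none*" alert, one option is to check the group's registration in ZK directly. The NoNode path in the trace shows the layout `/consumers/<group>/ids/<consumerId>`. The 0.8 high-level consumer also records partition owners under `/consumers/<group>/owners/<topic>/<partition>`. The sketch below assumes the caller supplies the existence check, for example a wrapper around ZooKeeper's `exists(path, false) != null` that handles its checked exceptions. `OwnershipCheck` and its methods are hypothetical names, and the topic name in the test is made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical health check for an old (ZK-based) high-level consumer group.
// Paths follow the 0.8 ZK layout:
//   /consumers/<group>/ids/<consumerId>            (consumer registration)
//   /consumers/<group>/owners/<topic>/<partition>  (partition owner)
// "exists" is supplied by the caller, e.g. a wrapper around
// ZooKeeper.exists(path, false) != null that handles its checked exceptions.
class OwnershipCheck {
    static String idPath(String group, String consumerId) {
        return "/consumers/" + group + "/ids/" + consumerId;
    }

    static String ownerPath(String group, String topic, int partition) {
        return "/consumers/" + group + "/owners/" + topic + "/" + partition;
    }

    // Partitions 0..numPartitions-1 of the topic that have no owner node.
    static List<Integer> unownedPartitions(String group, String topic,
                                           int numPartitions,
                                           Predicate<String> exists) {
        List<Integer> unowned = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (!exists.test(ownerPath(group, topic, p))) {
                unowned.add(p);
            }
        }
        return unowned;
    }

    // True if this consumer's registration node is missing, which is the
    // situation reported by the NoNodeException in the trace.
    static boolean isDeregistered(String group, String consumerId,
                                  Predicate<String> exists) {
        return !exists.test(idPath(group, consumerId));
    }
}
```

Suppose `isDeregistered` returns true while the consumer process is still running. The consumer has then likely lost its ZK session without re-registering, which matches the `NoNode` cause above. Restarting the connector is one way to recover.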
> > >>
> > >
> > >
> >
>



-- 
-- Guozhang