If a consumer rebalances for any reason (e.g., if a consumer in the group has a soft failure such as a long GC pause), the fetchers are stopped as part of the rebalance process. The sequence is as follows:
- Stop fetchers
- Commit offsets
- Release partition ownership
- Rebalance (i.e., figure out which partitions this consumer should now consume, given the updated set of consumers)
- Acquire partition ownership
- Add fetchers for those partitions and resume consumption

That is, rebalances should complete successfully and fetching should resume. If you have any rebalance failures (search the logs for "can't rebalance after"), then the consumer will effectively stop. From later in this thread it seems your consumer somehow got into a weird state in ZooKeeper, so your only recourse at this point may be to stop all your consumers and restart them.

Thanks,

Joel

> If the fetchers get shut down due to a ClosedByInterruptException in the
> "leader_finder" thread, which tells the "executor_watcher" thread to shut
> down the fetchers, that would be another reason the consumers stop
> processing data. Is this possible?
>
> -----Original Message-----
> From: Seshadri, Balaji [mailto:balaji.sesha...@dish.com]
> Sent: Friday, January 10, 2014 11:40 AM
> To: users@kafka.apache.org
> Subject: RE: Looks like consumer fetchers get stopped we are not getting any data
>
> It would be helpful if you guys could shed some light on why all fetchers
> are getting stopped.
>
> -----Original Message-----
> From: Seshadri, Balaji [mailto:balaji.sesha...@dish.com]
> Sent: Friday, January 10, 2014 11:28 AM
> To: users@kafka.apache.org
> Subject: RE: Looks like consumer fetchers get stopped we are not getting any data
>
> We also got the error below when this happens.
>
> {2014-01-10 00:58:11,292} INFO [account-info-updated-hadoop-consumer_tm1mwdpl04-1389222553159-ad59660b_watcher_executor] (?:?)
> - [account-info-updated-hadoop-consumer_tm1mwdpl04-1389222553159-ad59660b], exception during rebalance
> org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/account-info-updated-hadoop-consumer/ids/account-info-updated-hadoop-consumer_tm1mwdpl04-1389222553159-ad59660b
>         at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
>         at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
>         at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
>         at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
>         at kafka.utils.ZkUtils$.readData(Unknown Source)
>         at kafka.consumer.TopicCount$.constructTopicCount(Unknown Source)
>         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(Unknown Source)
>         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(Unknown Source)
>         at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:142)
>         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(Unknown Source)
>         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(Unknown Source)
> Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/account-info-updated-hadoop-consumer/ids/account-info-updated-hadoop-consumer_tm1mwdpl04-1389222553159-ad59660b
>         at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
>         at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
>         at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:927)
>         at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:956)
>         at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
>         at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
>         at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
>         at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>         ... 9 more
>
> -----Original Message-----
> From: Seshadri, Balaji [mailto:balaji.sesha...@dish.com]
> Sent: Friday, January 10, 2014 10:52 AM
> To: users@kafka.apache.org
> Subject: Looks like consumer fetchers get stopped we are not getting any data
>
> Please let us know why we are not getting any data from Kafka after the
> log entries below.
>
> What could be causing all the associated fetchers to be stopped, and why
> is it not retrying?
>
> {2014-01-10 00:58:09,284} WARN [account-info-updated-hadoop-consumer_tm1mwdpl04-1389222553159-ad59660b-leader-finder-thread] (?:?) - Fetching topic metadata with correlation id 3 for topics [Set(account-info-updated)] from broker [id:1,host:tm1-kafkabroker101,port:9092] failed
> java.nio.channels.ClosedByInterruptException
>         at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
>         at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:506)
>         at java.nio.channels.SocketChannel.write(SocketChannel.java:493)
>         at kafka.network.BoundedByteBufferSend.writeTo(Unknown Source)
>         at kafka.network.Send$class.writeCompletely(Unknown Source)
>         at kafka.network.BoundedByteBufferSend.writeCompletely(Unknown Source)
>         at kafka.network.BlockingChannel.send(Unknown Source)
>         at kafka.producer.SyncProducer.liftedTree1$1(Unknown Source)
>         at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(Unknown Source)
>         at kafka.producer.SyncProducer.send(Unknown Source)
>         at kafka.client.ClientUtils$.fetchTopicMetadata(Unknown Source)
>         at kafka.client.ClientUtils$.fetchTopicMetadata(Unknown Source)
>         at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(Unknown Source)
>         at kafka.utils.ShutdownableThread.run(Unknown Source)
> {2014-01-10 00:58:09,284} DEBUG [account-access-hadoop-consumer_tm1mwdpl04-1389222551916-a0c87abc_watcher_executor] (?:?) - initial fetch offset of account-access:27: fetched offset = 9655: consumed offset = 9655 is 9655
> {2014-01-10 00:58:09,284} DEBUG [bill-generated-hadoop-consumer_tm1mwdpl04-1389222547995-29a6dce9_watcher_executor] (?:?) - initial consumer offset of bill-generated:11: fetched offset = 152: consumed offset = 152 is 152
> {2014-01-10 00:58:09,284} DEBUG [outbound-communications-hadoop-consumer_tm1mwdpl04-1389222550693-8bc34b77_watcher_executor] (?:?) - [outbound-communications-hadoop-consumer_tm1mwdpl04-1389222550693-8bc34b77], outbound-communications:108: fetched offset = 1689: consumed offset = 1689 selected new offset 1689
> {2014-01-10 00:58:09,284} DEBUG [catalina-exec-3-SendThread(tvip-m1-mw-zookeeper.dish.com:2181)] (ClientCnxn.java:839) - Reading reply sessionid:0x1434b49cf56383b, packet:: clientPath:null serverPath:null finished:false header:: 279,4 replyHeader:: 279,51539617506,0 request:: '/consumers/outbound-call-attempted-hadoop-consumer/offsets/outbound-call-attempted/14,F response:: #30,s{39860186414,39860186414,1387517714994,1387517714994,0,0,0,0,1,0,39860186414}
> {2014-01-10 00:58:09,285} INFO [outbound-communications-hadoop-consumer_tm1mwdpl04-1389222550693-8bc34b77_watcher_executor] (?:?) - [outbound-communications-hadoop-consumer_tm1mwdpl04-1389222550693-8bc34b77], outbound-communications-hadoop-consumer_tm1mwdpl04-1389222550693-8bc34b77-3 attempting to claim partition 109
> {2014-01-10 00:58:09,284} DEBUG [bill-generated-hadoop-consumer_tm1mwdpl04-1389222547995-29a6dce9_watcher_executor] (?:?) - initial fetch offset of bill-generated:11: fetched offset = 152: consumed offset = 152 is 152
> {2014-01-10 00:58:09,284} DEBUG [catalina-exec-12-SendThread(tvip-m1-mw-zookeeper.dish.com:2181)] (ClientCnxn.java:839) - Reading reply sessionid:0x1437b2879870005, packet:: clientPath:null serverPath:null finished:false header:: 619,1 replyHeader:: 619,51539617508,0 request:: '/consumers/account-activated-hadoop-consumer/owners/account-activated/68,#6163636f756e742d6163746976617465642d6861646f6f702d636f6e73756d65725f746d316d7764706c30342d313338393232323535373930362d35363262363733382d30,v{s{31,s{'world,'anyone}}},1 response:: '/consumers/account-activated-hadoop-consumer/owners/account-activated/68
> {2014-01-10 00:58:09,284} DEBUG [account-access-hadoop-consumer_tm1mwdpl04-1389222551916-a0c87abc_watcher_executor] (?:?) - [account-access-hadoop-consumer_tm1mwdpl04-1389222551916-a0c87abc], account-access:27: fetched offset = 9655: consumed offset = 9655 selected new offset 9655
> {2014-01-10 00:58:09,284} INFO [account-info-updated-hadoop-consumer_tm1mwdpl04-1389222553159-ad59660b_watcher_executor] (?:?) - [account-info-updated-hadoop-consumer_tm1mwdpl04-1389222553159-ad59660b-leader-finder-thread], Shutdown completed
> {2014-01-10 00:58:09,284} INFO [account-info-updated-hadoop-consumer_tm1mwdpl04-1389222553159-ad59660b-leader-finder-thread] (?:?) - [account-info-updated-hadoop-consumer_tm1mwdpl04-1389222553159-ad59660b-leader-finder-thread], Stopped
> {2014-01-10 00:58:09,285} INFO [account-info-updated-hadoop-consumer_tm1mwdpl04-1389222553159-ad59660b_watcher_executor] (?:?) - [ConsumerFetcherManager-1389222553163] Stopping all fetchers
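[Editor's note] The log check suggested at the top of the thread ("search for \"can't rebalance after\"") is easy to script. A minimal sketch is below; it builds a scratch log directory with one failing consumer log purely for demonstration — the directory name, log filename, and message prefix are assumptions, so point the grep at wherever your consumers actually write their logs:

```shell
# Demo only: fabricate a scratch log dir containing the fatal rebalance
# marker, then scan for it the same way you would scan real consumer logs.
LOG_DIR=$(mktemp -d)   # assumption: substitute your real consumer log dir
echo "ERROR [watcher_executor] can't rebalance after 4 retries" \
    > "$LOG_DIR/consumer.log"   # hypothetical log line for the demo

# The actual check: a consumer whose log contains this marker has given up
# rebalancing and has effectively stopped consuming.
if grep -rq "can't rebalance after" "$LOG_DIR"; then
    echo "rebalance failure detected -- restart the affected consumers"
fi
```

In a real deployment you would drop the fabricated file and run the `grep -rq` line against each consumer host's log directory; any hit means that consumer needs to be bounced (and, per the advice above, possibly all consumers if ZooKeeper state is wedged).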