Can you see if this applies in your case: https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whyaretheremanyrebalancesinmyconsumerlog%3F
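If it does turn out to be the session-expiry (often GC-pause) case that FAQ entry describes, the relevant knobs are the consumer's Zookeeper and rebalance settings. Below is a minimal sketch of where they go with the stock 0.8 high-level (Java) consumer; the property names are the standard 0.8 consumer config keys, but the values, host, and group id are purely illustrative placeholders, not recommendations:

    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class ConsumerTuningSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "test-server.localnet:2181"); // placeholder host:port
            props.put("group.id", "kafkaqueue.notifications");           // placeholder group

            // Settings the FAQ entry is about -- values here are illustrative only.
            // The session timeout should outlast the consumer's worst GC pause.
            props.put("zookeeper.session.timeout.ms", "10000");
            props.put("zookeeper.connection.timeout.ms", "10000");
            props.put("zookeeper.sync.time.ms", "2000");

            // "can't rebalance after 4 retries" matches the default rebalance.max.retries=4;
            // raising it and the backoff buys time, but the real fix is keeping the ZK session alive.
            props.put("rebalance.max.retries", "8");
            props.put("rebalance.backoff.ms", "3000");

            ConsumerConnector connector =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            // ... create message streams and consume as usual ...
            connector.shutdown();
        }
    }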
Also, what version of Kafka 0.8 are you using? If not the beta, then what's the git hash?

Joel

On Thu, Nov 07, 2013 at 02:51:41PM -0500, Ahmed H. wrote:
> Hello all,
>
> I am not sure if this is a Kafka issue, or an issue with the client that I am using.
>
> We have a fairly small setup, where everything sits on one server (Kafka 0.8 and Zookeeper). The message frequency is not too high (1-2 messages per second).
>
> The setup works fine for a certain period of time, but at some point it just dies and exceptions are thrown. This is pretty much a daily occurrence, but there is no pattern. Based on the logs, it appears that the Kafka client tries to rebalance with Zookeeper and fails; it retries several times but eventually gives up. Here is the stack trace:
>
> > 04:56:07,234 INFO [kafka.consumer.SimpleConsumer] (ConsumerFetcherThread-kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-0-0) Reconnect due to socket error: : java.nio.channels.ClosedByInterruptException
> >     at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202) [rt.jar:1.7.0_25]
> >     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:402) [rt.jar:1.7.0_25]
> >     at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:220) [rt.jar:1.7.0_25]
> >     at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) [rt.jar:1.7.0_25]
> >     at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) [rt.jar:1.7.0_25]
> >     at kafka.utils.Utils$.read(Utils.scala:394) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.network.Receive$class.readCompletely(Transmission.scala:56) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:71) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:108) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:108) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:108) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:107) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:107) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:107) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:106) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> > 04:56:07,238 WARN [kafka.consumer.ConsumerFetcherThread] (ConsumerFetcherThread-kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-0-0) [ConsumerFetcherThread-kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-0-0], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 0; ClientId: kafkaqueue.notifications-ConsumerFetcherThread-kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-0-0; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [kafkaqueue.notifications,0] -> PartitionFetchInfo(216003,1048576): java.nio.channels.ClosedByInterruptException
> >     at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202) [rt.jar:1.7.0_25]
> >     at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:650) [rt.jar:1.7.0_25]
> >     at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:43) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:56) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:77) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:108) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:108) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:108) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:107) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:107) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:107) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:106) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> > 04:56:07,240 INFO [kafka.consumer.ConsumerFetcherThread] (ConsumerFetcherThread-kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-0-0) [ConsumerFetcherThread-kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-0-0], Stopped
> > 04:56:07,240 INFO [kafka.consumer.ConsumerFetcherThread] (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5_watcher_executor) [ConsumerFetcherThread-kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-0-0], Shutdown completed
> > 04:56:07,241 INFO [kafka.consumer.ConsumerFetcherManager] (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5_watcher_executor) [ConsumerFetcherManager-1383643783834] All connections stopped
> > 04:56:07,241 INFO [kafka.consumer.ZookeeperConsumerConnector] (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5_watcher_executor) [kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5], Cleared all relevant queues for this fetcher
> > 04:56:07,242 INFO [kafka.consumer.ZookeeperConsumerConnector] (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5_watcher_executor) [kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5], Cleared the data chunks in all the consumer message iterators
> > 04:56:07,242 INFO [kafka.consumer.ZookeeperConsumerConnector] (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5_watcher_executor) [kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5], Committing all offsets after clearing the fetcher queues
> > 04:56:07,245 INFO [kafka.consumer.ZookeeperConsumerConnector] (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5_watcher_executor) [kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5], Releasing partition ownership
> > 04:56:07,248 INFO [kafka.consumer.ZookeeperConsumerConnector] (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5_watcher_executor) [kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5], Consumer kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5 rebalancing the following partitions: ArrayBuffer(0) for topic kafkaqueue.notifications with consumers: List(kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-0)
> > 04:56:07,249 INFO [kafka.consumer.ZookeeperConsumerConnector] (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5_watcher_executor) [kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5], kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-0 attempting to claim partition 0
> > 04:56:07,252 INFO [kafka.consumer.ZookeeperConsumerConnector] (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5_watcher_executor) [kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5], kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-0 successfully owned partition 0 for topic kafkaqueue.notifications
> > 04:56:07,253 INFO [kafka.consumer.ZookeeperConsumerConnector] (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5_watcher_executor) [kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5], Updating the cache
> > 04:56:07,254 INFO [proj.hd.core] (clojure-agent-send-off-pool-5) Invalid node name. Not performing walk. Node name: POC6O003.2:BER:1/19/1
> > 04:56:07,254 INFO [kafka.consumer.ZookeeperConsumerConnector] (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5_watcher_executor) [kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5], Consumer kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5 selected partitions : kafkaqueue.notifications:0: fetched offset = 216003: consumed offset = 216003
> > 04:56:07,255 INFO [kafka.consumer.ZookeeperConsumerConnector] (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5_watcher_executor) [kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5], end rebalancing consumer kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5 try #0
> > 04:56:07,257 INFO [kafka.consumer.ConsumerFetcherManager$LeaderFinderThread] (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-leader-finder-thread) [kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-leader-finder-thread], Starting
> > 04:56:07,265 INFO [kafka.utils.VerifiableProperties] (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-leader-finder-thread) Verifying properties
> > 04:56:07,265 INFO [kafka.utils.VerifiableProperties] (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-leader-finder-thread) Property metadata.broker.list is overridden to test-server.localnet:9092
> > 04:56:07,266 INFO [kafka.utils.VerifiableProperties] (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-leader-finder-thread) Property request.timeout.ms is overridden to 30000
> > 04:56:07,266 INFO [kafka.utils.VerifiableProperties] (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-leader-finder-thread) Property client.id is overridden to kafkaqueue.notifications
> > 04:56:07,267 INFO [kafka.client.ClientUtils$] (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-leader-finder-thread) Fetching metadata from broker id:0,host:test-server.localnet,port:9092 with correlation id 15 for 1 topic(s) Set(kafkaqueue.notifications)
> > 04:56:07,268 INFO [kafka.producer.SyncProducer] (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-leader-finder-thread) Connected to test-server.localnet:9092 for producing
> > 04:56:07,272 INFO [kafka.producer.SyncProducer] (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-leader-finder-thread) Disconnecting from test-server.localnet:9092
> > 04:56:07,274 INFO [kafka.consumer.ConsumerFetcherManager] (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-leader-finder-thread) [ConsumerFetcherManager-1383643783834] Adding fetcher for partition [kafkaqueue.notifications,0], initOffset 216003 to broker 0 with fetcherId 0
> > 04:56:07,275 INFO [kafka.consumer.ConsumerFetcherThread] (ConsumerFetcherThread-kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-0-0) [ConsumerFetcherThread-kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-0-0], Starting
> > 04:56:07,281 INFO [proj.hd.core] (clojure-agent-send-off-pool-5) Invalid node name. Not performing walk. Node name: B2Z_0053.2:Rx Frequency:1/2/1
> > 04:56:10,010 INFO [kafka.consumer.ZookeeperConsumerConnector] (kafkaqueue.topology.updates_test-server.localnet-1383643783747-c7775701_watcher_executor) [kafkaqueue.topology.updates_test-server.localnet-1383643783747-c7775701], begin rebalancing consumer kafkaqueue.topology.updates_test-server.localnet-1383643783747-c7775701 try #0
> > 04:56:10,020 INFO [kafka.consumer.ZookeeperConsumerConnector] (kafkaqueue.topology.updates_test-server.localnet-1383643783747-c7775701_watcher_executor) [kafkaqueue.topology.updates_test-server.localnet-1383643783747-c7775701], exception during rebalance : org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/kafkaqueue.topology.updates/ids/kafkaqueue.topology.updates_test-server.localnet-1383643783747-c7775701
> >     at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47) [zkclient-0.3.jar:0.3]
> >     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685) [zkclient-0.3.jar:0.3]
> >     at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766) [zkclient-0.3.jar:0.3]
> >     at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761) [zkclient-0.3.jar:0.3]
> >     at kafka.utils.ZkUtils$.readData(ZkUtils.scala:407) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:52) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:401) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:374) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:78) [scala-library-2.9.2.jar:]
> >     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:369) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> >     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:326) [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> > Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/kafkaqueue.topology.updates/ids/kafkaqueue.topology.updates_test-server.localnet-1383643783747-c7775701
> >     at org.apache.zookeeper.KeeperException.create(KeeperException.java:111) [zookeeper-3.4.3.jar:3.4.3-1240972]
> >     at org.apache.zookeeper.KeeperException.create(KeeperException.java:51) [zookeeper-3.4.3.jar:3.4.3-1240972]
> >     at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1131) [zookeeper-3.4.3.jar:3.4.3-1240972]
> >     at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1160) [zookeeper-3.4.3.jar:3.4.3-1240972]
> >     at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103) [zkclient-0.3.jar:0.3]
> >     at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770) [zkclient-0.3.jar:0.3]
> >     at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766) [zkclient-0.3.jar:0.3]
> >     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675) [zkclient-0.3.jar:0.3]
> >     ... 9 more
>
> The attempts to rebalance occur a few times, but eventually this message shows up: "can't rebalance after 4 retries".
>
> Our app is deployed in JBoss, and the only way to recover from this is to restart JBoss.
>
> This started happening after we went from Kafka 0.7 to Kafka 0.8. Nothing else on our system changed except for that. We are connecting to Kafka using a Clojure library called clj-kafka (https://github.com/pingles/clj-kafka). clj-kafka was updated to work with Kafka 0.8...
>
> My apologies if this post doesn't belong here. I'm hoping that this may be a generic issue rather than an issue specific to how we're connecting to Kafka. Any ideas are appreciated.
>
> Thanks!
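On having to bounce JBoss to recover: the "can't rebalance after 4 retries" failure is raised as kafka.common.ConsumerRebalanceFailedException, so one option worth trying is to shut the connector down and re-create it instead of restarting the container. Below is a rough sketch against the stock 0.8 Java consumer API; the retry loop, back-off, and entry point are assumptions about your setup (and about what your clj-kafka wrapper lets you reach), not anything the library prescribes:

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.common.ConsumerRebalanceFailedException;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class ReconnectingConsumer {

        // Build (or rebuild) the connector; if the initial rebalance fails after
        // rebalance.max.retries, release the Zookeeper registration and try again
        // instead of leaving the application wedged until the container restarts.
        public static ConsumerConnector connectWithRetry(Properties props, String topic, int attempts)
                throws InterruptedException {
            for (int i = 0; i < attempts; i++) {
                ConsumerConnector connector =
                        Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
                try {
                    Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                            connector.createMessageStreams(Collections.singletonMap(topic, 1));
                    // hand `streams` off to the consuming code here
                    return connector;
                } catch (ConsumerRebalanceFailedException e) {
                    connector.shutdown();           // drops the consumer's ephemeral ZK nodes
                    Thread.sleep(5000L * (i + 1));  // back off before re-registering
                }
            }
            throw new RuntimeException("gave up rebalancing after " + attempts + " attempts");
        }
    }

One caveat: a rebalance triggered later by a Zookeeper watch runs on the connector's own watcher thread, so a failure there may only appear in the logs rather than throw into your code; in that case the same shutdown-and-recreate step would have to be driven by your own monitoring rather than a catch block.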