Hello all,

I am not sure if this is a Kafka issue, or an issue with the client that I
am using.

We have a fairly small setup, where everything sits on one server (Kafka
0.8, and Zookeeper). The message frequency is not too high (1-2 per second).

The setup works fine for a certain period of time but at some point, it
just dies, and exceptions are thrown. This is pretty much a daily
occurrence, but there is no pattern. Based on the logs, it appears that the
Kafka client tries to rebalance with Zookeeper and fails, it tries and
tries multiple times but after a few tries it gives up. Here is the stack
trace:

04:56:07,234 INFO  [kafka.consumer.SimpleConsumer]
> (ConsumerFetcherThread-kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-0-0)
> Reconnect due to socket error: :
> java.nio.channels.ClosedByInterruptException
>  at
> java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
> [rt.jar:1.7.0_25]
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:402)
> [rt.jar:1.7.0_25]
>  at
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:220)
> [rt.jar:1.7.0_25]
> at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
> [rt.jar:1.7.0_25]
>  at
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
> [rt.jar:1.7.0_25]
> at kafka.utils.Utils$.read(Utils.scala:394)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:71)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:108)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:108)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:108)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:107)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:107)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:107)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:106)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> 04:56:07,238 WARN  [kafka.consumer.ConsumerFetcherThread]
> (ConsumerFetcherThread-kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-0-0)
> [ConsumerFetcherThread-kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-0-0],
> Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 0; ClientId:
> kafkaqueue.notifications-ConsumerFetcherThread-kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-0-0;
> ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo:
> [kafkaqueue.notifications,0] -> PartitionFetchInfo(216003,1048576):
> java.nio.channels.ClosedByInterruptException
>  at
> java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
> [rt.jar:1.7.0_25]
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:650)
> [rt.jar:1.7.0_25]
>  at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:43)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:56)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:77)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:108)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:108)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:108)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:107)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:107)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:107)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:106)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> 04:56:07,240 INFO  [kafka.consumer.ConsumerFetcherThread]
> (ConsumerFetcherThread-kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-0-0)
> [ConsumerFetcherThread-kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-0-0],
> Stopped
> 04:56:07,240 INFO  [kafka.consumer.ConsumerFetcherThread]
> (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5_watcher_executor)
> [ConsumerFetcherThread-kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-0-0],
> Shutdown completed
> 04:56:07,241 INFO  [kafka.consumer.ConsumerFetcherManager]
> (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5_watcher_executor)
> [ConsumerFetcherManager-1383643783834] All connections stopped
> 04:56:07,241 INFO  [kafka.consumer.ZookeeperConsumerConnector]
> (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5_watcher_executor)
> [kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5],
> Cleared all relevant queues for this fetcher
> 04:56:07,242 INFO  [kafka.consumer.ZookeeperConsumerConnector]
> (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5_watcher_executor)
> [kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5],
> Cleared the data chunks in all the consumer message iterators
> 04:56:07,242 INFO  [kafka.consumer.ZookeeperConsumerConnector]
> (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5_watcher_executor)
> [kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5],
> Committing all offsets after clearing the fetcher queues
> 04:56:07,245 INFO  [kafka.consumer.ZookeeperConsumerConnector]
> (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5_watcher_executor)
> [kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5],
> Releasing partition ownership
> 04:56:07,248 INFO  [kafka.consumer.ZookeeperConsumerConnector]
> (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5_watcher_executor)
> [kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5],
> Consumer
> kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5
> rebalancing the following partitions: ArrayBuffer(0) for topic
> kafkaqueue.notifications with consumers:
> List(kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-0)
> 04:56:07,249 INFO  [kafka.consumer.ZookeeperConsumerConnector]
> (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5_watcher_executor)
> [kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5],
> kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-0
> attempting to claim partition 0
> 04:56:07,252 INFO  [kafka.consumer.ZookeeperConsumerConnector]
> (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5_watcher_executor)
> [kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5],
> kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-0
> successfully owned partition 0 for topic kafkaqueue.notifications
> 04:56:07,253 INFO  [kafka.consumer.ZookeeperConsumerConnector]
> (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5_watcher_executor)
> [kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5],
> Updating the cache
> 04:56:07,254 INFO  [proj.hd.core] (clojure-agent-send-off-pool-5) Invalid
> node name. Not performing walk. Node name:  POC6O003.2:BER:1/19/1
> 04:56:07,254 INFO  [kafka.consumer.ZookeeperConsumerConnector]
> (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5_watcher_executor)
> [kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5],
> Consumer
> kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5
> selected partitions : kafkaqueue.notifications:0: fetched offset = 216003:
> consumed offset = 216003
> 04:56:07,255 INFO  [kafka.consumer.ZookeeperConsumerConnector]
> (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5_watcher_executor)
> [kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5], end
> rebalancing consumer
> kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5 try #0
> 04:56:07,257 INFO
>  [kafka.consumer.ConsumerFetcherManager$LeaderFinderThread]
> (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-leader-finder-thread)
> [kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-leader-finder-thread],
> Starting
> 04:56:07,265 INFO  [kafka.utils.VerifiableProperties]
> (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-leader-finder-thread)
> Verifying properties
> 04:56:07,265 INFO  [kafka.utils.VerifiableProperties]
> (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-leader-finder-thread)
> Property metadata.broker.list is overridden to test-server.localnet:9092
> 04:56:07,266 INFO  [kafka.utils.VerifiableProperties]
> (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-leader-finder-thread)
> Property request.timeout.ms is overridden to 30000
> 04:56:07,266 INFO  [kafka.utils.VerifiableProperties]
> (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-leader-finder-thread)
> Property client.id is overridden to kafkaqueue.notifications
> 04:56:07,267 INFO  [kafka.client.ClientUtils$]
> (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-leader-finder-thread)
> Fetching metadata from broker id:0,host:test-server.localnet,port:9092 with
> correlation id 15 for 1 topic(s) Set(kafkaqueue.notifications)
> 04:56:07,268 INFO  [kafka.producer.SyncProducer]
> (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-leader-finder-thread)
> Connected to test-server.localnet:9092 for producing
> 04:56:07,272 INFO  [kafka.producer.SyncProducer]
> (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-leader-finder-thread)
> Disconnecting from test-server.localnet:9092
> 04:56:07,274 INFO  [kafka.consumer.ConsumerFetcherManager]
> (kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-leader-finder-thread)
> [ConsumerFetcherManager-1383643783834] Adding fetcher for partition
> [kafkaqueue.notifications,0], initOffset 216003 to broker 0 with fetcherId 0
> 04:56:07,275 INFO  [kafka.consumer.ConsumerFetcherThread]
> (ConsumerFetcherThread-kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-0-0)
> [ConsumerFetcherThread-kafkaqueue.notifications_test-server.localnet-1383643783745-3757e7a5-0-0],
> Starting
> 04:56:07,281 INFO  [proj.hd.core] (clojure-agent-send-off-pool-5) Invalid
> node name. Not performing walk. Node name:  B2Z_0053.2:Rx Frequency:1/2/1
> 04:56:10,010 INFO  [kafka.consumer.ZookeeperConsumerConnector]
> (kafkaqueue.topology.updates_test-server.localnet-1383643783747-c7775701_watcher_executor)
> [kafkaqueue.topology.updates_test-server.localnet-1383643783747-c7775701],
> begin rebalancing consumer
> kafkaqueue.topology.updates_test-server.localnet-1383643783747-c7775701 try
> #0
> 04:56:10,020 INFO  [kafka.consumer.ZookeeperConsumerConnector]
> (kafkaqueue.topology.updates_test-server.localnet-1383643783747-c7775701_watcher_executor)
> [kafkaqueue.topology.updates_test-server.localnet-1383643783747-c7775701],
> exception during rebalance :
> org.I0Itec.zkclient.exception.ZkNoNodeException:
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode =
> NoNode for
> /consumers/kafkaqueue.topology.updates/ids/kafkaqueue.topology.updates_test-server.localnet-1383643783747-c7775701
>  at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
> [zkclient-0.3.jar:0.3]
> at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
> [zkclient-0.3.jar:0.3]
>  at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
> [zkclient-0.3.jar:0.3]
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
> [zkclient-0.3.jar:0.3]
>  at kafka.utils.ZkUtils$.readData(ZkUtils.scala:407)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> at kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:52)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:401)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:374)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:78)
> [scala-library-2.9.2.jar:]
> at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:369)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>  at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:326)
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> Caused by: org.apache.zookeeper.KeeperException$NoNodeException:
> KeeperErrorCode = NoNode for
> /consumers/kafkaqueue.topology.updates/ids/kafkaqueue.topology.updates_test-server.localnet-1383643783747-c7775701
>  at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
> [zookeeper-3.4.3.jar:3.4.3-1240972]
> at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
> [zookeeper-3.4.3.jar:3.4.3-1240972]
>  at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1131)
> [zookeeper-3.4.3.jar:3.4.3-1240972]
> at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1160)
> [zookeeper-3.4.3.jar:3.4.3-1240972]
>  at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
> [zkclient-0.3.jar:0.3]
> at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
> [zkclient-0.3.jar:0.3]
>  at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
> [zkclient-0.3.jar:0.3]
> at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
> [zkclient-0.3.jar:0.3]
>  ... 9 more


The attempts to rebalance occur a few times but eventually,this message
shows up: "can't rebalance after 4 retries".

Our app is deployed in JBoss and the only way to recover from this is to
restart JBoss.

This started happening after we went from Kafka 0.7 to Kafka 0.8. Nothing
else on our system changed except for that. We are connecting to Kafka
using a Clojure library called clj-kafka (
https://github.com/pingles/clj-kafka). clj-kafka was updated to work with
Kafka 0.8...

My apologies if this post doesn't belong here. I'm hoping that this may be
a generic issue rather than an issue specific to how we're connecting to
Kafka. Any ideas are appreciated.

Thanks!

Reply via email to