Hi, I'm having trouble with a Spark (1.2.1) Streaming job which is using Apache Kafka (client: 0.8.1.1).
The job is consuming messages, and the observed behaviour is that after processing some messages the job stops processing, then restarts, then consumes the lag which built up during the pause, and the cycle repeats.

I see plenty of logs, amongst others WARNs like:

    kafka.consumer.SimpleConsumer: Reconnect due to socket error: null

and ERRORs of the form:

    Deregistered receiver for stream 0: Error starting receiver 0 - kafka.common.ConsumerRebalanceFailedException: SparkStreamingJob-com.allegrogroup.reco.rtp.events.item.ItemChangedSparkStreaming_s41696.dc4.local-1432199965502-e860d377 can't rebalance after 4 retries
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
        at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722)
        at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212)
        at kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:138)
        at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:111)

    kafka.consumer.ZookeeperConsumerConnector: my-streaimng-job can't rebalance after 4 retries
    kafka.common.ConsumerRebalanceFailedException: my-streaimng-job can't rebalance after 4 retries
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:355)

    kafka.producer.SyncProducer: Producer connection to xxx-broker:9092 unsuccessful
    java.nio.channels.ClosedByInterruptException
        at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:650)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
        at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

Any ideas what the reason might be? The broker logs contain nothing suspicious.

The funny and most important part is that I have another job reading from the same topic (with a different consumer group), and that job has no trouble consuming the messages. This would indicate an error in the streaming job; however, these Kafka logs are prevalent and I can't find the cause. The method kafka.client.ClientUtils#fetchTopicMetadata is called quite often in the failing job, while the working one does not call it (I base my assumption on the presence of "Fetching metadata from broker..." messages). What could be the reason for that?

Thanks for any hints,
Kornel
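For context, the failing job uses the receiver-based KafkaUtils.createStream API (that is where the KafkaReceiver.onStart -> createMessageStreams frames in the trace come from). The sketch below is a simplified approximation of the setup, not the exact code: the ZooKeeper quorum, group id, topic name, and batch interval are placeholders, and the rebalance settings shown are just the Kafka 0.8 high-level consumer properties that correspond to the "can't rebalance after 4 retries" message.

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object ItemChangedSparkStreaming {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("SparkStreamingJob")
        val ssc  = new StreamingContext(conf, Seconds(10))

        // Properties passed straight to the Kafka 0.8 high-level consumer.
        // zookeeper.connect and group.id are placeholders; rebalance.max.retries
        // and rebalance.backoff.ms are the knobs behind the
        // "can't rebalance after 4 retries" error (4 is the default).
        val kafkaParams = Map(
          "zookeeper.connect"               -> "zk1:2181,zk2:2181,zk3:2181",
          "group.id"                        -> "my-streaming-job",
          "zookeeper.connection.timeout.ms" -> "10000",
          "rebalance.max.retries"           -> "4",
          "rebalance.backoff.ms"            -> "2000"
        )

        // One receiver with a single consumer thread on the topic.
        val topics = Map("item-changed" -> 1)

        val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)

        // Downstream processing elided; just count records per batch here.
        stream.map(_._2).count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

The other (working) job reads the same topic with a different group.id, which is why I suspect the problem is on the consumer/receiver side of this job rather than on the brokers.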