One way to narrow down the issue is to attach a debugger to the JVM running
the Kafka consumer and add a breakpoint in SimpleConsumer to see the real
exception stack trace that is causing the reconnect. I've filed a JIRA, with a
patch, to improve this logging so that the full stack trace of the cause is
included in the message: https://issues.apache.org/jira/browse/KAFKA-2221
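In the Spark streaming case the SimpleConsumer runs inside the receiver's
executor JVM, so one way to get a debugger on it (just a sketch; the app name
and port below are placeholders) is to open a JDWP port via
spark.executor.extraJavaOptions and then attach a remote debugger to that port
with a breakpoint in kafka.consumer.SimpleConsumer:

    // Sketch: expose a remote-debug (JDWP) port on the executors so a
    // debugger can attach and break inside kafka.consumer.SimpleConsumer.
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("my-streaming-job")   // placeholder app name
      .set("spark.executor.extraJavaOptions",
           "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005")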
-Jaikiran
On Thursday 21 May 2015 03:36 PM, Kornel wrote:
Hi,
I'm having trouble with a Spark (1.2.1) streaming job which is using Apache
Kafka (client: 0.8.1.1).
The job is consuming messages, and the observed behaviour is that after
processing some messages the job stops processing, then restarts, then
consumes the lag which built up during the pause, and the cycle repeats.
I see a lot of log output, among it WARNs like:
kafka.consumer.SimpleConsumer: Reconnect due to socket error: null.
ERRORS of the form:
Deregistered receiver for stream 0: Error starting receiver 0 -
kafka.common.ConsumerRebalanceFailedException:
SparkStreamingJob-com.allegrogroup.reco.rtp.events.item.ItemChangedSparkStreaming_s41696.dc4.local-1432199965502-e860d377
can't rebalance after 4 retries
  at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
  at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722)
  at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212)
  at kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:138)
  at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:111)
kafka.consumer.ZookeeperConsumerConnector: my-streaming-job can't rebalance
after 4 retries
kafka.common.ConsumerRebalanceFailedException: my-streaming-job can't
rebalance after 4 retries
  at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
  at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:355)
kafka.producer.SyncProducer: Producer connection to xxx-broker:9092
unsuccessful
java.nio.channels.ClosedByInterruptException
  at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
  at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:650)
  at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
  at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
  at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
  at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
  at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
  at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
  at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
  at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
Any ideas what the reason might be? The broker logs contain nothing
suspicious. The funny and most important part is that I have another job
reading from the same topic (with a different consumer group), and that job
has no trouble consuming the messages. This would indicate an error in the
streaming job; however, these Kafka logs are prevalent and I can't find the
cause.
The method kafka.client.ClientUtils#fetchTopicMetadata is called quite often
in the failing job, while the working one does not call it (I base this on the
presence of "Fetching metadata from broker..." messages).
What could be the reason for that?
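In case it helps, the receiver is created with the standard
KafkaUtils.createStream API, roughly like this simplified sketch (hosts,
group, topic and batch interval are placeholders, not the real values); as
far as I understand, the "4 retries" in the error is the consumer's
rebalance.max.retries default:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val ssc = new StreamingContext(
      new SparkConf().setAppName("my-streaming-job"), Seconds(10))

    // Anonymized placeholders for the real ZooKeeper quorum, group and topic.
    val kafkaParams = Map(
      "zookeeper.connect" -> "zk1:2181,zk2:2181,zk3:2181",
      "group.id"          -> "my-streaming-job",
      // "rebalance.max.retries" is left at its default of 4, which is where
      // "can't rebalance after 4 retries" comes from; "rebalance.backoff.ms"
      // controls the wait between those attempts.
      "zookeeper.session.timeout.ms" -> "6000"
    )

    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Map("my-topic" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)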
Thanks for any hints,
Kornel