One way to narrow down the issue is to attach a debugger to the JVM running the Kafka consumer and set a breakpoint in SimpleConsumer to see the real exception stacktrace that is causing the reconnect. I've filed a JIRA with a patch to improve this logging so that the full cause stacktrace is included in the message: https://issues.apache.org/jira/browse/KAFKA-2221
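If you go that route, a minimal sketch of how the executor JVM could be opened for remote debugging (the port and app name here are just placeholder assumptions, not something from your setup):

  // Minimal sketch, assuming the receiver runs inside a Spark executor:
  // expose a JDWP port on the executor JVM so a breakpoint can be set in
  // kafka.consumer.SimpleConsumer. Port 5005 and the app name are placeholders.
  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .setAppName("my-streaming-job")
    .set("spark.executor.extraJavaOptions",
      "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005")

With that set, an IDE can attach to port 5005 on the executor host and the breakpoint in kafka.consumer.SimpleConsumer should show the underlying exception.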

-Jaikiran
On Thursday 21 May 2015 03:36 PM, Kornel wrote:
Hi,

I'm having trouble with a spark (1.2.1) streaming job which is using apache
kafka (client: 0.8.1.1).

The job is consuming messages, and the observed behaviour is that after
processing some messages the job stops, then restarts, then consumes the
lag that built up during the pause, and the cycle repeats.

I see plenty of logs, among them WARNs like:

kafka.consumer.SimpleConsumer: Reconnect due to socket error: null.

and ERRORs of the form:

Deregistered receiver for stream 0: Error starting receiver 0 -
kafka.common.ConsumerRebalanceFailedException:
SparkStreamingJob-com.allegrogroup.reco.rtp.events.item.ItemChangedSparkStreaming_s41696.dc4.local-1432199965502-e860d377
can't rebalance after 4 retries
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
        at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722)
        at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212)
        at kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:138)
        at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:111)

kafka.consumer.ZookeeperConsumerConnector: my-streaimng-job can't rebalance after 4 retries

kafka.common.ConsumerRebalanceFailedException: my-streaimng-job can't rebalance after 4 retries
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:355)


kafka.producer.SyncProducer: Producer connection to xxx-broker:9092 unsuccessful

java.nio.channels.ClosedByInterruptException
        at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:650)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
        at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

Any ideas what the reason might be? The broker logs contain nothing
suspicious. The strange and most important part is that I have another job
reading from the same topic (with a different consumer group), and that job
has no trouble consuming the messages. This would point to an error in the
streaming job itself; however, these Kafka logs are pervasive and I can't
find their cause.
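For context, the receiver is created with the high-level consumer via
KafkaUtils.createStream. The sketch below uses placeholder hosts, topic and
values (not my real configuration), but it shows where the settings behind the
"4 retries" above (rebalance.max.retries, rebalance.backoff.ms) would be tuned:

  // Rough sketch with placeholder hosts/topic; ssc is the existing StreamingContext.
  // rebalance.max.retries defaults to 4, which matches the error above.
  import kafka.serializer.StringDecoder
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.kafka.KafkaUtils

  val kafkaParams = Map(
    "zookeeper.connect"            -> "zk-host:2181",
    "group.id"                     -> "my-streaming-job",
    "rebalance.max.retries"        -> "10",
    "rebalance.backoff.ms"         -> "5000",
    "zookeeper.session.timeout.ms" -> "10000"
  )

  val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Map("my-topic" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)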


The method kafka.client.ClientUtils#fetchTopicMetadata is called quite often
in the failing job, while the working job does not seem to call it (I base
this on the presence of "Fetching metadata from broker..." messages).
What could be the reason for that?



Thanks for any hints,

  Kornel

