Hi
My streaming application gets killed with the error below:
15/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
ArrayBuffer(kafka.common.NotLeaderForPartitionException,
kafka.common.NotLeaderForPartitionException,
kafka.common.NotLeaderForPartitionException,
kafka.common.NotLeaderForPartitionException,
kafka.common.NotLeaderForPartitionException,
org.apache.spark.SparkException: Couldn't find leader offsets for Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100], [testtopic,193]))
15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs for time 1440626120000 ms
org.apache.spark.SparkException: ArrayBuffer(kafka.common.NotLeaderForPartitionException, org.apache.spark.SparkException: Couldn't find leader offsets for Set([testtopic,115]))
        at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
        at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
        at ...
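For reference, the stream is created roughly like this (a minimal sketch, not my exact job; the app name and batch interval are placeholders, and the topic/broker names are taken from the logs):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().setAppName("testtopic-stream") // placeholder name
val ssc = new StreamingContext(sparkConf, Seconds(5))          // placeholder interval

// Receiver-less direct stream; the driver looks up leader offsets itself,
// which is the call failing in the stack trace above.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092,broker3:9092")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("testtopic"))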
The Kafka params printed in the job logs are:
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
block.on.buffer.full = true
retry.backoff.ms = 100
buffer.memory = 1048576
batch.size = 16384
metrics.sample.window.ms = 30000
metadata.max.age.ms = 300000
receive.buffer.bytes = 32768
timeout.ms = 30000
max.in.flight.requests.per.connection = 5
bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
metric.reporters = []
client.id =
compression.type = none
retries = 0
max.request.size = 1048576
send.buffer.bytes = 131072
acks = all
reconnect.backoff.ms = 10
linger.ms = 0
metrics.num.samples = 2
metadata.fetch.timeout.ms = 60000
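(These look like the config of a KafkaProducer the job uses to write results back out. A hypothetical reconstruction of that setup, just so the retry-related values are easier to see; everything except the dumped values is assumed:)

import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer

val props = new Properties()
props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("acks", "all")
props.put("retries", "0")             // the retries value I am asking about below
props.put("retry.backoff.ms", "100")  // and its backoff
val producer = new KafkaProducer[String, String](props)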
Is a Kafka broker going down and getting the job killed? What's the best way to handle it?
Will increasing retries and the backoff time help, and if so, what values should they be set to so that the streaming application never fails? Ideally it would keep retrying every few seconds and emit an event, so that my custom code can send a notification that a Kafka broker is down, if that turns out to be the cause. Roughly what I have in mind is sketched below.
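Something like this (a rough sketch, not tested code; notifyOps is a hypothetical hook, and I am assuming spark.streaming.kafka.maxRetries and refresh.leader.backoff.ms are the knobs behind the latestLeaderOffsets failure above):

import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical notification hook (pager/email), not a real API.
def notifyOps(msg: String): Unit = println(s"ALERT: $msg")

val conf = new SparkConf()
  .setAppName("testtopic-stream")
  // How many times the driver retries the leader-offset lookup
  // (DirectKafkaInputDStream.latestLeaderOffsets) before throwing; default 1.
  .set("spark.streaming.kafka.maxRetries", "10")

// Extra consumer param for the direct stream: sleep between those retries.
val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092,broker2:9092,broker3:9092",
  "refresh.leader.backoff.ms" -> "2000")

val sc = new SparkContext(conf)
var keepRunning = true
while (keepRunning) {
  val ssc = new StreamingContext(sc, Seconds(5))
  // ... create the direct stream with kafkaParams and wire up the job ...
  try {
    ssc.start()
    ssc.awaitTermination()   // rethrows whatever killed the job
    keepRunning = false      // clean shutdown requested
  } catch {
    case e: SparkException =>
      // Likely a broker/leader problem: alert, back off, recreate the context
      notifyOps(s"Streaming job failed, restarting: ${e.getMessage}")
      ssc.stop(stopSparkContext = false, stopGracefully = false)
      Thread.sleep(5000)
  }
}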
Thanks