Spark-specific questions are better directed to the Spark user list.

Spark will retry failed tasks automatically up to a configurable number of
times.  The direct stream will retry failures on the driver up to a
configurable number of times.

See

http://spark.apache.org/docs/latest/configuration.html

The properties you're looking for are

spark.task.maxFailures
spark.streaming.kafka.maxRetries

respectively.
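
For example, a minimal sketch of setting both properties when building the
streaming context (the app name, batch interval, and retry counts below are
illustrative assumptions, not recommendations):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("kafka-direct-example") // hypothetical app name
      // Number of task failures tolerated before the job fails (default 4).
      .set("spark.task.maxFailures", "8")
      // Number of times the driver retries the leader/offset lookup for the
      // direct stream before failing the batch (default 1).
      .set("spark.streaming.kafka.maxRetries", "3")

    val ssc = new StreamingContext(conf, Seconds(10)) // hypothetical batch interval

Keep in mind that retries only help once a leader is available again; with a
replication factor of 1, no other broker can take over leadership while the
node is down.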

On Fri, Nov 20, 2015 at 7:12 AM, Charan Ganga Phani Adabala <
char...@eiqnetworks.com> wrote:

> Hi All,
>
> We are using Apache Spark 1.5.1 with kafka_2.10-0.8.2.1 and the Kafka
> DirectStream API to fetch data from Kafka.
>
> Kafka topic properties: replication factor: 1, partitions: 1
> Kafka cluster size: 3 nodes
>
>
> When all Kafka nodes are up and running, we can successfully fetch data
> for all topics.
>
>
>
> But when one of the Kafka nodes goes down, we get the exceptions below.
> Even after the node comes back up, the job does not recover: the Spark
> job terminates and we are unable to fetch data from the remaining topics
> in Kafka.
>
> ERROR DirectKafkaInputDStream:125 - ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([normalized-tenant4,0]))
> ERROR JobScheduler:96 - Error generating jobs for time 1447929990000 ms
> org.apache.spark.SparkException: ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([normalized-tenant4,0]))
>         at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
>         at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>         at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>         at scala.Option.orElse(Option.scala:257)
>         at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>         at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>         at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>         at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>         at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>         at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
>         at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
>         at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
>         at scala.util.Try$.apply(Try.scala:161)
>         at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
>         at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
>         at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
>         at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
>
> Thanks in advance. Please help us resolve this issue.
>
> Thanks & Regards,
>
> Ganga Phani Charan Adabala | Software Engineer
> o: +91-40-23116680 | c: +91-9491418099
> EiQ Networks, Inc. <http://www.eiqnetworks.com/>
