Spark-specific questions are better directed to the Spark user list. Spark will retry failed tasks automatically, up to a configurable number of times (spark.task.maxFailures). The direct stream will retry failures on the driver, likewise up to a configurable number of times (spark.streaming.kafka.maxRetries).
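For example, here's a minimal sketch of raising both retry limits when building the streaming context (the app name, batch interval, and values are illustrative only):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative values; the defaults are 4 task failures and 1 driver retry.
val conf = new SparkConf()
  .setAppName("kafka-direct-example")
  .set("spark.task.maxFailures", "8")
  .set("spark.streaming.kafka.maxRetries", "3")
val ssc = new StreamingContext(conf, Seconds(10))

Both are ordinary Spark properties, so they can equally be passed on the command line via spark-submit --conf.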
See http://spark.apache.org/docs/latest/configuration.html for the details of both properties.

On Fri, Nov 20, 2015 at 7:12 AM, Charan Ganga Phani Adabala <char...@eiqnetworks.com> wrote:

> Hi All,
>
> We are using Apache Spark 1.5.1 with kafka_2.10-0.8.2.1 and the Kafka
> DirectStream API to fetch data from Kafka using Spark.
>
> Kafka topic properties: replication factor: 1, partitions: 1
> Kafka cluster size: 3 nodes
>
> When all Kafka nodes are up and running, I can successfully get the data
> for all the topics.
>
> But when one of the Kafka nodes is down, we get the exceptions below.
> Even after the node comes back up, we do not recover: the Spark job is
> terminated and we are unable to fetch data from the remaining topics in
> Kafka.
>
> ERROR DirectKafkaInputDStream:125 -
> ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for
> Set([normalized-tenant4,0]))
> ERROR JobScheduler:96 - Error generating jobs for time 1447929990000 ms
> org.apache.spark.SparkException:
> ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for
> Set([normalized-tenant4,0]))
> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
> at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at scala.Option.orElse(Option.scala:257)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
> at scala.util.Try$.apply(Try.scala:161)
> at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
> at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
> Thanks in advance; please help us resolve this issue.
>
> Thanks & Regards,
> Ganga Phani Charan Adabala | Software Engineer
> o: +91-40-23116680 | c: +91-9491418099
> EiQ Networks, Inc. <http://www.eiqnetworks.com/>