Stack trace is:

15/09/09 22:49:52 ERROR kafka.KafkaRDD: Lost leader for topic topicname partition 99, sleeping for 200ms
kafka.common.NotLeaderForPartitionException
        at sun.reflect.GeneratedConstructorAccessor26.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at java.lang.Class.newInstance(Class.java:374)
        at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:73)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:142)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:151)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
        at com.example.hadoop.spark.consumer.KafkaStreamTransformations.call(KafkaStreamTransformations.java:147)
        at com.example.hadoop.spark.consumer.KafkaStreamTransformations.call(KafkaStreamTransformations.java:35)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:198)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:198)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
15/09/09 22:49:52 ERROR consumer.KafkaStreamTransformations: Error while consuming messages from kafka
Actual code is:

In the driver:

    final KafkaStreamTransformations transformations =
        new KafkaStreamTransformations(...);

    directKafkaStream.foreachRDD(new Function<JavaRDD<byte[][]>, Void>() {
        @Override
        public Void call(JavaRDD<byte[][]> v1) throws Exception {
            v1.foreachPartition(transformations);
            return null;
        }
    });

--------------------------------------------

In KafkaStreamTransformations:

    @Override
    public void call(Iterator<byte[][]> t) throws Exception {
        try {
            while (t.hasNext()) {
                // ... long-running task
            }
        } catch (Exception e) {
            e.printStackTrace();
            logger.error("Error while consuming messages from kafka");
        }
    }

On Thu, Sep 10, 2015 at 6:58 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Post the actual stack trace you're getting.
>
> On Thu, Sep 10, 2015 at 12:21 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
>
>> Executors in Spark Streaming 1.3 fetch messages from Kafka in batches.
>> What happens when an executor takes a long time to finish processing a
>> fetched batch, say in:
>>
>>     directKafkaStream.foreachRDD(new Function<JavaRDD<byte[][]>, Void>() {
>>         @Override
>>         public Void call(JavaRDD<byte[][]> v1) throws Exception {
>>             v1.foreachPartition(new VoidFunction<Iterator<byte[][]>>() {
>>                 @Override
>>                 public void call(Iterator<byte[][]> t) throws Exception {
>>                     // long-running task
>>                 }
>>             });
>>             return null;
>>         }
>>     });
>>
>> Will this long-running task drop the executor's connection to the Kafka
>> brokers, and how should that be handled? I am getting connection
>> timeouts in my code.
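For reference, a minimal sketch of where the fetch-related Kafka settings go when creating the direct stream. This assumes the standard Spark 1.3 KafkaUtils.createDirectStream Java API; the class name, broker list, topic, batch interval, and parameter values below are illustrative, not taken from the code above.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    import kafka.serializer.DefaultDecoder;

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    public class DirectStreamSetup {
        public static void main(String[] args) throws Exception {
            SparkConf conf = new SparkConf().setAppName("direct-stream-setup");
            // Driver-side retries when finding leader offsets for each batch
            // (default 1); relevant when partition leaders are moving around.
            conf.set("spark.streaming.kafka.maxRetries", "3");

            // Batch interval is illustrative.
            JavaStreamingContext jssc =
                new JavaStreamingContext(conf, Durations.seconds(30));

            Map<String, String> kafkaParams = new HashMap<String, String>();
            // Illustrative broker list.
            kafkaParams.put("metadata.broker.list", "broker1:9092,broker2:9092");
            // Socket read timeout for each fetch request to a broker.
            kafkaParams.put("socket.timeout.ms", "60000");
            // How long KafkaRDD sleeps before re-finding the leader after a
            // NotLeaderForPartitionException; the "sleeping for 200ms" in the
            // stack trace above is this property's default value.
            kafkaParams.put("refresh.leader.backoff.ms", "1000");

            Set<String> topics = new HashSet<String>();
            topics.add("topicname");

            JavaPairInputDStream<byte[], byte[]> directKafkaStream =
                KafkaUtils.createDirectStream(jssc,
                    byte[].class, byte[].class,
                    DefaultDecoder.class, DefaultDecoder.class,
                    kafkaParams, topics);

            // ... map to byte[][] and apply the foreachRDD/foreachPartition
            // logic shown above ...

            jssc.start();
            jssc.awaitTermination();
        }
    }

socket.timeout.ms and refresh.leader.backoff.ms are standard Kafka consumer properties that the direct stream reads from kafkaParams; raising the backoff gives a leader election more time to settle before the failed fetch is retried through Spark's normal task-retry machinery.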