Stack trace is:
15/09/09 22:49:52 ERROR kafka.KafkaRDD: Lost leader for topic topicname partition 99, sleeping for 200ms
kafka.common.NotLeaderForPartitionException
        at sun.reflect.GeneratedConstructorAccessor26.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at java.lang.Class.newInstance(Class.java:374)
        at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:73)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:142)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:151)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
        at com.example.hadoop.spark.consumer.KafkaStreamTransformations.call(KafkaStreamTransformations.java:147)
        at com.example.hadoop.spark.consumer.KafkaStreamTransformations.call(KafkaStreamTransformations.java:35)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:198)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:198)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
15/09/09 22:49:52 ERROR consumer.KafkaStreamTransformations: Error while consuming messages from kafka

Actual code is:

In the driver:

final KafkaStreamTransformations transformations =
        new KafkaStreamTransformations(...);

directKafkaStream.foreachRDD(new Function<JavaRDD<byte[][]>, Void>() {
    @Override
    public Void call(JavaRDD<byte[][]> v1) throws Exception {
        v1.foreachPartition(transformations);
        return null;
    }
});
--------------------------------------------
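One knob that may be relevant here: the 200ms in the "sleeping for 200ms" log line is, as far as I can tell from KafkaRDD's handleFetchErr, the kafka consumer setting refresh.leader.backoff.ms, which the iterator sleeps for after losing a leader before rethrowing. If leader elections on the cluster take longer than that, raising it through kafkaParams may help. A minimal sketch of wiring it into the direct stream (the broker list and topic name are placeholders, the jssc context is assumed in scope, and this is the plain byte[] pair-stream variant rather than whatever messageHandler produces the byte[][] records above):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import kafka.serializer.DefaultDecoder;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

// Inside the streaming setup; jssc is the job's JavaStreamingContext.
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "broker1:9092,broker2:9092"); // placeholder brokers
// How long the fetch retries back off after a leader loss; the default
// of 200ms matches the "sleeping for 200ms" log line above.
kafkaParams.put("refresh.leader.backoff.ms", "1000");

Set<String> topics = new HashSet<>();
topics.add("topicname"); // placeholder topic

JavaPairInputDStream<byte[], byte[]> directKafkaStream =
    KafkaUtils.createDirectStream(
        jssc,
        byte[].class, byte[].class,
        DefaultDecoder.class, DefaultDecoder.class,
        kafkaParams,
        topics);

Whether a longer backoff actually helps depends on why the leader moved in the first place; it only buys the cluster more time before the retry reconnects.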

In KafkaStreamTransformations:

@Override
public void call(Iterator<byte[][]> t) throws Exception {
    try {
        while (t.hasNext()) {
            // ...long running task
        }
    } catch (Exception e) {
        e.printStackTrace();
        logger.error("Error while consuming messages from kafka");
    }
}
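Note what the trace above shows: handleFetchErr builds the NotLeaderForPartitionException and throws it out of the iterator, apparently so that the task fails and Spark's normal task retry (up to spark.task.maxFailures) reconnects to whichever broker is the new leader. Catching it here marks the task as successful, so that retry never happens and the rest of the partition's messages are silently skipped. A minimal sketch of logging and rethrowing instead (same method as above; only the catch block changes):

@Override
public void call(Iterator<byte[][]> t) throws Exception {
    try {
        while (t.hasNext()) {
            // ...long running task
        }
    } catch (Exception e) {
        // Log for diagnostics, then rethrow so Spark fails the task and
        // schedules a retry, which opens a fresh connection to the new leader.
        logger.error("Error while consuming messages from kafka", e);
        throw e;
    }
}

If some exceptions in the long-running work really are recoverable, catch only those specific types and let anything coming out of the kafka iterator propagate.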

On Thu, Sep 10, 2015 at 6:58 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Post the actual stacktrace you're getting
>
> On Thu, Sep 10, 2015 at 12:21 AM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> Executors in spark streaming 1.3 fetch messages from kafka in batches.
>> What happens when an executor takes a long time to process a fetched
>> batch, say in
>>
>>
>> directKafkaStream.foreachRDD(new Function<JavaRDD<byte[][]>, Void>() {
>>     @Override
>>     public Void call(JavaRDD<byte[][]> v1) throws Exception {
>>         v1.foreachPartition(new VoidFunction<Iterator<byte[][]>>() {
>>             @Override
>>             public void call(Iterator<byte[][]> t) throws Exception {
>>                 // long running task
>>             }
>>         });
>>         return null;
>>     }
>> });
>>
>> Will this long running task drop the executor's connection to the kafka
>> brokers? And how do I handle that? I am getting a connection timeout in
>> my code.
>>
>>
>>
>>
>
