Hello,

When I'm reprocessing the data from Kafka (about 40 GB) with the new
Spark Streaming Kafka method createDirectStream, everything is fine
until a driver error happens (driver killed, connection lost, etc.).
When the driver comes back up, it resumes processing from the
checkpoint in HDFS. Except this time I get the following (a simplified
sketch of the setup is at the end of this mail):

15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; aborting job
15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 1437032118000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
        at java.nio.Buffer.limit(Buffer.java:275)
        at kafka.message.Message.sliceDelimited(Message.scala:236)
        at kafka.message.Message.payload(Message.scala:218)
        at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
        at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
        at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
        at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
        at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
        at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
        at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
        at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

This only happens when I'm doing a full reprocessing of the data from
Kafka. When there's no load, killing the driver and restarting it
resumes from the checkpoint as expected, without missing data. Has
anyone encountered something similar? How did you solve it?
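For context, here's roughly how the job is wired up. This is a
simplified sketch: the batch interval, broker list, topic, checkpoint
path and Elasticsearch index below are placeholders, and the actual
message parsing is elided.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.elasticsearch.spark.rdd.EsSpark

object KafkaToEs {
  // Placeholder checkpoint location in HDFS
  val checkpointDir = "hdfs:///checkpoints/kafka-to-es"

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("kafka-to-es")
    val ssc = new StreamingContext(conf, Seconds(10)) // placeholder batch interval
    ssc.checkpoint(checkpointDir)

    val kafkaParams = Map("metadata.broker.list" -> "broker01:9092") // placeholder brokers
    val topics = Set("my_topic")                                     // placeholder topic

    // Direct stream: Kafka offsets are tracked by Spark and saved in the checkpoint
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream
      .map { case (_, value) => Map("message" -> value) }           // real parsing elided
      .foreachRDD { rdd => EsSpark.saveToEs(rdd, "index/type") }    // placeholder index
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On restart the context (including Kafka offsets) is recovered from the HDFS checkpoint
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}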

Regards,

Nicolas PHUNG
