Hello,

When I'm reprocessing the data from Kafka (about 40 GB) with the new Spark Streaming Kafka method createDirectStream, everything is fine until a driver error happens (driver killed, connection lost, ...). When the driver comes back up, it resumes the processing from the checkpoint in HDFS, except that I then get the error below.
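For context, the job is roughly the following (a simplified sketch: the checkpoint path, broker list, topic, batch interval and Elasticsearch resource are placeholders, and the real job does some extra mapping before the write):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.elasticsearch.spark.rdd.EsSpark

object ReprocessJob {
  val checkpointDir = "hdfs:///checkpoints/reprocess"  // placeholder path

  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("kafka-reprocess")
      .set("es.nodes", "es-host")  // placeholder Elasticsearch node
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint(checkpointDir)

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
    val topics = Set("mytopic")

    // Direct stream: Kafka offsets are tracked by Spark and recovered from the checkpoint.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.map { case (_, value) => Map("raw" -> value) }
      .foreachRDD { rdd =>
        // Write each micro-batch to Elasticsearch (resource name is a placeholder).
        EsSpark.saveToEs(rdd.coalesce(8), "myindex/mytype")
      }
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Rebuild the context (DStream graph and offset ranges) from the HDFS checkpoint if it exists.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}

The restarted driver goes through StreamingContext.getOrCreate, so it recovers the pending batches and their Kafka offset ranges from the checkpoint. The stack trace below is what the recovered batches then produce: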
15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; aborting job
15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 1437032118000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
    at java.nio.Buffer.limit(Buffer.java:275)
    at kafka.message.Message.sliceDelimited(Message.scala:236)
    at kafka.message.Message.payload(Message.scala:218)
    at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

This only happens when I'm doing a full data reprocessing from Kafka. When there is no load, killing the driver and restarting it resumes from the checkpoint as expected, without missing data.

Has anyone encountered something similar? How did you solve it?

Regards,

Nicolas PHUNG