Well, working backwards down the stack trace...

at java.nio.Buffer.limit(Buffer.java:275)
That exception gets thrown if the limit is negative or greater than the
buffer's capacity.

at kafka.message.Message.sliceDelimited(Message.scala:236)

If the size had been negative, sliceDelimited would have just returned null,
so we know the exception was thrown because the size was greater than the
buffer's capacity. I haven't seen that before... maybe a corrupted message of
some kind?

If the problem is reproducible, try providing an explicit argument for
messageHandler, with a function that logs the message offset (rough sketch
below the quoted trace).

On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung <nicolas.ph...@gmail.com> wrote:

> Hello,
>
> When I'm reprocessing the data from Kafka (about 40 GB) with the new Spark
> Streaming Kafka method createDirectStream, everything is fine until a
> driver error happens (driver killed, connection lost...). When the driver
> comes back up, it resumes processing from the checkpoint in HDFS. Except I
> get this:
>
> 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times;
> aborting job
> 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job
> 1437032118000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 4
> in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage
> 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
>     at java.nio.Buffer.limit(Buffer.java:275)
>     at kafka.message.Message.sliceDelimited(Message.scala:236)
>     at kafka.message.Message.payload(Message.scala:218)
>     at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
>     at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>     at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>     at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>     at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>     at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>     at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
>     at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>     at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
>     at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>     at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
> This is happening only when I'm doing a full data reprocessing from Kafka.
> If there's no load, killing the driver and restarting it resumes from the
> checkpoint as expected without missing data. Did someone encounter
> something similar? How did you solve it?
>
> Regards,
>
> Nicolas PHUNG
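
For what it's worth, a rough sketch of that with the Spark 1.x Kafka direct
API (the createDirectStream overload that takes fromOffsets and a
messageHandler), assuming String keys/values with StringDecoder. The broker
list, topic, starting offsets, and the streaming context ssc are placeholders
for your own setup; the point is just that the offset gets logged before the
payload is decoded, so the last offset logged before the
IllegalArgumentException identifies the suspect message:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Placeholder broker list, topic, and starting offsets -- use your own
// (e.g. the offsets you actually want to resume from).
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val fromOffsets = Map(TopicAndPartition("my-topic", 0) -> 0L)

// Log each message's location before decoding its payload;
// mmd.message is the call that blows up on a corrupt message.
val messageHandler = (mmd: MessageAndMetadata[String, String]) => {
  println(s"topic=${mmd.topic} partition=${mmd.partition} offset=${mmd.offset}")
  (mmd.key, mmd.message)
}

// ssc is your existing StreamingContext
val stream = KafkaUtils.createDirectStream[
  String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets, messageHandler)

Note the println output will show up in the executor stdout logs rather than
on the driver, since the handler runs on the executors.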