Well, working backwards down the stack trace...

at java.nio.Buffer.limit(Buffer.java:275)

That exception gets thrown if the limit is negative or greater than
the buffer's capacity


at kafka.message.Message.sliceDelimited(Message.scala:236)

If size had been negative, it would have just returned null, so we know the
exception got thrown because the size was greater than the buffer's capacity


I haven't seen that before... maybe a corrupted message of some kind?

If that problem is reproducible, try providing an explicit argument for
messageHandler, with a function that logs the message offset.


On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung <nicolas.ph...@gmail.com>
wrote:

> Hello,
>
> When I'm reprocessing the data from kafka (about 40 Gb) with the new Spark 
> Streaming Kafka method createDirectStream, everything is fine till a driver 
> error happened (driver is killed, connection lost...). When the driver pops 
> up again, it resumes the processing with the checkpoint in HDFS. Except, I 
> got this:
>
> 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; 
> aborting job
> 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 
> 1437032118000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in 
> stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0 
> (TID 16, slave05.local): java.lang.IllegalArgumentException
>       at java.nio.Buffer.limit(Buffer.java:275)
>       at kafka.message.Message.sliceDelimited(Message.scala:236)
>       at kafka.message.Message.payload(Message.scala:218)
>       at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
>       at 
> org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>       at 
> org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
>       at 
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
>       at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>       at 
> org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>       at 
> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>       at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>       at 
> org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
>       at 
> org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
>       at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>       at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
>       at 
> org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>       at 
> org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>       at org.apache.spark.scheduler.Task.run(Task.scala:64)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
>
> This is happening only when I'm doing a full data processing from Kafka.
> If there's no load, when you killed the driver and then restart, it resumes
> the checkpoint as expected without missing data. Did someone encounters
> something similar ? How did you solve this ?
>
> Regards,
>
> Nicolas PHUNG
>

Reply via email to