Hey Sparkers,

I would like to use Spark Streaming in production to watch a directory and process the files that are dropped into it. The problem is that some of those files can be corrupt, which leads to an IOException from the input reader. I would expect the framework to cope with this: Spark Streaming should catch the exception, log it somewhere, and skip the file that caused it. Instead, when the exception is thrown the job is aborted with an error and no further files are processed.

Ideally, I would like my Spark Streaming job to run forever and, if a file is not readable, just log it and stay alive. How can I achieve this?
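
To make the behaviour I am after concrete, here is a minimal sketch in plain Python. It does not use any Spark API; the function name `read_lines_skipping_broken` and the logger name are hypothetical, and I am assuming bzip2-compressed text files only because that is the codec that appears in my stack trace. It only illustrates the skip-and-log semantics I would like the streaming job to have, not how Spark would implement them.

```python
import bz2
import logging

logger = logging.getLogger("skip_broken")  # hypothetical logger name


def read_lines_skipping_broken(paths):
    """Yield (path, line) pairs from readable files; log and skip broken ones.

    Each file is read completely before any of its lines are emitted, so a
    file that turns out to be truncated contributes nothing to the output.
    """
    for path in paths:
        try:
            with bz2.open(path, "rt") as handle:
                lines = [line.rstrip("\n") for line in handle]
        except (OSError, EOFError) as exc:
            # A truncated bzip2 stream raises EOFError, invalid data raises
            # OSError. Either way: log the file and carry on with the next.
            logger.warning("Skipping unreadable file %s: %s", path, exc)
            continue
        for line in lines:
            yield path, line
```

The point of the sketch is that the failure is contained per file: one unreadable input produces a log entry, and every other file is still processed.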
The stack of the errors that kill my job is similar to:

15/08/09 23:42:27 ERROR o.a.s.e.Executor Exception in task 823.0 in stage 0.0 (TID 823)
java.io.IOException: unexpected end of stream
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:971)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:506)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:335)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:425)
    at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:485)
    at java.io.InputStream.read(InputStream.java:101)
    at org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.fillBuffer(CompressedSplitLineReader.java:130)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
    at org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.readLine(CompressedSplitLineReader.java:159)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:185)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.WritablePartitionedIterator$$anon$3.writeNext(WritablePartitionedPairCollection.scala:105)
    at org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Thank you,
Mario